[GPU] Native Rapids UDF – Create Custom UDF
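In the previous post we implemented a HiveUDF. To take the next step and accelerate it on the GPU, we need to implement the evaluateColumnar function. After going through the Spark Rapids GitHub repository and the examples for ColumnView / ColumnVector, we could not find an existing function that filters an Array[String] the way our UDF requires, so we have to write what the tutorial calls Native Code Examples ourselves. The tutorial provides only one such example for HiveUDF, StringWordCount. This post records how we used that example as a starting point to build a GPU-enabled HiveUDF.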
Introduction
What you need to know about cuDF C++: the libcudf C++ Developer Guide
https://github.com/rapidsai/cudf/blob/branch-24.02/cpp/doxygen/developer_guide/DEVELOPER_GUIDE.md#variable-size-and-nested-data-types
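UDF architecture
A cudf UDF essentially takes a column_view as input and returns a std::unique_ptr<cudf::column>; every example below follows this pattern. The figure from the post linked below illustrates that cudf uses a different column_view type for each kind of data.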
Mastering String Transformation in RAPIDS libcudf
The example from that post, reproduced below, shows how cudf transforms a strings column. It starts with two column_view inputs, names and visibilities, and uses cudf::string_scalar to define two scalars, visible and redaction. cudf::strings::contains returns a cudf::column, so allowed is a column of booleans. cudf::copy_if_else then produces a new cudf::column of type STRING. Passing that column through cudf::strings::split returns a cudf::table whose first column is first and whose second column is last. Finally the pieces are reassembled into a table_view, and cudf::strings::concatenate joins the columns of that table back into a single strings column.
/**
 * @brief Redacts each name per the corresponding visibility entry
 *
 * This implementation uses libcudf APIs to create the output result.
 *
 * @param names Column of names
 * @param visibilities Column of visibilities
 * @return Redacted column of names
 */
std::unique_ptr<cudf::column> redact_strings(cudf::column_view const& names,
                                             cudf::column_view const& visibilities)
{
  auto const visible   = cudf::string_scalar(std::string("public"));
  auto const redaction = cudf::string_scalar(std::string("X X"));

  nvtxRangePushA("redact_strings");

  auto const allowed  = cudf::strings::contains(visibilities, visible);
  auto const redacted = cudf::copy_if_else(names, redaction, allowed->view());
  auto const first_last = cudf::strings::split(redacted->view());
  auto const first = first_last->view().column(0);
  auto const last  = first_last->view().column(1);
  auto const last_initial = cudf::strings::slice_strings(last, 0, 1);
  auto const last_initial_first = cudf::table_view({last_initial->view(), first});
  auto result = cudf::strings::concatenate(last_initial_first, std::string(" "));

  cudaStreamSynchronize(0);

  nvtxRangePop();
  return result;
}
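To make the data flow easier to follow, here is a minimal host-only sketch of what the pipeline computes for each row. It is not libcudf code: redact_strings_reference is a hypothetical helper that works on std::vector<std::string>, and it assumes there are no null rows and that every name consists of a first and a last token separated by whitespace.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical CPU reference for the columnar pipeline above (not part of libcudf).
// Assumes no nulls and names of the form "First Last".
std::vector<std::string> redact_strings_reference(std::vector<std::string> const& names,
                                                  std::vector<std::string> const& visibilities) {
  assert(names.size() == visibilities.size());
  std::vector<std::string> out;
  out.reserve(names.size());
  for (std::size_t i = 0; i < names.size(); ++i) {
    // contains + copy_if_else: keep the name only when the row is public
    bool const allowed = visibilities[i].find("public") != std::string::npos;
    std::string const redacted = allowed ? names[i] : std::string("X X");
    // split on whitespace: the first two tokens play the role of columns 0 and 1
    std::istringstream tokens(redacted);
    std::string first, last;
    tokens >> first >> last;
    // slice_strings(last, 0, 1), then concatenate with a single space
    out.push_back(last.substr(0, 1) + " " + first);
  }
  return out;
}
```

For example, the row ("John Doe", "public") becomes "D John", while any non-public row becomes "X X".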
string_word_count.cu / string_word_count.hpp
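The example provided in spark-rapids-examples differs from the one on NVIDIA's site in the following ways:
- It uses the Thrust library to iterate over the rows in parallel on the GPU
- It uses the RAPIDS Memory Manager (RMM) for GPU memory allocation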
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "string_word_count.hpp"
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_device_view.cuh>
#include <cudf/strings/string_view.cuh>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/exec_policy.hpp>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/transform.h>
namespace {

// count the words separated by whitespace characters
__device__ cudf::size_type count_words(cudf::column_device_view const& d_strings,
                                       cudf::size_type idx) {
  if (d_strings.is_null(idx)) return 0;
  cudf::string_view const d_str = d_strings.element<cudf::string_view>(idx);
  cudf::size_type word_count = 0;
  // run of whitespace is considered a single delimiter
  bool spaces = true;
  auto itr = d_str.begin();
  while (itr != d_str.end()) {
    cudf::char_utf8 ch = *itr;
    if (spaces == (ch <= ' ')) {
      itr++;
    } else {
      word_count += static_cast<cudf::size_type>(spaces);
      spaces = !spaces;
    }
  }
  return word_count;
}

} // anonymous namespace
/**
 * @brief Count the words in a string using whitespace as word boundaries
 *
 * @param strs The column containing the strings
 * @return The INT32 column containing the word count results per string
 */
std::unique_ptr<cudf::column> string_word_count(cudf::column_view const& strs) {
  auto strings_count = strs.size();
  if (strings_count == 0) {
    return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32});
  }

  // the validity of the output matches the validity of the input
  rmm::device_buffer null_mask = cudf::copy_bitmask(strs);

  // allocate the column that will contain the word count results
  std::unique_ptr<cudf::column> result =
    cudf::make_numeric_column(
      cudf::data_type{cudf::type_id::INT32},
      strs.size(),
      std::move(null_mask),
      strs.null_count());

  // compute the word counts, writing into the result column data buffer
  // (there is no stream parameter; the default stream is always used)
  auto stream = rmm::cuda_stream_default;
  auto strs_device_view = cudf::column_device_view::create(strs, stream);
  auto d_strs_view = *strs_device_view;
  thrust::transform(
    rmm::exec_policy(stream),
    thrust::make_counting_iterator<cudf::size_type>(0),
    thrust::make_counting_iterator<cudf::size_type>(strings_count),
    result->mutable_view().data<cudf::size_type>(),
    [d_strs_view] __device__(cudf::size_type idx) { return count_words(d_strs_view, idx); });

  return result;
}
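The loop in count_words is a small two-state machine, which is easier to verify on the host than on the GPU. The following sketch mirrors it in plain C++. count_words_reference is a hypothetical helper, and it walks bytes rather than UTF-8 code points, so it only matches the device version exactly for ASCII input.

```cpp
#include <cassert>
#include <string>

// Hypothetical host-side mirror of the device function count_words.
// Works byte by byte, so it is only equivalent for ASCII strings.
int count_words_reference(std::string const& s) {
  int word_count = 0;
  bool spaces = true;  // true while scanning whitespace, false while inside a word
  auto itr = s.begin();
  while (itr != s.end()) {
    unsigned char const ch = static_cast<unsigned char>(*itr);
    if (spaces == (ch <= ' ')) {
      ++itr;  // the character belongs to the current run, so consume it
    } else {
      // the run changes without consuming the character;
      // a whitespace -> word transition starts a new word
      word_count += static_cast<int>(spaces);
      spaces = !spaces;
    }
  }
  return word_count;
}
```

The character that triggers a state change is not consumed in that iteration; it is looked at again on the next pass under the new state. This is why a run of consecutive whitespace characters counts as a single delimiter.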
cosine_similarity.cu / cosine_similarity.hpp
Computing the inner product relies on the Thrust algorithm library. NVIDIA's documentation describes it as follows:
Thrust is a C++ template library for CUDA based on the Standard Template Library (STL). Thrust allows you to implement high performance parallel applications with minimal programming effort through a high-level interface that is fully interoperable with CUDA C.
https://docs.nvidia.com/cuda/thrust/index.html
The cosine_similarity computation uses two Thrust algorithms, thrust::all_of and thrust::transform. The latter is used by both string_word_count and cosine_similarity.
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cosine_similarity.hpp"
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_device_view.cuh>
#include <cudf/lists/list_device_view.cuh>
#include <cudf/lists/lists_column_device_view.cuh>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/null_mask.hpp>
#include <cudf/table/table_view.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/logical.h>
#include <thrust/transform.h>
#include <cmath>
#include <stdexcept>
namespace {

/**
 * @brief Functor for computing the cosine similarity between two list of float columns
 */
struct cosine_similarity_functor {
  float const* const v1;
  float const* const v2;
  int32_t const* const v1_offsets;
  int32_t const* const v2_offsets;

  // This kernel executes thread-per-row which should be fine for relatively short lists
  // but may need to be revisited for performance if operating on long lists.
  __device__ float operator()(cudf::size_type row_idx) {
    auto const v1_start_idx = v1_offsets[row_idx];
    auto const v1_num_elems = v1_offsets[row_idx + 1] - v1_start_idx;
    auto const v2_start_idx = v2_offsets[row_idx];
    auto const v2_num_elems = v2_offsets[row_idx + 1] - v2_start_idx;
    auto const num_elems = std::min(v1_num_elems, v2_num_elems);
    double mag1 = 0;
    double mag2 = 0;
    double dot_product = 0;
    for (auto i = 0; i < num_elems; i++) {
      float const f1 = v1[v1_start_idx + i];
      mag1 += f1 * f1;
      float const f2 = v2[v2_start_idx + i];
      mag2 += f2 * f2;
      dot_product += f1 * f2;
    }
    mag1 = std::sqrt(mag1);
    mag2 = std::sqrt(mag2);
    return static_cast<float>(dot_product / (mag1 * mag2));
  }
};

} // anonymous namespace
/**
 * @brief Compute the cosine similarity between two LIST of FLOAT32 columns
 *
 * The input vectors must have matching shapes, i.e.: same row count and same number of
 * list elements per row. A null list row is supported, but null float entries within a
 * list are not supported.
 *
 * @param lv1 The first LIST of FLOAT32 column view
 * @param lv2 The second LIST of FLOAT32 column view
 * @return A FLOAT32 column containing the cosine similarity corresponding to each input row
 */
std::unique_ptr<cudf::column> cosine_similarity(cudf::lists_column_view const& lv1,
                                                cudf::lists_column_view const& lv2) {
  // sanity-check the input types
  if (lv1.child().type().id() != lv2.child().type().id() ||
      lv1.child().type().id() != cudf::type_id::FLOAT32) {
    throw std::invalid_argument("inputs are not lists of floats");
  }

  // sanity check the input shape
  auto const row_count = lv1.size();
  if (row_count != lv2.size()) {
    throw std::invalid_argument("input row counts do not match");
  }
  if (row_count == 0) {
    return cudf::make_empty_column(cudf::data_type{cudf::type_id::FLOAT32});
  }
  if (lv1.child().null_count() != 0 || lv2.child().null_count() != 0) {
    throw std::invalid_argument("null floats are not supported");
  }

  auto const stream = rmm::cuda_stream_default;
  auto d_view1_ptr = cudf::column_device_view::create(lv1.parent());
  auto d_lists1 = cudf::detail::lists_column_device_view(*d_view1_ptr);
  auto d_view2_ptr = cudf::column_device_view::create(lv2.parent());
  auto d_lists2 = cudf::detail::lists_column_device_view(*d_view2_ptr);
  bool const are_offsets_equal =
    thrust::all_of(rmm::exec_policy(stream),
                   thrust::make_counting_iterator<cudf::size_type>(0),
                   thrust::make_counting_iterator<cudf::size_type>(row_count),
                   [d_lists1, d_lists2] __device__(cudf::size_type idx) {
                     auto ldv1 = cudf::list_device_view(d_lists1, idx);
                     auto ldv2 = cudf::list_device_view(d_lists2, idx);
                     return ldv1.is_null() || ldv2.is_null() || ldv1.size() == ldv2.size();
                   });
  if (not are_offsets_equal) {
    throw std::invalid_argument("input list lengths do not match for every row");
  }

  // allocate the vector of float results
  rmm::device_uvector<float> float_results(row_count, stream);

  // compute the cosine similarity
  auto const lv1_data = lv1.child().data<float>();
  auto const lv2_data = lv2.child().data<float>();
  auto const lv1_offsets = lv1.offsets().data<int32_t>();
  auto const lv2_offsets = lv2.offsets().data<int32_t>();
  thrust::transform(rmm::exec_policy(stream),
                    thrust::make_counting_iterator<cudf::size_type>(0),
                    thrust::make_counting_iterator<cudf::size_type>(row_count),
                    float_results.data(),
                    cosine_similarity_functor({lv1_data, lv2_data, lv1_offsets, lv2_offsets}));

  // the validity of the output is the bitwise-and of the two input validity masks
  auto [null_mask, null_count] = cudf::bitmask_and(cudf::table_view({lv1.parent(), lv2.parent()}));

  return std::make_unique<cudf::column>(cudf::data_type{cudf::type_id::FLOAT32},
                                        row_count,
                                        float_results.release(),
                                        std::move(null_mask),
                                        null_count);
}
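The per-row arithmetic inside cosine_similarity_functor can be checked on the host with a small reference. The sketch below is hypothetical (cosine_similarity_row is not part of the example repository) and uses std::vector in place of device pointers, but it follows the same offsets-based indexing: row r of a list column occupies child elements offsets[r] up to, but not including, offsets[r + 1].

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical host-side reference for one row of the functor above.
// v1 / v2 are the flattened child data; the offsets delimit each row.
float cosine_similarity_row(std::vector<float> const& v1,
                            std::vector<std::int32_t> const& v1_offsets,
                            std::vector<float> const& v2,
                            std::vector<std::int32_t> const& v2_offsets,
                            std::size_t row_idx) {
  assert(row_idx + 1 < v1_offsets.size());
  assert(row_idx + 1 < v2_offsets.size());
  std::int32_t const v1_start = v1_offsets[row_idx];
  std::int32_t const v1_count = v1_offsets[row_idx + 1] - v1_start;
  std::int32_t const v2_start = v2_offsets[row_idx];
  std::int32_t const v2_count = v2_offsets[row_idx + 1] - v2_start;
  std::int32_t const count = std::min(v1_count, v2_count);
  double mag1 = 0, mag2 = 0, dot = 0;
  for (std::int32_t i = 0; i < count; ++i) {
    float const f1 = v1[v1_start + i];
    float const f2 = v2[v2_start + i];
    mag1 += f1 * f1;
    mag2 += f2 * f2;
    dot += f1 * f2;
  }
  return static_cast<float>(dot / (std::sqrt(mag1) * std::sqrt(mag2)));
}
```

Note that an empty row or an all-zero vector evaluates 0 / 0, which yields NaN under IEEE 754 arithmetic. Neither this sketch nor the functor above handles that case specially.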
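Implementing a function that filters a list of strings
As noted above, filtering an Array[String] requires combining cudf::lists with cudf::strings. First we need to understand how a List is stored in the Apache Arrow layout. The libcudf C++ Developer Guide gives a nested example, List<List<List<int>>>: the actual integer values are flattened into a single column of ints, and each nesting level is delimited by its own offsets array and null mask.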
lists_column = { {{{1, 2}, {3, 4}}, NULL}, {{{10, 20}, {30, 40}}, {{50, 60, 70}, {0}}} }

List<List<List<int>>> (2 rows):
  Length : 2
  Offsets : 0, 2, 4
  Children :
    List<List<int>>:
      Length : 4
      Offsets : 0, 2, 2, 4, 6
      Null count: 1
        1101
      Children :
        List<int>:
          Length : 6
          Offsets : 0, 2, 4, 6, 8, 11, 12
          Children :
            Column of ints
            1, 2, 3, 4, 10, 20, 30, 40, 50, 60, 70, 0
So how is a List<String> represented in the Apache Arrow layout? The following example illustrates it:
lists_string = [["This", "is", "a"], ["column", "of", "strings"]]

List<String> (2 rows):
  Length : 2
  Children :
    Offsets : 0, 3, 6
    String:
      Length : 6
      Children :
        Offsets: 0, 4, 6, 7, 13, 15, 22
        Characters: Thisisacolumnofstrings
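As a way to check one's reading of this layout, the sketch below models it on the host and reconstructs a row from the two offsets arrays. ListOfStrings, get_row, and make_example are hypothetical names used only for illustration; real cudf columns live in device memory and also carry null masks, which are left out here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical host-side model of a List<String> column (no null masks).
struct ListOfStrings {
  std::vector<std::int32_t> list_offsets;    // row r owns strings [list_offsets[r], list_offsets[r+1])
  std::vector<std::int32_t> string_offsets;  // string s owns chars [string_offsets[s], string_offsets[s+1])
  std::string chars;                         // all characters, concatenated
};

// Rebuild one row by following the list offsets, then the string offsets.
std::vector<std::string> get_row(ListOfStrings const& col, std::size_t row) {
  assert(row + 1 < col.list_offsets.size());
  std::vector<std::string> out;
  for (std::int32_t s = col.list_offsets[row]; s < col.list_offsets[row + 1]; ++s) {
    std::int32_t const begin = col.string_offsets[s];
    std::int32_t const end = col.string_offsets[s + 1];
    out.push_back(col.chars.substr(begin, end - begin));
  }
  return out;
}

// The two-row example shown above.
ListOfStrings make_example() {
  return {{0, 3, 6}, {0, 4, 6, 7, 13, 15, 22}, "Thisisacolumnofstrings"};
}
```

Calling get_row(make_example(), 1) walks strings 3 through 5 and returns "column", "of", "strings".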
With this layout in mind, filtering a List<String> column down to the elements that start with test can be implemented with the following code:
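#include <cudf/column/column_factories.hpp>
#include <cudf/copying.hpp>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/lists/stream_compaction.hpp>
#include <cudf/null_mask.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/strings/find.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <stdexcept>
#include <string>

std::unique_ptr<cudf::column> filtering_startswith_test(cudf::lists_column_view const& lv) {
  // sanity-check the input types
  if (lv.child().type().id() != cudf::type_id::STRING) {
    throw std::invalid_argument("input is not a list of strings");
  }

  // sanity check the input shape
  auto const row_count = lv.size();
  if (row_count == 0) {
    // empty_like keeps the LIST<STRING> child structure of the input
    return cudf::empty_like(lv.parent());
  }
  if (lv.child().null_count() != 0) {
    throw std::invalid_argument("null strings are not supported");
  }

  auto const scv = cudf::strings_column_view(lv.child());
  auto const rs_start = cudf::string_scalar(std::string("test"));

  // build a lists-of-bools column that reuses the input offsets and validity,
  // with one bool per child string (true when the string starts with "test")
  auto const list_bools = cudf::make_lists_column(lv.size(),
                                                  std::make_unique<cudf::column>(lv.offsets()),
                                                  cudf::strings::starts_with(scv, rs_start),
                                                  lv.null_count(),
                                                  cudf::copy_bitmask(lv.parent()));

  // keep only the list elements whose mask entry is true
  return cudf::lists::apply_boolean_mask(lv, list_bools->view());
}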
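The function works in two steps: it computes one boolean per child string, then compacts each row and rebuilds the offsets. The host-only sketch below walks through the same two steps on a simplified model. FlatList and filter_starts_with are hypothetical names, the child is kept as a std::vector<std::string> rather than offsets plus characters, and nulls and sliced columns (offsets that do not start at 0) are not handled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical host-side model of a list column: offsets plus flattened child.
struct FlatList {
  std::vector<std::int32_t> offsets;
  std::vector<std::string> child;
};

FlatList filter_starts_with(FlatList const& in, std::string const& prefix) {
  assert(!in.offsets.empty() && in.offsets.front() == 0);
  assert(static_cast<std::size_t>(in.offsets.back()) == in.child.size());

  // Step 1: one bool per child element (the analogue of strings::starts_with)
  std::vector<bool> keep(in.child.size());
  for (std::size_t i = 0; i < in.child.size(); ++i) {
    keep[i] = in.child[i].compare(0, prefix.size(), prefix) == 0;
  }

  // Step 2: compact every row and rebuild the offsets
  // (the analogue of lists::apply_boolean_mask)
  FlatList out;
  out.offsets.push_back(0);
  for (std::size_t row = 0; row + 1 < in.offsets.size(); ++row) {
    for (std::int32_t s = in.offsets[row]; s < in.offsets[row + 1]; ++s) {
      if (keep[s]) out.child.push_back(in.child[s]);
    }
    out.offsets.push_back(static_cast<std::int32_t>(out.child.size()));
  }
  return out;
}
```

The row count is preserved: a row whose strings are all filtered out becomes an empty list rather than disappearing, which shows up as two equal consecutive offsets.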
As for structs, the following example from the libcudf C++ Developer Guide shows how a STRUCT column is stored in the Apache Arrow layout:
{
  type = STRUCT
  null_mask = [1, 1, 0, 1]
  null_count = 1
  children = {
    {
      type = FLOAT32
      data = [1.0, 4.0, X, 8.0]
      null_mask = [ 1, 1, 0, 1]
      null_count = 1
    },
    {
      type = INT32
      data = [2, 5, X, X]
      null_mask = [1, 1, 0, 0]
      null_count = 2
    }
  }
}
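One way to read this example is that validity is tracked at two levels: the struct row itself, and each field inside it. The sketch below models that on the host. StructOfFloatInt and its helpers are hypothetical names, std::vector<bool> stands in for the packed bitmasks, and the X placeholders are stored as 0.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical host-side model of the STRUCT<FLOAT32, INT32> column above.
struct StructOfFloatInt {
  std::vector<bool> valid;        // struct-level null mask
  std::vector<float> f;           // FLOAT32 child data
  std::vector<bool> f_valid;      // FLOAT32 child null mask
  std::vector<int> i;             // INT32 child data
  std::vector<bool> i_valid;      // INT32 child null mask
};

// Count the zero bits in a mask, i.e. the null_count.
std::size_t count_nulls(std::vector<bool> const& mask) {
  std::size_t n = 0;
  for (bool bit : mask) n += bit ? 0 : 1;
  return n;
}

// A field is readable only if both the struct row and the field are valid.
bool int_field_is_valid(StructOfFloatInt const& col, std::size_t row) {
  assert(row < col.valid.size());
  return col.valid[row] && col.i_valid[row];
}

StructOfFloatInt make_struct_example() {
  return {{true, true, false, true},
          {1.0f, 4.0f, 0.0f, 8.0f}, {true, true, false, true},
          {2, 5, 0, 0},             {true, true, false, false}};
}
```

In the example, row 2 is null at the struct level, while row 3 is a valid struct whose INT32 field is null. That is why the INT32 child reports a null_count of 2 even though the struct reports 1.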