[GPU] Native Rapids UDF – Create Custom UDF

上一篇我們成功實作了 HiveUDF,為了進一步利用 GPU 加速,我們需要實作 evaluateColumnar 這個函數。參考 Spark Rapids 的 GitHub 以及 ColumnView / ColumnVector 裡面的範例後,我們沒有找到適合的函數來實作針對 Array[String] 的過濾,所以需要自己實作 Tutorial 裡面所謂的 Native Code Examples。Tutorial 針對 HiveUDF 只提供了一個範例 StringWordCount,本篇紀錄如何以這個範例為基礎,實作一個支援 GPU 的 HiveUDF。

關於 cuDF C++ 你所需要知道的事情,簡介

libcudf C++ Developer Guide 

https://github.com/rapidsai/cudf/blob/branch-24.02/cpp/doxygen/developer_guide/DEVELOPER_GUIDE.md#variable-size-and-nested-data-types

UDF 架構

cudf 的 UDF 架構基本上就是輸入一個 column_view、輸出一個 std::unique_ptr<cudf::column>,以下的範例都是如此。下圖擷取自下方連結的文章,說明 cudf 針對不同型態的資料,要使用不同的 column_view 型別 (例如 strings_column_view、lists_column_view) 來操作。

Mastering String Transformation in RAPIDS libcudf

在下面這個範例中,我們觀察 cudf 是怎麼對 strings column 做轉換的。輸入是兩個 column_view,分別是 names 跟 visibilities;接著利用 cudf::string_scalar 定義兩個 scalar,分別是 visible 跟 redaction。cudf::strings::contains 會回傳一個新的 cudf::column,所以 allowed 是一個 boolean (BOOL8) 型態的 column,透過 allowed->view() 取得它的 column_view。再透過 cudf::copy_if_else 得到一個新的 cudf::column (redacted),它的 type 是 STRING。經過 cudf::strings::split 之後會回傳一個 cudf::table,第一個 column 是 first,第二個是 last。接著用 cudf::strings::slice_strings 取出 last 的第一個字元 (last_initial),再把 last_initial 與 first 組成一個 cudf::table_view。最後利用 cudf::strings::concatenate,以空白把這個 table_view 的各個 column 串接回一個 strings column。

/**
 * @brief Redact names whose visibility entry does not contain "public", then rewrite
 *        every name as "<first character of the last name> <first name>".
 *
 * Built entirely from libcudf APIs (no hand-written kernels). The whole computation is
 * enclosed in an NVTX range named "redact_strings".
 *
 * @param names        Strings column of names, one per row
 * @param visibilities Strings column holding the visibility label for each row of `names`
 * @return A new strings column with the redacted, reformatted names
 */
std::unique_ptr<cudf::column> redact_strings(cudf::column_view const& names,
                                             cudf::column_view const& visibilities)
{
  auto const public_label = cudf::string_scalar(std::string("public"));
  auto const masked_text  = cudf::string_scalar(std::string("X X"));

  nvtxRangePushA("redact_strings");

  // BOOL8 column: true where the visibility string contains "public".
  auto const is_public = cudf::strings::contains(visibilities, public_label);
  // Keep the name where is_public is true, otherwise substitute "X X".
  auto const masked = cudf::copy_if_else(names, masked_text, is_public->view());

  // Split with the default delimiter into a table whose columns are [first, last].
  auto const name_parts = cudf::strings::split(masked->view());
  auto const parts_view = name_parts->view();
  auto const initial    = cudf::strings::slice_strings(parts_view.column(1), 0, 1);

  // Join "<initial> <first>" row-wise, separated by a single space.
  auto result = cudf::strings::concatenate(
    cudf::table_view({initial->view(), parts_view.column(0)}), std::string(" "));

  // Wait for the GPU work to finish (presumably so the NVTX range covers it).
  cudaStreamSynchronize(0);

  nvtxRangePop();
  return result;
}

參考 string_word_count.cu / string_word_count.hpp

spark-rapids-examples 提供的範例與 NVIDIA 官網提供的範例不太一樣,主要有以下兩個差別:

  1. 使用 Thrust 函式庫在 GPU 上做平行計算 (例如 thrust::transform)
  2. 使用 RAPIDS Memory Manager (RMM) 管理 GPU 記憶體配置 (例如 rmm::device_buffer、rmm::exec_policy)
/*
 * Copyright (c) 2021-2022, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "string_word_count.hpp"

#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_device_view.cuh>
#include <cudf/strings/string_view.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/iterator/counting_iterator.h>
#include <thrust/transform.h>

namespace {

/**
 * @brief Count the whitespace-separated words in row `idx` of a strings column.
 *
 * Every character whose code point is <= ' ' counts as whitespace, so a run of such
 * characters is a single separator. A null row contributes 0.
 *
 * @param d_strings Device view of the strings column
 * @param idx       Row to inspect
 * @return Number of words in that row
 */
__device__ cudf::size_type count_words(cudf::column_device_view const& d_strings,
                                       cudf::size_type idx) {
  if (d_strings.is_null(idx)) { return 0; }

  auto const d_str = d_strings.element<cudf::string_view>(idx);
  cudf::size_type words = 0;
  bool in_word = false;
  for (auto it = d_str.begin(); it != d_str.end(); ++it) {
    bool const is_space = (*it <= ' ');
    // A word begins at each non-space character that follows whitespace (or the start).
    if (!is_space && !in_word) { ++words; }
    in_word = !is_space;
  }
  return words;
}

} // anonymous namespace

/**
 * @brief Count the words in each string of a STRING column, using whitespace as word boundaries
 *
 * Consecutive whitespace counts as a single separator. The output row is null wherever
 * the input row is null (the input null mask is copied). All work is issued on
 * `rmm::cuda_stream_default`.
 *
 * @param strs The STRING column whose rows are counted
 * @return The INT32 column containing the word count results per string
 * @throws std::invalid_argument if `strs` is non-empty and is not a STRING column
 */
std::unique_ptr<cudf::column> string_word_count(cudf::column_view const& strs) {
  auto strings_count = strs.size();
  if (strings_count == 0) {
    return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32});
  }
  // count_words() reads each row as a cudf::string_view, which is only valid for STRING columns.
  if (strs.type().id() != cudf::type_id::STRING) {
    throw std::invalid_argument("input is not a strings column");
  }

  // the validity of the output matches the validity of the input
  rmm::device_buffer null_mask = cudf::copy_bitmask(strs);

  // allocate the column that will contain the word count results
  std::unique_ptr<cudf::column> result =
    cudf::make_numeric_column(
      cudf::data_type{cudf::type_id::INT32},
      strs.size(),
      std::move(null_mask),
      strs.null_count());

  // compute the word counts, writing into the result column data buffer
  auto stream = rmm::cuda_stream_default;
  auto strs_device_view = cudf::column_device_view::create(strs, stream);
  auto d_strs_view = *strs_device_view;
  thrust::transform(
    rmm::exec_policy(stream),
    thrust::make_counting_iterator<cudf::size_type>(0),
    thrust::make_counting_iterator<cudf::size_type>(strings_count),
    result->mutable_view().data<cudf::size_type>(),
    [d_strs_view] __device__(cudf::size_type idx) { return count_words(d_strs_view, idx); });

  return result;
}

參考 cosine_similarity.cu / cosine_similarity.hpp

在計算內積 (inner product) 時,需要用到 Thrust 這個演算法函式庫,根據 NVIDIA 官網的介紹:

Thrust is a C++ template library for CUDA based on the Standard Template Library (STL). Thrust allows you to implement high performance parallel applications with minimal programming effort through a high-level interface that is fully interoperable with CUDA C.

https://docs.nvidia.com/cuda/thrust/index.html

在計算 cosine_similarity 的時候,用到了 thrust::all_of 與 thrust::transform 這兩個函式:前者用來檢查每一列兩個 list 的長度是否一致,後者在上面的 string_word_count 範例中也有用到,用來對每一列平行計算結果。

/*
 * Copyright (c) 2021-2022, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "cosine_similarity.hpp"

#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_device_view.cuh>
#include <cudf/lists/list_device_view.cuh>
#include <cudf/lists/lists_column_device_view.cuh>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/null_mask.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/iterator/counting_iterator.h>
#include <thrust/logical.h>
#include <thrust/transform.h>

#include <cmath>

namespace {

/**
 * @brief Functor for computing the cosine similarity between two list of float columns
 *
 * `v1`/`v2` point at the flattened FLOAT32 child data and `v1_offsets`/`v2_offsets` at the
 * per-row offsets, so row r of input k covers elements [vk_offsets[r], vk_offsets[r + 1]).
 * Only the first min(len1, len2) elements of a row are used. A row where either vector has
 * zero magnitude yields 0/0 (NaN).
 */
struct cosine_similarity_functor {
  float const* const v1;
  float const* const v2;
  int32_t const* const v1_offsets;
  int32_t const* const v2_offsets;

  // This kernel executes thread-per-row which should be fine for relatively short lists
  // but may need to be revisited for performance if operating on long lists.
  __device__ float operator()(cudf::size_type row_idx) const {
    auto const v1_start_idx = v1_offsets[row_idx];
    auto const v1_num_elems = v1_offsets[row_idx + 1] - v1_start_idx;
    auto const v2_start_idx = v2_offsets[row_idx];
    auto const v2_num_elems = v2_offsets[row_idx + 1] - v2_start_idx;
    // Plain conditional instead of host std::min, which needs relaxed-constexpr on device.
    auto const num_elems = (v1_num_elems < v2_num_elems) ? v1_num_elems : v2_num_elems;
    double mag1 = 0;
    double mag2 = 0;
    double dot_product = 0;
    for (auto i = 0; i < num_elems; i++) {
      // Widen to double *before* multiplying: a float product overflows to inf for
      // |f| > ~1.8e19 and loses precision, defeating the double accumulators.
      double const f1 = v1[v1_start_idx + i];
      double const f2 = v2[v2_start_idx + i];
      mag1 += f1 * f1;
      mag2 += f2 * f2;
      dot_product += f1 * f2;
    }
    mag1 = std::sqrt(mag1);
    mag2 = std::sqrt(mag2);
    return static_cast<float>(dot_product / (mag1 * mag2));
  }
};

} // anonymous namespace

/**
 * @brief Compute the cosine similarity between two LIST of FLOAT32 columns
 *
 * The input vectors must have matching shapes, i.e.: same row count and same number of
 * list elements per row. A null list row is supported, but null float entries within a
 * list are not supported. Sliced list column views (non-zero offset) are supported.
 *
 * @param lv1 The first LIST of FLOAT32 column view
 * @param lv2 The second LIST of FLOAT32 column view
 * @return A FLOAT32 column containing the cosine similarity corresponding to each input row;
 *         an output row is null if the row is null in either input
 * @throws std::invalid_argument if the inputs are not both LIST<FLOAT32>, their row counts
 *         differ, a child contains null floats, or a non-null row pair has different lengths
 */
std::unique_ptr<cudf::column> cosine_similarity(cudf::lists_column_view const& lv1,
                                                cudf::lists_column_view const& lv2) {
  // sanity-check the input types
  if (lv1.child().type().id() != lv2.child().type().id() ||
      lv1.child().type().id() != cudf::type_id::FLOAT32) {
    throw std::invalid_argument("inputs are not lists of floats");
  }

  // sanity check the input shape
  auto const row_count = lv1.size();
  if (row_count != lv2.size()) {
    throw std::invalid_argument("input row counts do not match");
  }
  if (row_count == 0) {
    return cudf::make_empty_column(cudf::data_type{cudf::type_id::FLOAT32});
  }
  if (lv1.child().null_count() != 0 || lv2.child().null_count() != 0) {
    throw std::invalid_argument("null floats are not supported");
  }

  auto const stream = rmm::cuda_stream_default;
  // Create the device views on the same stream that the Thrust calls below use.
  auto d_view1_ptr = cudf::column_device_view::create(lv1.parent(), stream);
  auto d_lists1 = cudf::detail::lists_column_device_view(*d_view1_ptr);
  auto d_view2_ptr = cudf::column_device_view::create(lv2.parent(), stream);
  auto d_lists2 = cudf::detail::lists_column_device_view(*d_view2_ptr);
  bool const are_offsets_equal =
    thrust::all_of(rmm::exec_policy(stream),
                   thrust::make_counting_iterator<cudf::size_type>(0),
                   thrust::make_counting_iterator<cudf::size_type>(row_count),
                   [d_lists1, d_lists2] __device__(cudf::size_type idx) {
                     auto ldv1 = cudf::list_device_view(d_lists1, idx);
                     auto ldv2 = cudf::list_device_view(d_lists2, idx);
                     return ldv1.is_null() || ldv2.is_null() || ldv1.size() == ldv2.size();
                   });
  if (not are_offsets_equal) {
    throw std::invalid_argument("input list lengths do not match for every row");
  }

  // allocate the vector of float results
  rmm::device_uvector<float> float_results(row_count, stream);

  // compute the cosine similarity
  auto const lv1_data = lv1.child().data<float>();
  auto const lv2_data = lv2.child().data<float>();
  // offsets() is the unsliced offsets child; advance it by the parent's offset so that
  // row 0 of a sliced view maps to the correct list (the functor indexes by row number).
  auto const lv1_offsets = lv1.offsets().data<int32_t>() + lv1.offset();
  auto const lv2_offsets = lv2.offsets().data<int32_t>() + lv2.offset();
  thrust::transform(rmm::exec_policy(stream),
                    thrust::make_counting_iterator<cudf::size_type>(0),
                    thrust::make_counting_iterator<cudf::size_type>(row_count),
                    float_results.data(),
                    cosine_similarity_functor({lv1_data, lv2_data, lv1_offsets, lv2_offsets}));

  // the validity of the output is the bitwise-and of the two input validity masks
  auto [null_mask, null_count] = cudf::bitmask_and(cudf::table_view({lv1.parent(), lv2.parent()}));

  return std::make_unique<cudf::column>(cudf::data_type{cudf::type_id::FLOAT32},
                                        row_count,
                                        float_results.release(),
                                        std::move(null_mask),
                                        null_count);
}

實作 Filtering List of String 的函式

上面提到,要實作針對 Array[String] 的過濾,需要同時用到 cudf 裡面的 cudf::lists 與 cudf::strings。首先要了解 List 在 Apache Arrow 裡面是怎麼儲存的。libcudf C++ Developer Guide 提供了一個較複雜的範例 List<List<List<int>>>:所有真正的整數值會被攤平成最內層的一個連續整數 column,再依靠每一層的 offsets 與 null mask (validity bitmask) 來區分不同層的 List。

lists_column = { {{{1, 2}, {3, 4}}, NULL}, {{{10, 20}, {30, 40}}, {{50, 60, 70}, {0}}} }

   List<List<List<int>>>  (2 rows):
   Length : 2
   Offsets : 0, 2, 4
   Children :
      List<List<int>>:
      Length : 4
      Offsets : 0, 2, 2, 4, 6
      Null count: 1
        1101
      Children :
        List<int>:
        Length : 6
        Offsets : 0, 2, 4, 6, 8, 11, 12
        Children :
          Column of ints
          1, 2, 3, 4, 10, 20, 30, 40, 50, 60, 70, 0

那麼在 Apache Arrow 裡面是怎麼表示 List<String> 的呢?我們利用以下的例子做說明:

lists_string = [["This", "is", "a"], ["column", "of", "strings"]]

   List<String> (2 rows):
   Length : 2
   Children :
      Offsets : 0, 3, 6
      String:
      Length : 6
      Children : 
      	 Offsets: 0, 4, 6, 7, 13, 15, 22
         Characters: Thisisacolumnofstrings

因此,若要在每一列的 List<String> 中只保留以 "test" 開頭的字串,可以使用以下的程式碼:

/**
 * @brief Within every row of a LIST<STRING> column, keep only the strings that start with "test".
 *
 * A LIST<BOOL8> mask is built that shares `lv`'s offsets and row null mask, with one
 * starts_with("test") flag per string, and is passed to cudf::lists::apply_boolean_mask.
 *
 * @param lv LIST<STRING> column to filter. Must not be sliced (offset() == 0) and must not
 *           contain null strings.
 * @return A new LIST<STRING> column holding only the matching strings of each row
 * @throws std::invalid_argument if the child is not STRING, `lv` is a sliced view, or the
 *         child contains null strings
 */
std::unique_ptr<cudf::column> filtering_startswith_test(cudf::lists_column_view const& lv) {
  // sanity-check the input types
  if (lv.child().type().id() != cudf::type_id::STRING) {
    throw std::invalid_argument("inputs are not lists of string");
  }

  // sanity check the input shape
  auto const row_count = lv.size();
  if (row_count == 0) {
    // make_empty_column() cannot build a nested LIST column (it has no offsets/child);
    // empty_like() (<cudf/copying.hpp>) returns an empty LIST<STRING> with proper children.
    return cudf::empty_like(lv.parent());
  }
  if (lv.offset() != 0) {
    // offsets() and child() expose the unsliced children, so for a sliced view the mask
    // built below would not line up with the rows of `lv`.
    throw std::invalid_argument("sliced lists columns are not supported");
  }
  if (lv.child().null_count() != 0) {
    throw std::invalid_argument("null string are not supported");
  }

  auto const scv = cudf::strings_column_view(lv.child());
  auto const rs_start = cudf::string_scalar(std::string("test"));
  // build lists of bools column with exactly the same row structure as `lv`
  auto const list_bools = cudf::make_lists_column(lv.size(),
                                                  std::make_unique<cudf::column>(lv.offsets()),
                                                  cudf::strings::starts_with(scv, rs_start),
                                                  lv.null_count(),
                                                  cudf::copy_bitmask(lv.parent()));
  return cudf::lists::apply_boolean_mask(lv, list_bools->view());
}
                                                  

另外,關於 Struct 在 Apache Arrow 中的儲存方式,以下是 libcudf C++ Developer Guide 提供的範例:

{
  type = STRUCT
  null_mask = [1, 1, 0, 1]
  null_count = 1
  children = {
    {
      type = FLOAT32
      data =       [1.0, 4.0, X, 8.0]
      null_mask  = [  1,   1, 0,   1]
      null_count = 1
    },
    {
      type = INT32
      data =       [2, 5, X, X]
      null_mask  = [1, 1, 0, 0]
      null_count = 2
    }
  }
}