diff --git a/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/CMakeLists.txt b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/CMakeLists.txt index e4c6de32e..e11c95053 100755 --- a/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/CMakeLists.txt +++ b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/CMakeLists.txt @@ -141,6 +141,9 @@ link_directories("${CMAKE_CUDA_IMPLICIT_LINK_DIRECTORIES}" # - library targets ------------------------------------------------------------------------------- set(SOURCE_FILES + "src/integer_average.cu" + "src/integer_average_host_udf.cpp" + "src/IntegerAverageJni.cpp" "src/CosineSimilarityJni.cpp" "src/StringWordCountJni.cpp" "src/cosine_similarity.cu" diff --git a/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/IntegerAverageJni.cpp b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/IntegerAverageJni.cpp new file mode 100644 index 000000000..9ecef0053 --- /dev/null +++ b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/IntegerAverageJni.cpp @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "integer_average.hpp" +#include "integer_average_host_udf.hpp" + +#include <jni.h> + +namespace { +/** + * @brief Throw a Java exception + * + * @param env The Java environment + * @param msg The message string to associate with the exception + */ +void throw_java_exception(JNIEnv* env, char const* msg) { + jclass exp_class = env->FindClass("java/lang/RuntimeException"); + if (exp_class != NULL) { + env->ThrowNew(exp_class, msg); + } +} + +} // anonymous namespace + +extern "C" { + +JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_udaf_java_AverageExampleUDF_createAverageHostUDF( + JNIEnv* env, jclass, jint agg_type) +{ + try { + auto udf_ptr = [&] { + // The value of agg_type must be sync with + // `AverageExampleUDF.java#AggregationType`. + switch (agg_type) { + case 0: return examples::create_average_example_reduction_host_udf(); + case 1: return examples::create_average_example_reduction_merge_host_udf(); + case 2: return examples::create_average_example_groupby_host_udf(); + case 3: return examples::create_average_example_groupby_merge_host_udf(); + default: CUDF_FAIL("Invalid aggregation type."); + } + }(); + CUDF_EXPECTS(udf_ptr != nullptr, "Invalid AverageExample UDF instance."); + return reinterpret_cast<jlong>(udf_ptr); + } catch (std::bad_alloc const& e) { + auto msg = std::string("Unable to allocate native memory: ") + + (e.what() == nullptr ? "" : e.what()); + throw_java_exception(env, msg.c_str()); + } catch (std::exception const& e) { + auto msg = e.what() == nullptr ? 
"" : e.what(); + throw_java_exception(env, msg); + } + return 0; +} + +JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_udaf_java_AverageExampleUDF_computeAvg( + JNIEnv* env, jclass, jlong input) +{ + if (input == 0) { + throw_java_exception(env, "input column is null"); + } else { + try { + auto const input_view = reinterpret_cast<cudf::column_view const*>(input); + return reinterpret_cast<jlong>(examples::compute_average_example(*input_view).release()); + } catch (std::bad_alloc const& e) { + auto msg = std::string("Unable to allocate native memory: ") + + (e.what() == nullptr ? "" : e.what()); + throw_java_exception(env, msg.c_str()); + } catch (std::exception const& e) { + auto msg = e.what() == nullptr ? "" : e.what(); + throw_java_exception(env, msg); + } + } + return 0; +} + +} // extern "C" \ No newline at end of file diff --git a/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/integer_average.cu b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/integer_average.cu new file mode 100644 index 000000000..b45c02ae4 --- /dev/null +++ b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/integer_average.cu @@ -0,0 +1,396 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "integer_average.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +namespace examples { +namespace detail { +namespace { + +struct group_fn { + // inputs + int const* key_offsets_ptr; + cudf::column_device_view d_data; + + // outputs + int* sum_output; + bool* sum_validity; // false indicates all values in a group are null(sum is null) + int* count_output; + + __device__ void operator()(cudf::size_type group_idx) const + { + bool is_sum_valid = false; + int sum = 0; + int count = 0; + for (auto i = key_offsets_ptr[group_idx]; i < key_offsets_ptr[group_idx + 1]; ++i) { + if (d_data.is_valid(i)) { + is_sum_valid = true; + sum += d_data.element(i); + ++count; + } + } + + sum_output[group_idx] = sum; + sum_validity[group_idx] = is_sum_valid; + count_output[group_idx] = count; + } +}; + +struct group_merge_fn { + // inputs + int const* key_offsets_ptr; + cudf::column_device_view d_input_sum; + int const* input_count_ptr; + + // outputs + int* sum_output; + bool* sum_validity; // false indicates all values in a group are null(sum is null) + int* count_output; + + __device__ void operator()(cudf::size_type group_idx) const + { + bool is_sum_valid = false; + int sum = 0; + int count = 0; + for (auto i = key_offsets_ptr[group_idx]; i < key_offsets_ptr[group_idx + 1]; ++i) { + if (d_input_sum.is_valid(i)) { + is_sum_valid = true; + sum += d_input_sum.element(i); + count += input_count_ptr[i]; + } + } + + sum_output[group_idx] = sum; + sum_validity[group_idx] = is_sum_valid; + count_output[group_idx] = count; + } +}; + +} // anonymous namespace + +std::unique_ptr group_avg(cudf::column_view const& grouped_data, + cudf::column_view const& key_offsets, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto num_groups = key_offsets.size() - 1; + auto const d_data = cudf::column_device_view::create(grouped_data, stream); + + // create output 
columns: sum and count + auto sum_col = cudf::make_numeric_column( + cudf::data_type{cudf::type_id::INT32}, num_groups, cudf::mask_state::UNALLOCATED, stream, mr); + rmm::device_uvector sum_validity(num_groups, stream); + auto count_col = cudf::make_numeric_column( + cudf::data_type{cudf::type_id::INT32}, num_groups, cudf::mask_state::UNALLOCATED, stream, mr); + + // merge sum and count for each group + thrust::for_each_n(rmm::exec_policy_nosync(stream), + thrust::make_counting_iterator(0), + num_groups, + group_fn{key_offsets.begin(), + *d_data, + sum_col->mutable_view().begin(), + sum_validity.begin(), + count_col->mutable_view().begin()}); + + // set nulls for sum column + auto [sum_null_mask, sum_null_count] = cudf::detail::valid_if( + sum_validity.begin(), sum_validity.end(), cuda::std::identity{}, stream, mr); + if (sum_null_count > 0) { sum_col->set_null_mask(std::move(sum_null_mask), sum_null_count); } + + // make struct column with (sum, count) + std::vector> children; + children.push_back(std::move(sum_col)); + children.push_back(std::move(count_col)); + return cudf::make_structs_column(num_groups, + std::move(children), + 0, // null count + rmm::device_buffer{}, // null mask + stream); +} + +std::unique_ptr group_merge_avg(cudf::column_view const& grouped_data, + cudf::column_view const& key_offsets, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto num_groups = key_offsets.size() - 1; + + // inputs + cudf::structs_column_view scv(grouped_data); + auto input_sum_cv = scv.get_sliced_child(0, stream); + auto input_count_cv = scv.get_sliced_child(1, stream); + auto d_input_sum = cudf::column_device_view::create(input_sum_cv, stream); + int32_t const* input_count_ptr = input_count_cv.begin(); + + // create output columns: sum and count + auto output_sum_col = cudf::make_numeric_column( + cudf::data_type{cudf::type_id::INT32}, num_groups, cudf::mask_state::UNALLOCATED, stream, mr); + rmm::device_uvector sum_validity(num_groups, 
stream); + auto output_count_col = cudf::make_numeric_column( + cudf::data_type{cudf::type_id::INT32}, num_groups, cudf::mask_state::UNALLOCATED, stream, mr); + + // merge sum and count for each group + thrust::for_each_n(rmm::exec_policy_nosync(stream), + thrust::make_counting_iterator(0), + num_groups, + group_merge_fn{key_offsets.begin(), + *d_input_sum, + input_count_ptr, + output_sum_col->mutable_view().begin(), + sum_validity.begin(), + output_count_col->mutable_view().data()}); + + // set nulls for sum column + auto [sum_null_mask, sum_null_count] = cudf::detail::valid_if( + sum_validity.begin(), sum_validity.end(), cuda::std::identity{}, stream, mr); + if (sum_null_count > 0) { + output_sum_col->set_null_mask(std::move(sum_null_mask), sum_null_count); + } + + // make struct column with (sum, count) + std::vector> children; + children.push_back(std::move(output_sum_col)); + children.push_back(std::move(output_count_col)); + return cudf::make_structs_column(num_groups, + std::move(children), + 0, // null count + rmm::device_buffer{}, // null mask + stream); +} + +std::unique_ptr reduce_avg(cudf::column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + // 1. compute count + int count = input.size() - input.null_count(); + bool is_sum_valid = count > 0; + + // 2. compute sum, if element is null, treat it as 0 + auto d_input = cudf::column_device_view::create(input, stream); + auto const element_itr = cudf::detail::make_counting_transform_iterator( + 0, + cuda::proclaim_return_type( + [d_input = *d_input] __device__(cudf::size_type idx) -> cudf::size_type { + if (d_input.is_valid(idx)) { + return d_input.element(idx); + } else { + // null element, treat it as 0 + return 0; + } + })); + int sum = + thrust::reduce(rmm::exec_policy_nosync(stream), element_itr, element_itr + input.size()); + + // 2. 
create output columns: sum and count + auto num_long_cols = 2; + auto const results_iter = cudf::detail::make_counting_transform_iterator(0, [&](int i) { + return cudf::make_numeric_column(cudf::data_type{cudf::type_id::INT32}, + 1, /** scalar has only 1 element */ + cudf::mask_state::UNALLOCATED, + stream, + mr); + }); + auto children = + std::vector>(results_iter, results_iter + num_long_cols); + int32_t* output_sum = children[0]->mutable_view().data(); + rmm::device_uvector sum_validity(1, stream); + int32_t* output_count = children[1]->mutable_view().data(); + thrust::fill( + rmm::exec_policy_nosync(stream), sum_validity.begin(), sum_validity.end(), is_sum_valid); + thrust::fill(rmm::exec_policy_nosync(stream), output_count, output_count + 1, count); + thrust::fill(rmm::exec_policy_nosync(stream), output_sum, output_sum + 1, sum); + + // 3. set null for sum + auto [sum_null_mask, sum_null_count] = cudf::detail::valid_if( + sum_validity.begin(), sum_validity.end(), cuda::std::identity{}, stream, mr); + if (sum_null_count > 0) { children[0]->set_null_mask(std::move(sum_null_mask), sum_null_count); } + + // 4. create struct scalar + return std::make_unique(cudf::table{std::move(children)}, true, stream, mr); +} + +std::unique_ptr reduce_merge_avg(cudf::column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + // inputs + cudf::structs_column_view scv(input); + auto input_sum_cv = scv.get_sliced_child(0, stream); + auto input_count_cv = scv.get_sliced_child(1, stream); + + // 1. compute sum and count + bool is_sum_valid = input_sum_cv.size() > input_sum_cv.null_count(); + int sum = thrust::reduce( + rmm::exec_policy_nosync(stream), input_sum_cv.begin(), input_sum_cv.end()); + int count = thrust::reduce( + rmm::exec_policy_nosync(stream), input_count_cv.begin(), input_count_cv.end()); + + // 2. 
create output columns: sum and count + auto num_long_cols = 2; + auto const results_iter = cudf::detail::make_counting_transform_iterator(0, [&](int i) { + return cudf::make_numeric_column(cudf::data_type{cudf::type_id::INT32}, + 1 /** scalar has only 1 element */, + cudf::mask_state::UNALLOCATED, + stream, + mr); + }); + auto children = + std::vector>(results_iter, results_iter + num_long_cols); + int32_t* output_sum = children[0]->mutable_view().data(); + rmm::device_uvector sum_validity(1, stream); + int32_t* output_count = children[1]->mutable_view().data(); + thrust::fill( + rmm::exec_policy_nosync(stream), sum_validity.begin(), sum_validity.end(), is_sum_valid); + thrust::fill(rmm::exec_policy_nosync(stream), output_count, output_count + 1, count); + thrust::fill(rmm::exec_policy_nosync(stream), output_sum, output_sum + 1, sum); + + // 3. set null for sum + auto [sum_null_mask, sum_null_count] = cudf::detail::valid_if( + sum_validity.begin(), sum_validity.end(), cuda::std::identity{}, stream, mr); + if (sum_null_count > 0) { children[0]->set_null_mask(std::move(sum_null_mask), sum_null_count); } + + // 4. create struct scalar + return std::make_unique(cudf::table{std::move(children)}, true, stream, mr); +} + +/** + * input is a struct column: struct(sum, count). + * sum column may have nulls, count column has no nulls. + * output is a int column: avg. + */ +std::unique_ptr compute_average_example(cudf::column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + cudf::structs_column_view scv(input); + auto sum_cv = scv.get_sliced_child(0, stream); + auto count_cv = scv.get_sliced_child(1, stream); + auto d_sum = cudf::column_device_view::create(sum_cv, stream); + int32_t const* count_ptr = count_cv.begin(); + + // if sum is null, result is null. Use sum's null mask for result. 
+ auto result = cudf::make_fixed_width_column(cudf::data_type{cudf::type_id::INT32}, + input.size(), + cudf::detail::copy_bitmask(sum_cv, stream, mr), + input.null_count(), + stream, + mr); + + thrust::tabulate(rmm::exec_policy_nosync(stream), + result->mutable_view().begin(), + result->mutable_view().end(), + [d_sum = *d_sum, count_ptr] __device__(cudf::size_type idx) { + if (d_sum.is_null(idx)) { + // sum is null + return 0; + } + return d_sum.element(idx) / count_ptr[idx]; + }); + return result; +} + +} // namespace detail + +/** + * @brief Aggregate sum and count for each group. + * Input is a int column. + * Output is a struct column: struct(sum, avg), + * the num of rows is equal to num of groups. + */ +std::unique_ptr group_average_example( + cudf::column_view const& input, + cudf::device_span group_offsets, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::group_avg(input, group_offsets, stream, mr); +} + +/** + * @brief Merge sums and counts in the same group. + * Input is a struct column: struct(sum, avg). + * Output is a struct column: struct(sum, avg), the num of rows is equal to num of groups. + */ +std::unique_ptr group_merge_average_example( + cudf::column_view const& input, + cudf::device_span group_offsets, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::group_merge_avg(input, group_offsets, stream, mr); +} + +/** + * @brief Aggregate sum and count for all input rows. + * Input is a int column. + * Output is a struct scalar: struct(sum, avg). + */ +std::unique_ptr reduce_average_example(cudf::column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::reduce_avg(input, stream, mr); +} + +/** + * @brief Merge all sum and count into one sum and count + * Input is a struct column: struct(sum, avg). + * Output is a struct scalar: struct(sum, avg). 
+ */ +std::unique_ptr reduce_merge_average_example(cudf::column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::reduce_merge_avg(input, stream, mr); +} + +/** + * @brief Compute average from sum and count. + * Input is a struct column: struct(sum, avg). + * Output is a int column. + * This is the final step for both group and reduction average example. + */ +std::unique_ptr compute_average_example(cudf::column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::compute_average_example(input, stream, mr); +} + +} // namespace examples \ No newline at end of file diff --git a/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/integer_average.hpp b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/integer_average.hpp new file mode 100644 index 000000000..08939d918 --- /dev/null +++ b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/integer_average.hpp @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include + +namespace examples { +/** + * @brief Aggregate sum and count for each group. + * Input is a int column. + * Output is a struct column: struct(sum, avg), + * the num of rows is equal to num of groups. 
+ */ +std::unique_ptr group_average_example( + cudf::column_view const& input, + cudf::device_span group_offsets, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); + +/** + * @brief Merge sums and counts in the same group. + * Input is a struct column: struct(sum, avg). + * Output is a struct column: struct(sum, avg), the num of rows is equal to num of groups. + */ +std::unique_ptr group_merge_average_example( + cudf::column_view const& input, + cudf::device_span group_offsets, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); + +/** + * @brief Aggregate sum and count for all input rows. + * Input is a int column. + * Output is a struct scalar: struct(sum, avg). + */ +std::unique_ptr reduce_average_example( + cudf::column_view const& input, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); + +/** + * @brief Merge all sum and count into one sum and count + * Input is a struct column: struct(sum, avg). + * Output is a struct scalar: struct(sum, avg). + */ +std::unique_ptr reduce_merge_average_example( + cudf::column_view const& input, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); + +/** + * @brief Compute average from sum and count. + * Input is a struct column: struct(sum, avg). + * Output is a int column. + * This is the final step for both group and reduction average example. 
+ */ +std::unique_ptr compute_average_example( + cudf::column_view const& input, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); +} // namespace examples \ No newline at end of file diff --git a/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/integer_average_host_udf.cpp b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/integer_average_host_udf.cpp new file mode 100644 index 000000000..e3a2daf49 --- /dev/null +++ b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/integer_average_host_udf.cpp @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "integer_average_host_udf.hpp" +#include "integer_average.hpp" + +#include +#include + +namespace examples { + +namespace { + +struct average_example_groupby_udf : cudf::groupby_host_udf { + average_example_groupby_udf(bool is_merge_) : is_merge(is_merge_) {} + + /** + * @brief Perform the main groupby computation for average_example UDF. 
+ */ + [[nodiscard]] std::unique_ptr operator()( + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override + { + auto const group_values = get_grouped_values(); + if (group_values.size() == 0) { return get_empty_output(stream, mr); } + auto const group_offsets = get_group_offsets(); + if (is_merge) { + // input is a struct column: struct(sum, avg) + return examples::group_merge_average_example(group_values, group_offsets, stream, mr); + } else { + // input is a int column + return examples::group_average_example(group_values, group_offsets, stream, mr); + } + } + + /** + * @brief Create an empty column when the input is empty for groupby UDF. + * Create a struct(sum, count) column with zero rows. + */ + [[nodiscard]] std::unique_ptr get_empty_output( + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override + { + int num_long_cols = 2; // sum and count + auto const results_iter = cudf::detail::make_counting_transform_iterator( + 0, [&](int i) { return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32}); }); + auto children = + std::vector>(results_iter, results_iter + num_long_cols); + return cudf::make_structs_column(0, + std::move(children), + 0, // null count + rmm::device_buffer{}, // null mask + stream, + mr); + } + + [[nodiscard]] bool is_equal(cudf::host_udf_base const& other) const override + { + auto o = dynamic_cast(&other); + return o != nullptr && o->is_merge == this->is_merge; + } + + [[nodiscard]] std::size_t do_hash() const override + { + return 31 * std::hash{}({"average_example_groupby_udf"}) + is_merge; + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(is_merge); + } + + private: + bool is_merge; +}; + +struct average_example_reduction_udf : cudf::reduce_host_udf { + average_example_reduction_udf(bool is_merge_) : is_merge(is_merge_) {} + + /** + * @brief Create an empty scalar when the input is empty. 
+ */ + std::unique_ptr get_empty_scalar(rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const + { + int num_long_cols = 2; // sum and count + auto const results_iter = cudf::detail::make_counting_transform_iterator( + 0, [&](int i) { return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32}); }); + auto children = + std::vector>(results_iter, results_iter + num_long_cols); + return std::make_unique( + cudf::table{std::move(children)}, true, stream, mr); + } + + /** + * @brief Perform the main reduce computation for average_example UDF. + */ + std::unique_ptr operator()( + cudf::column_view const& input, + cudf::data_type, /** output_dtype is useless */ + std::optional>, /** init is useless */ + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override + { + if (input.size() == 0) { return get_empty_scalar(stream, mr); } + if (is_merge) { + // input is a struct column: struct(sum, avg) + return examples::reduce_merge_average_example(input, stream, mr); + } else { + // intput is a int column + return examples::reduce_average_example(input, stream, mr); + } + } + + [[nodiscard]] bool is_equal(cudf::host_udf_base const& other) const override + { + auto o = dynamic_cast(&other); + return o != nullptr && o->is_merge == this->is_merge; + } + + [[nodiscard]] std::size_t do_hash() const override + { + return 31 * (31 * std::hash{}({"average_example_reduction_udf"}) + is_merge); + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(is_merge); + } + + private: + bool is_merge; +}; + +} // namespace + +cudf::host_udf_base* create_average_example_reduction_host_udf() +{ + return new average_example_reduction_udf(/*is_merge*/ false); +} + +cudf::host_udf_base* create_average_example_reduction_merge_host_udf() +{ + return new average_example_reduction_udf(/*is_merge*/ true); +} + +cudf::host_udf_base* create_average_example_groupby_host_udf() +{ + return new 
average_example_groupby_udf(/*is_merge*/ false); +} + +cudf::host_udf_base* create_average_example_groupby_merge_host_udf() +{ + return new average_example_groupby_udf(/*is_merge*/ true); +} + +} // namespace examples \ No newline at end of file diff --git a/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/integer_average_host_udf.hpp b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/integer_average_host_udf.hpp new file mode 100644 index 000000000..b849c59b9 --- /dev/null +++ b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/src/integer_average_host_udf.hpp @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include <cudf/aggregation/host_udf.hpp> + +namespace examples { + +cudf::host_udf_base* create_average_example_reduction_host_udf(); + +cudf::host_udf_base* create_average_example_reduction_merge_host_udf(); + +cudf::host_udf_base* create_average_example_groupby_host_udf(); + +cudf::host_udf_base* create_average_example_groupby_merge_host_udf(); + +} // namespace examples diff --git a/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/java/com/nvidia/spark/rapids/udaf/java/IntegerAverage.java b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/java/com/nvidia/spark/rapids/udaf/java/IntegerAverage.java new file mode 100644 index 000000000..4af37152c --- /dev/null +++ b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/java/com/nvidia/spark/rapids/udaf/java/IntegerAverage.java @@ -0,0 +1,249 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.udaf.java; + +import ai.rapids.cudf.ColumnVector; +import ai.rapids.cudf.ColumnView; +import ai.rapids.cudf.DType; +import ai.rapids.cudf.GroupByAggregation; +import ai.rapids.cudf.GroupByAggregationOnColumn; +import ai.rapids.cudf.HostUDFWrapper; +import ai.rapids.cudf.ReductionAggregation; +import ai.rapids.cudf.Scalar; +import com.nvidia.spark.RapidsSimpleGroupByAggregation; +import com.nvidia.spark.RapidsUDAF; +import com.nvidia.spark.RapidsUDAFGroupByAggregation; +import com.nvidia.spark.rapids.udf.java.NativeUDFExamplesLoader; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.expressions.MutableAggregationBuffer; +import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.IntegerType$; +import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.Objects; + +/** + * A user-defined aggregate function (UDAF) that computes the average value + * of integers. + * This class demonstrates how to implement a UDAF that also provides a + * RAPIDS implementation that can run on the GPU when the query is executed + * with the RAPIDS Accelerator for Apache Spark. 
+ */
+@SuppressWarnings("deprecation")
+public class IntegerAverage extends UserDefinedAggregateFunction implements RapidsUDAF { // CPU UDAF (deprecated Spark API) paired with a RAPIDS GPU implementation of integer average
+  /** Row-by-row implementation that executes on the CPU */
+  @Override
+  public StructType inputSchema() {
+    // Integer values as the input
+    return StructType.fromDDL("input INTEGER");
+  }
+
+  @Override
+  public StructType bufferSchema() {
+    // two long buffers: the running sum and the count of non-null inputs
+    return StructType.fromDDL("sum BIGINT, count BIGINT");
+  }
+
+  @Override
+  public DataType dataType() {
+    // integer as the result type
+    return IntegerType$.MODULE$;
+  }
+
+  @Override
+  public boolean deterministic() {
+    return true;
+  }
+
+  @Override
+  public void initialize(MutableAggregationBuffer buffer) {
+    buffer.update(0, null); // sum starts as null until the first non-null input arrives
+    buffer.update(1, 0L); // count
+  }
+
+  @Override
+  public void update(MutableAggregationBuffer buffer, Row input) {
+    if (!input.isNullAt(0)) {
+      long accSum = input.getInt(0);
+      if(!buffer.isNullAt(0)) {
+        accSum += buffer.getLong(0);
+      }
+      buffer.update(0, accSum); // sum field in buffer
+      buffer.update(1, buffer.getLong(1) + 1L); // count field in buffer
+    } // ignore nulls
+  }
+
+  @Override
+  public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
+    if (!buffer2.isNullAt(0)) {
+      long accMergedSum = buffer2.getLong(0);
+      if (!buffer1.isNullAt(0)) {
+        accMergedSum += buffer1.getLong(0);
+      }
+      buffer1.update(0, accMergedSum); // sum
+    } // else{} // NOOP, "buffer2[0]" is null so "buffer1" holds the correct value already
+    buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1)); // count
+  }
+
+  @Override
+  public Object evaluate(Row buffer) { // final result: sum / count using integer division; null when no non-null inputs were seen
+    long count = buffer.getLong(1);
+    if (count == 0) {
+      return null;
+    } else {
+      return (int)(buffer.getLong(0) / count);
+    }
+  }
+
+  /** Columnar implementation that runs on the GPU */
+  @Override
+  public Scalar[] getDefaultValue() {
+    // The output should follow the buffer schema defined by "aggBufferTypes".
+    // aka, a single struct type.
+    try (
+        Scalar nullLong = Scalar.fromNull(DType.INT32); // NOTE(review): named "Long" but typed INT32, while bufferSchema()/aggBufferTypes() declare BIGINT — confirm this matches the native UDF's buffer layout
+        ColumnVector nullSum = ColumnVector.fromScalar(nullLong, 1);
+        ColumnVector count = ColumnVector.fromInts( 0)) { // initial count of zero
+      return new Scalar[]{ Scalar.structFromColumnViews(nullSum, count) };
+    }
+  }
+
+  // preProcess, the default implementation is good enough.
+
+  @Override
+  public RapidsUDAFGroupByAggregation updateAggregation() { // "update" stage: raw integer inputs in, partial (sum, count) buffers out
+    return new RapidsSimpleGroupByAggregation() {
+      @Override
+      public GroupByAggregationOnColumn[] aggregate(int[] ids) {
+        assert ids.length == 1; // the integer input column
+        HostUDFWrapper avgUDAF = new RapidsAverageUDAF(AggregationType.GroupBy);
+        return new GroupByAggregationOnColumn[] {
+          GroupByAggregation.hostUDF(avgUDAF).onColumn(ids[0])
+        };
+      }
+
+      @Override
+      public Scalar[] reduce(int numRows, ColumnVector[] args) {
+        assert args.length == 1;
+        HostUDFWrapper avgUDAF = new RapidsAverageUDAF(AggregationType.Reduction);
+        return new Scalar[] { args[0].reduce(ReductionAggregation.hostUDF(avgUDAF)) };
+      }
+    };
+  }
+
+  @Override
+  public RapidsUDAFGroupByAggregation mergeAggregation() { // "merge" stage: combines the partial (sum, count) buffers produced by the update stage
+    return new RapidsSimpleGroupByAggregation() {
+      @Override
+      public GroupByAggregationOnColumn[] aggregate(int[] ids) {
+        // Column of struct type with two children: sum and count, after "update" stage.
+        assert ids.length == 1;
+        HostUDFWrapper avgUDAF = new RapidsAverageUDAF(AggregationType.GroupByMerge);
+        return new GroupByAggregationOnColumn[] {
+          GroupByAggregation.hostUDF(avgUDAF).onColumn(ids[0])
+        };
+      }
+
+      @Override
+      public Scalar[] reduce(int numRows, ColumnVector[] args) {
+        // Column of struct type with two children: sum and count, after "update" stage.
+        assert args.length == 1;
+        HostUDFWrapper avgUDAF = new RapidsAverageUDAF(AggregationType.ReductionMerge);
+        return new Scalar[] { args[0].reduce(ReductionAggregation.hostUDF(avgUDAF)) };
+      }
+    };
+  }
+
+  @Override
+  public ColumnVector postProcess(int numRows, ColumnVector[] buffers, DataType retType) {
+    try {
+      // Column of struct type with two children: sum and count, after "merge" stage.
+      assert buffers.length == 1;
+      return RapidsAverageUDAF.computeAvg(buffers[0]);
+    } finally {
+      // Make sure the buffers are always closed to avoid memory leaks.
+      Arrays.stream(buffers).forEach(ColumnVector::close);
+    }
+  }
+
+  @Override
+  public DataType[] aggBufferTypes() {
+    return new DataType[] { bufferSchema() };
+  }
+}
+
+// The customized cuDF aggregation to perform the average computation on integer values.
+class RapidsAverageUDAF extends HostUDFWrapper {
+  public RapidsAverageUDAF(AggregationType type) {
+    this.aggType = type;
+  }
+
+  @Override
+  public long createUDFInstance() { // NOTE(review): lazy init is not synchronized — confirm instances are never shared across threads
+    if (nativeInstance == 0L) {
+      NativeUDFExamplesLoader.ensureLoaded();
+      nativeInstance = createAverageHostUDF(aggType.nativeId);
+    }
+    return nativeInstance;
+  }
+
+  @Override
+  public int computeHashCode() { // NOTE(review): hashes the lazily created native handle while isEqual() compares only aggType, so "equal" wrappers can hash differently; hashing also forces native-instance creation
+    return Objects.hash(this.getClass().getName(), aggType, createUDFInstance());
+  }
+
+  @Override
+  public boolean isEqual(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    RapidsAverageUDAF other = (RapidsAverageUDAF) o;
+    return aggType == other.aggType;
+  }
+
+  // input struct(sum, count), output avg column
+  public static ColumnVector computeAvg(ColumnView input) {
+    NativeUDFExamplesLoader.ensureLoaded();
+    return new ColumnVector(computeAvg(input.getNativeView()));
+  }
+
+  private static native long createAverageHostUDF(int type);
+  private static native long computeAvg(long inputHandle);
+
+  private final AggregationType aggType;
+  private long nativeInstance; // 0 until createUDFInstance() caches the native handle
+}
+
+enum AggregationType { // nativeId values must stay in sync with the agg_type switch in the JNI createAverageHostUDF (IntegerAverageJni.cpp)
+  // input: int column, output: sum, count
+  Reduction(0),
+  // input: sum, count, output: sum, count
+  ReductionMerge(1),
+  // input: int column, output: struct(sum: int, count: int)
+  GroupBy(2),
+  // input: struct(sum: int, count: int), output: struct(sum: int, count: int)
+  GroupByMerge(3);
+
+  final int nativeId;
+
+  AggregationType(int nativeId) {
+    this.nativeId = nativeId;
+  }
+}
\ No newline at end of file
diff --git a/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/python/rapids_udaf_test.py b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/python/rapids_udaf_test.py
new file mode 100644
index 000000000..ed7e4fa6b
--- /dev/null
+++ b/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/python/rapids_udaf_test.py
@@ -0,0 +1,52 @@
+# Copyright (c) 2025, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from asserts import assert_gpu_and_cpu_are_equal_sql
+from data_gen import *
+from rapids_udf_test import drop_udf
+
+def load_java_udaf(spark, udf_name, udf_class):  # drop any stale registration first, then register the Java UDAF under udf_name
+    drop_udf(spark, udf_name)
+    spark.udf.registerJavaUDAF(udf_name, udf_class)
+
+
+@pytest.mark.ignore_order(local=True)
+@pytest.mark.rapids_udf_example_native  # presumably requires the native UDF examples library to be built — see the cpp CMakeLists
+def test_java_udaf_integer_average_groupby():  # GPU vs CPU parity of int_avg used as a group-by aggregation
+    def two_cols_table(spark):
+        load_java_udaf(spark, "int_avg", "com.nvidia.spark.rapids.udaf.java.IntegerAverage")
+        group_gen = RepeatSeqGen(string_gen, 100) # 100 groups at most
+        int_value_gen = IntegerGen(min_val=-100, max_val=100) # avoid integer overflow
+        return two_col_df(spark, group_gen, int_value_gen)
+
+    assert_gpu_and_cpu_are_equal_sql(
+        two_cols_table,
+        "int_avg_udaf_test_table",
+        "SELECT a, int_avg(b) FROM int_avg_udaf_test_table GROUP BY a")
+
+
+@pytest.mark.ignore_order(local=True)
+@pytest.mark.rapids_udf_example_native
+def test_java_udaf_integer_average_reduction():  # GPU vs CPU parity of int_avg used as a whole-column reduction
+    def int_col_table(spark):
+        load_java_udaf(spark, "int_avg", "com.nvidia.spark.rapids.udaf.java.IntegerAverage")
+        int_value_gen = IntegerGen(min_val=-100, max_val=100) # avoid integer overflow
+        return unary_op_df(spark, int_value_gen)
+
+    assert_gpu_and_cpu_are_equal_sql(
+        int_col_table,
+        "int_avg_udaf_test_table",
+        "SELECT int_avg(a) FROM int_avg_udaf_test_table")