diff --git a/cpp/src/arrow/acero/tpch_benchmark.cc b/cpp/src/arrow/acero/tpch_benchmark.cc index ac3b69c9b70..b7f7f365e6b 100644 --- a/cpp/src/arrow/acero/tpch_benchmark.cc +++ b/cpp/src/arrow/acero/tpch_benchmark.cc @@ -21,6 +21,7 @@ #include "arrow/acero/test_util_internal.h" #include "arrow/acero/tpch_node.h" #include "arrow/compute/cast.h" +#include "arrow/table.h" #include "arrow/testing/future_util.h" #include @@ -36,6 +37,8 @@ using compute::SortKey; namespace acero { namespace internal { +constexpr int64_t kTpchBatchSize = 4096; + std::shared_ptr Plan_Q1(AsyncGenerator>* sink_gen, int scale_factor) { std::shared_ptr plan = *ExecPlan::Make(); @@ -112,6 +115,95 @@ std::shared_ptr Plan_Q1(AsyncGenerator>* sink return plan; } +std::shared_ptr GenerateLineitemTable(int scale_factor) { + std::shared_ptr plan = *ExecPlan::Make(); + std::unique_ptr gen = + *TpchGen::Make(plan.get(), static_cast(scale_factor), kTpchBatchSize); + ExecNode* lineitem = + *gen->Lineitem({"L_QUANTITY", "L_EXTENDEDPRICE", "L_TAX", "L_DISCOUNT", + "L_SHIPDATE", "L_RETURNFLAG", "L_LINESTATUS"}); + + std::shared_ptr
lineitem_table; + TableSinkNodeOptions options{&lineitem_table}; + std::ignore = *MakeExecNode("table_sink", plan.get(), {lineitem}, options); + plan->StartProducing(); + ARROW_CHECK_OK(plan->finished().status()); + return lineitem_table; +} + +std::shared_ptr Plan_Q1_FromTable( + const std::shared_ptr
& lineitem_table, + AsyncGenerator>* sink_gen) { + std::shared_ptr plan = *ExecPlan::Make(); + Declaration lineitem_decl("table_source", + TableSourceNodeOptions(lineitem_table, kTpchBatchSize)); + + auto sept_2_1998 = std::make_shared( + 10471); // September 2, 1998 is 10471 days after January 1, 1970 + Expression filter = + less_equal(field_ref("L_SHIPDATE"), literal(std::move(sept_2_1998))); + FilterNodeOptions filter_opts(filter); + + Expression l_returnflag = field_ref("L_RETURNFLAG"); + Expression l_linestatus = field_ref("L_LINESTATUS"); + Expression quantity = field_ref("L_QUANTITY"); + Expression base_price = field_ref("L_EXTENDEDPRICE"); + + std::shared_ptr decimal_1 = + std::make_shared(Decimal128{0, 100}, decimal128(12, 2)); + Expression discount_multiplier = + call("subtract", {literal(decimal_1), field_ref("L_DISCOUNT")}); + Expression tax_multiplier = call("add", {literal(decimal_1), field_ref("L_TAX")}); + Expression disc_price = + call("multiply", {field_ref("L_EXTENDEDPRICE"), discount_multiplier}); + Expression charge = + call("multiply", + {call("cast", + {call("multiply", {field_ref("L_EXTENDEDPRICE"), discount_multiplier})}, + compute::CastOptions::Unsafe(decimal128(12, 2))), + tax_multiplier}); + Expression discount = field_ref("L_DISCOUNT"); + + std::vector projection_list = {l_returnflag, l_linestatus, quantity, + base_price, disc_price, charge, + quantity, base_price, discount}; + std::vector project_names = { + "l_returnflag", "l_linestatus", "sum_qty", "sum_base_price", "sum_disc_price", + "sum_charge", "avg_qty", "avg_price", "avg_disc"}; + ProjectNodeOptions project_opts(std::move(projection_list), std::move(project_names)); + + auto sum_opts = + std::make_shared(ScalarAggregateOptions::Defaults()); + auto count_opts = std::make_shared(CountOptions::CountMode::ALL); + std::vector aggs = { + {"hash_sum", sum_opts, "sum_qty", "sum_qty"}, + {"hash_sum", sum_opts, "sum_base_price", "sum_base_price"}, + {"hash_sum", sum_opts, "sum_disc_price", "sum_disc_price"}, + {"hash_sum", sum_opts, "sum_charge", "sum_charge"}, + {"hash_mean", sum_opts, "avg_qty", "avg_qty"}, + {"hash_mean", sum_opts, "avg_price", "avg_price"}, + {"hash_mean", sum_opts, "avg_disc", "avg_disc"}, + {"hash_count", count_opts, "sum_qty", "count_order"}}; + + std::vector keys = {"l_returnflag", "l_linestatus"}; + AggregateNodeOptions agg_opts(aggs, keys); + + SortKey l_returnflag_key("l_returnflag"); + SortKey l_linestatus_key("l_linestatus"); + SortOptions sort_opts({l_returnflag_key, l_linestatus_key}); + OrderBySinkNodeOptions order_by_opts(sort_opts, sink_gen); + + Declaration filter_decl("filter", {lineitem_decl}, filter_opts); + Declaration project_decl("project", project_opts); + Declaration aggregate_decl("aggregate", agg_opts); + Declaration orderby_decl("order_by_sink", order_by_opts); + + Declaration q1 = + Declaration::Sequence({filter_decl, project_decl, aggregate_decl, orderby_decl}); + std::ignore = *q1.AddToPlan(plan.get()); + return plan; +} + static void BM_Tpch_Q1(benchmark::State& st) { for (auto _ : st) { st.PauseTiming(); @@ -124,6 +216,20 @@ static void BM_Tpch_Q1(benchmark::State& st) { } BENCHMARK(BM_Tpch_Q1)->Args({1})->ArgNames({"ScaleFactor"}); + +static void BM_Tpch_Q1_ExecOnly(benchmark::State& st) { + auto lineitem_table = GenerateLineitemTable(static_cast(st.range(0))); + ARROW_CHECK(lineitem_table); + for (auto _ : st) { + AsyncGenerator> sink_gen; + std::shared_ptr plan = + Plan_Q1_FromTable(lineitem_table, &sink_gen); + auto fut = StartAndCollect(plan.get(), sink_gen); + auto res = *fut.MoveResult(); + } +} + +BENCHMARK(BM_Tpch_Q1_ExecOnly)->Args({1})->ArgNames({"ScaleFactor"}); } // namespace internal } // namespace acero } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index c5ba012d767..1bfb149355e 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -4176,7 +4176,7 @@ class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel { // For some reason, TDigest computations with libc++ seem much less accurate. // A possible explanation is that libc++ has less precise implementations // of std::sin and std::asin, used in the TDigest implementation. -# ifdef _LIBCPP_VERSION +# if defined(_LIBCPP_VERSION) || defined(_MSC_VER) constexpr double kRelativeTolerance = 0.09; # else constexpr double kRelativeTolerance = 0.05; diff --git a/cpp/src/arrow/util/basic_decimal.cc b/cpp/src/arrow/util/basic_decimal.cc index fc69bcf6f8e..eb359af27ca 100644 --- a/cpp/src/arrow/util/basic_decimal.cc +++ b/cpp/src/arrow/util/basic_decimal.cc @@ -405,7 +405,12 @@ BasicDecimal128& BasicDecimal128::Negate() { return *this; } -BasicDecimal128& BasicDecimal128::Abs() { return *this < 0 ? Negate() : *this; } +BasicDecimal128& BasicDecimal128::Abs() { + if (IsNegative()) { + Negate(); + } + return *this; +} BasicDecimal128 BasicDecimal128::Abs(const BasicDecimal128& in) { BasicDecimal128 result(in);