Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions cpp/src/arrow/acero/tpch_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "arrow/acero/test_util_internal.h"
#include "arrow/acero/tpch_node.h"
#include "arrow/compute/cast.h"
#include "arrow/table.h"
#include "arrow/testing/future_util.h"

#include <memory>
Expand All @@ -36,6 +37,8 @@ using compute::SortKey;
namespace acero {
namespace internal {

constexpr int64_t kTpchBatchSize = 4096;

std::shared_ptr<ExecPlan> Plan_Q1(AsyncGenerator<std::optional<ExecBatch>>* sink_gen,
int scale_factor) {
std::shared_ptr<ExecPlan> plan = *ExecPlan::Make();
Expand Down Expand Up @@ -112,6 +115,95 @@ std::shared_ptr<ExecPlan> Plan_Q1(AsyncGenerator<std::optional<ExecBatch>>* sink
return plan;
}

std::shared_ptr<Table> GenerateLineitemTable(int scale_factor) {
std::shared_ptr<ExecPlan> plan = *ExecPlan::Make();
std::unique_ptr<TpchGen> gen =
*TpchGen::Make(plan.get(), static_cast<double>(scale_factor), kTpchBatchSize);
ExecNode* lineitem =
*gen->Lineitem({"L_QUANTITY", "L_EXTENDEDPRICE", "L_TAX", "L_DISCOUNT",
"L_SHIPDATE", "L_RETURNFLAG", "L_LINESTATUS"});

std::shared_ptr<Table> lineitem_table;
TableSinkNodeOptions options{&lineitem_table};
std::ignore = *MakeExecNode("table_sink", plan.get(), {lineitem}, options);
plan->StartProducing();
ARROW_CHECK_OK(plan->finished().status());
return lineitem_table;
}

std::shared_ptr<ExecPlan> Plan_Q1_FromTable(
const std::shared_ptr<Table>& lineitem_table,
AsyncGenerator<std::optional<ExecBatch>>* sink_gen) {
std::shared_ptr<ExecPlan> plan = *ExecPlan::Make();
Declaration lineitem_decl("table_source",
TableSourceNodeOptions(lineitem_table, kTpchBatchSize));

auto sept_2_1998 = std::make_shared<Date32Scalar>(
10471); // September 2, 1998 is 10471 days after January 1, 1970
Expression filter =
less_equal(field_ref("L_SHIPDATE"), literal(std::move(sept_2_1998)));
FilterNodeOptions filter_opts(filter);

Expression l_returnflag = field_ref("L_RETURNFLAG");
Expression l_linestatus = field_ref("L_LINESTATUS");
Expression quantity = field_ref("L_QUANTITY");
Expression base_price = field_ref("L_EXTENDEDPRICE");

std::shared_ptr<Decimal128Scalar> decimal_1 =
std::make_shared<Decimal128Scalar>(Decimal128{0, 100}, decimal128(12, 2));
Expression discount_multiplier =
call("subtract", {literal(decimal_1), field_ref("L_DISCOUNT")});
Expression tax_multiplier = call("add", {literal(decimal_1), field_ref("L_TAX")});
Expression disc_price =
call("multiply", {field_ref("L_EXTENDEDPRICE"), discount_multiplier});
Expression charge =
call("multiply",
{call("cast",
{call("multiply", {field_ref("L_EXTENDEDPRICE"), discount_multiplier})},
compute::CastOptions::Unsafe(decimal128(12, 2))),
tax_multiplier});
Expression discount = field_ref("L_DISCOUNT");

std::vector<Expression> projection_list = {l_returnflag, l_linestatus, quantity,
base_price, disc_price, charge,
quantity, base_price, discount};
std::vector<std::string> project_names = {
"l_returnflag", "l_linestatus", "sum_qty", "sum_base_price", "sum_disc_price",
"sum_charge", "avg_qty", "avg_price", "avg_disc"};
ProjectNodeOptions project_opts(std::move(projection_list), std::move(project_names));

auto sum_opts =
std::make_shared<ScalarAggregateOptions>(ScalarAggregateOptions::Defaults());
auto count_opts = std::make_shared<CountOptions>(CountOptions::CountMode::ALL);
std::vector<arrow::compute::Aggregate> aggs = {
{"hash_sum", sum_opts, "sum_qty", "sum_qty"},
{"hash_sum", sum_opts, "sum_base_price", "sum_base_price"},
{"hash_sum", sum_opts, "sum_disc_price", "sum_disc_price"},
{"hash_sum", sum_opts, "sum_charge", "sum_charge"},
{"hash_mean", sum_opts, "avg_qty", "avg_qty"},
{"hash_mean", sum_opts, "avg_price", "avg_price"},
{"hash_mean", sum_opts, "avg_disc", "avg_disc"},
{"hash_count", count_opts, "sum_qty", "count_order"}};

std::vector<FieldRef> keys = {"l_returnflag", "l_linestatus"};
AggregateNodeOptions agg_opts(aggs, keys);

SortKey l_returnflag_key("l_returnflag");
SortKey l_linestatus_key("l_linestatus");
SortOptions sort_opts({l_returnflag_key, l_linestatus_key});
OrderBySinkNodeOptions order_by_opts(sort_opts, sink_gen);

Declaration filter_decl("filter", {lineitem_decl}, filter_opts);
Declaration project_decl("project", project_opts);
Declaration aggregate_decl("aggregate", agg_opts);
Declaration orderby_decl("order_by_sink", order_by_opts);

Declaration q1 =
Declaration::Sequence({filter_decl, project_decl, aggregate_decl, orderby_decl});
std::ignore = *q1.AddToPlan(plan.get());
return plan;
}

static void BM_Tpch_Q1(benchmark::State& st) {
for (auto _ : st) {
st.PauseTiming();
Expand All @@ -124,6 +216,20 @@ static void BM_Tpch_Q1(benchmark::State& st) {
}

BENCHMARK(BM_Tpch_Q1)->Args({1})->ArgNames({"ScaleFactor"});

/// Benchmark TPC-H Q1 over a lineitem table that is generated once, before the
/// benchmark loop, so data generation is not part of any iteration.
///
/// Each iteration builds a new plan from the table and runs it to completion.
/// Plan construction happens inside the loop without pausing the timer.
/// st.range(0) is the TPC-H scale factor.
static void BM_Tpch_Q1_ExecOnly(benchmark::State& st) {
  const int scale_factor = static_cast<int>(st.range(0));
  auto lineitem = GenerateLineitemTable(scale_factor);
  ARROW_CHECK(lineitem);

  for (auto _ : st) {
    AsyncGenerator<std::optional<ExecBatch>> sink_gen;
    std::shared_ptr<ExecPlan> q1_plan = Plan_Q1_FromTable(lineitem, &sink_gen);
    auto collected = StartAndCollect(q1_plan.get(), sink_gen);
    // Move the collected result out of the future; the value itself is unused.
    auto batches = *collected.MoveResult();
  }
}

BENCHMARK(BM_Tpch_Q1_ExecOnly)->Args({1})->ArgNames({"ScaleFactor"});
} // namespace internal
} // namespace acero
} // namespace arrow
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/kernels/aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4176,7 +4176,7 @@ class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel<ArrowType> {
// For some reason, TDigest computations with libc++ seem much less accurate.
// A possible explanation is that libc++ has less precise implementations
// of std::sin and std::asin, used in the TDigest implementation.
// The same relaxed tolerance is also used when building with MSVC.
# ifdef _LIBCPP_VERSION
# if defined(_LIBCPP_VERSION) || defined(_MSC_VER)
constexpr double kRelativeTolerance = 0.09;
# else
constexpr double kRelativeTolerance = 0.05;
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/arrow/util/basic_decimal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,12 @@ BasicDecimal128& BasicDecimal128::Negate() {
return *this;
}

BasicDecimal128& BasicDecimal128::Abs() { return *this < 0 ? Negate() : *this; }
/// Replace this value with its absolute value, in place.
///
/// A non-negative value is left untouched; a negative one is negated via
/// Negate(), which returns a reference to this object.
///
/// \return reference to this object
BasicDecimal128& BasicDecimal128::Abs() {
  if (!IsNegative()) {
    return *this;
  }
  return Negate();
}

BasicDecimal128 BasicDecimal128::Abs(const BasicDecimal128& in) {
BasicDecimal128 result(in);
Expand Down