Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ci/scripts/cpp_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,15 @@ if [ "${ARROW_FUZZING}" == "ON" ]; then
# Some fuzz regression files may trigger huge memory allocations,
# let the allocator return null instead of aborting.
export ASAN_OPTIONS="$ASAN_OPTIONS allocator_may_return_null=1"
export ARROW_FUZZING_VERBOSITY=1
# Run golden IPC integration files: these should ideally load without errors,
# though some very old ones carry invalid data (such as decimal values
# larger than their advertised precision).
# shellcheck disable=SC2046
"${binary_output_dir}/arrow-ipc-stream-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.stream")
# shellcheck disable=SC2046
"${binary_output_dir}/arrow-ipc-file-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.arrow_file")
# Run known crash files
"${binary_output_dir}/arrow-ipc-stream-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-stream/crash-*
"${binary_output_dir}/arrow-ipc-stream-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-stream/*-testcase-*
"${binary_output_dir}/arrow-ipc-file-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-file/*-testcase-*
Expand Down
75 changes: 45 additions & 30 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1252,40 +1252,55 @@ struct FileGeneratorWriterHelper : public FileWriterHelper {
Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
ReadStats* out_stats = nullptr,
MetadataVector* out_metadata_list = nullptr) override {
std::shared_ptr<io::RandomAccessFile> buf_reader;
if (kCoalesce) {
// Use a non-zero-copy enabled BufferReader so we can test paths properly
buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
} else {
buf_reader = std::make_shared<io::BufferReader>(buffer_);
}
AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
// The generator doesn't track stats.
EXPECT_EQ(nullptr, out_stats);

{
auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
// Do NOT assert OK since some tests check whether this fails properly
EXPECT_FINISHES(fut);
ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
EXPECT_EQ(num_batches_written_, reader->num_record_batches());
// Generator will keep reader alive internally
ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
}
auto read_batches = [&](bool pre_buffer) -> Result<RecordBatchVector> {
std::shared_ptr<io::RandomAccessFile> buf_reader;
if (kCoalesce) {
// Use a non-zero-copy enabled BufferReader so we can test paths properly
buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
} else {
buf_reader = std::make_shared<io::BufferReader>(buffer_);
}
AsyncGenerator<std::shared_ptr<RecordBatch>> generator;

{
auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
EXPECT_EQ(num_batches_written_, reader->num_record_batches());
if (pre_buffer) {
RETURN_NOT_OK(reader->PreBufferMetadata(/*indices=*/{}));
}
// Generator will keep reader alive internally
ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
}

// Generator is async-reentrant
std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
// Generator is async-reentrant
std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
for (int i = 0; i < num_batches_written_; ++i) {
futures.push_back(generator());
}
auto fut = generator();
ARROW_ASSIGN_OR_RAISE(auto final_batch, fut.result());
EXPECT_EQ(nullptr, final_batch);

RecordBatchVector batches;
for (auto& future : futures) {
ARROW_ASSIGN_OR_RAISE(auto batch, future.result());
EXPECT_NE(nullptr, batch);
batches.push_back(batch);
}
return batches;
};

ARROW_ASSIGN_OR_RAISE(*out_batches, read_batches(/*pre_buffer=*/false));
// Also read with pre-buffered metadata, and check the results are equal
ARROW_ASSIGN_OR_RAISE(auto batches_pre_buffered, read_batches(/*pre_buffer=*/true));
for (int i = 0; i < num_batches_written_; ++i) {
futures.push_back(generator());
}
auto fut = generator();
EXPECT_FINISHES_OK_AND_EQ(nullptr, fut);
for (auto& future : futures) {
EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future);
out_batches->push_back(batch);
AssertBatchesEqual(*batches_pre_buffered[i], *(*out_batches)[i],
/*check_metadata=*/true);
}

// The generator doesn't track stats.
EXPECT_EQ(nullptr, out_stats);

return Status::OK();
}
};
Expand Down
Loading
Loading