diff --git a/ci/scripts/cpp_test.sh b/ci/scripts/cpp_test.sh index 0ad59bc308f..5d6d5e099ab 100755 --- a/ci/scripts/cpp_test.sh +++ b/ci/scripts/cpp_test.sh @@ -182,6 +182,15 @@ if [ "${ARROW_FUZZING}" == "ON" ]; then # Some fuzz regression files may trigger huge memory allocations, # let the allocator return null instead of aborting. export ASAN_OPTIONS="$ASAN_OPTIONS allocator_may_return_null=1" + export ARROW_FUZZING_VERBOSITY=1 + # Run golden IPC integration files: these should ideally load without errors, + # though some very old ones carry invalid data (such as decimal values + # larger than their advertised precision). + # shellcheck disable=SC2046 + "${binary_output_dir}/arrow-ipc-stream-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.stream") + # shellcheck disable=SC2046 + "${binary_output_dir}/arrow-ipc-file-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.arrow_file") + # Run known crash files "${binary_output_dir}/arrow-ipc-stream-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-stream/crash-* "${binary_output_dir}/arrow-ipc-stream-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-stream/*-testcase-* "${binary_output_dir}/arrow-ipc-file-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-file/*-testcase-* diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 315d8bd07d9..9f7df541bd7 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -1252,40 +1252,55 @@ struct FileGeneratorWriterHelper : public FileWriterHelper { Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches, ReadStats* out_stats = nullptr, MetadataVector* out_metadata_list = nullptr) override { - std::shared_ptr buf_reader; - if (kCoalesce) { - // Use a non-zero-copy enabled BufferReader so we can test paths properly - buf_reader = std::make_shared(buffer_); - } else { - buf_reader = std::make_shared(buffer_); - } - AsyncGenerator> generator; + // The generator doesn't track stats. + EXPECT_EQ(nullptr, out_stats); - { - auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options); - // Do NOT assert OK since some tests check whether this fails properly - EXPECT_FINISHES(fut); - ARROW_ASSIGN_OR_RAISE(auto reader, fut.result()); - EXPECT_EQ(num_batches_written_, reader->num_record_batches()); - // Generator will keep reader alive internally - ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce)); - } + auto read_batches = [&](bool pre_buffer) -> Result { + std::shared_ptr buf_reader; + if (kCoalesce) { + // Use a non-zero-copy enabled BufferReader so we can test paths properly + buf_reader = std::make_shared(buffer_); + } else { + buf_reader = std::make_shared(buffer_); + } + AsyncGenerator> generator; + + { + auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options); + ARROW_ASSIGN_OR_RAISE(auto reader, fut.result()); + EXPECT_EQ(num_batches_written_, reader->num_record_batches()); + if (pre_buffer) { + RETURN_NOT_OK(reader->PreBufferMetadata(/*indices=*/{})); + } + // Generator will keep reader alive internally + ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce)); + } - // Generator is async-reentrant - std::vector>> futures; + // Generator is async-reentrant + std::vector>> futures; + for (int i = 0; i < num_batches_written_; ++i) { + futures.push_back(generator()); + } + auto fut = generator(); + ARROW_ASSIGN_OR_RAISE(auto final_batch, fut.result()); + EXPECT_EQ(nullptr, final_batch); + + RecordBatchVector batches; + for (auto& future : futures) { + ARROW_ASSIGN_OR_RAISE(auto batch, future.result()); + EXPECT_NE(nullptr, batch); + batches.push_back(batch); + } + return batches; + }; + + ARROW_ASSIGN_OR_RAISE(*out_batches, read_batches(/*pre_buffer=*/false)); + // Also read with pre-buffered metadata, and check the results are equal + ARROW_ASSIGN_OR_RAISE(auto batches_pre_buffered, read_batches(/*pre_buffer=*/true)); for (int i = 0; i < num_batches_written_; ++i) { - futures.push_back(generator()); - } - auto fut = generator(); - EXPECT_FINISHES_OK_AND_EQ(nullptr, fut); - for (auto& future : futures) { - EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future); - out_batches->push_back(batch); + AssertBatchesEqual(*batches_pre_buffered[i], *(*out_batches)[i], + /*check_metadata=*/true); } - - // The generator doesn't track stats. - EXPECT_EQ(nullptr, out_stats); - return Status::OK(); } }; diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 8e125fc5ede..f1571f76c24 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -54,6 +54,7 @@ #include "arrow/util/compression.h" #include "arrow/util/endian.h" #include "arrow/util/fuzz_internal.h" +#include "arrow/util/int_util_overflow.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging_internal.h" #include "arrow/util/parallel.h" @@ -72,6 +73,7 @@ namespace arrow { namespace flatbuf = org::apache::arrow::flatbuf; +using internal::AddWithOverflow; using internal::checked_cast; using internal::checked_pointer_cast; @@ -177,14 +179,16 @@ class ArrayLoader { explicit ArrayLoader(const flatbuf::RecordBatch* metadata, MetadataVersion metadata_version, const IpcReadOptions& options, - int64_t file_offset) + int64_t file_offset, int64_t file_length) : metadata_(metadata), metadata_version_(metadata_version), file_(nullptr), file_offset_(file_offset), + file_length_(file_length), max_recursion_depth_(options.max_recursion_depth) {} Status ReadBuffer(int64_t offset, int64_t length, std::shared_ptr* out) { + // This construct permits overriding GetBuffer at compile time if (skip_io_) { return Status::OK(); } @@ -194,7 +198,10 @@ class ArrayLoader { if (length < 0) { return Status::Invalid("Negative length for reading buffer ", buffer_index_); } - // This construct permits overriding GetBuffer at compile time + auto read_end = AddWithOverflow({offset, length}); + if (!read_end.has_value() || (file_length_.has_value() && read_end > file_length_)) { + return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area"); + } if (!bit_util::IsMultipleOf8(offset)) { return Status::Invalid("Buffer ", buffer_index_, " did not start on 8-byte aligned offset: ", offset); @@ -202,6 +209,9 @@ class ArrayLoader { if (file_) { return file_->ReadAt(offset, length).Value(out); } else { + if (!AddWithOverflow({read_end.value(), file_offset_}).has_value()) { + return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area"); + } read_request_.RequestRange(offset + file_offset_, length, out); return Status::OK(); } @@ -292,6 +302,16 @@ class ArrayLoader { // we can skip that buffer without reading from shared memory RETURN_NOT_OK(GetFieldMetadata(field_index_++, out_)); + if (::arrow::internal::has_variadic_buffers(type_id)) { + ARROW_ASSIGN_OR_RAISE(auto data_buffer_count, + GetVariadicCount(variadic_count_index_++)); + const int64_t start = static_cast(out_->buffers.size()); + // NOTE: this must be done before any other call to `GetBuffer` because + // BatchDataReadRequest will keep pointers to `std::shared_ptr` + // objects. + out_->buffers.resize(start + data_buffer_count); + } + if (internal::HasValidityBitmap(type_id, metadata_version_)) { // Extract null_bitmap which is common to all arrays except for unions // and nulls. @@ -300,6 +320,7 @@ class ArrayLoader { } buffer_index_++; } + return Status::OK(); } @@ -398,14 +419,9 @@ class ArrayLoader { Status Visit(const BinaryViewType& type) { out_->buffers.resize(2); - RETURN_NOT_OK(LoadCommon(type.id())); - RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[1])); - - ARROW_ASSIGN_OR_RAISE(auto data_buffer_count, - GetVariadicCount(variadic_count_index_++)); - out_->buffers.resize(data_buffer_count + 2); - for (int64_t i = 0; i < data_buffer_count; ++i) { - RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i + 2])); + RETURN_NOT_OK(LoadCommon(type.id())); // also initializes variadic buffers + for (int64_t i = 1; i < static_cast(out_->buffers.size()); ++i) { + RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i])); } return Status::OK(); } @@ -503,6 +519,7 @@ class ArrayLoader { const MetadataVersion metadata_version_; io::RandomAccessFile* file_; int64_t file_offset_; + std::optional file_length_; int max_recursion_depth_; int buffer_index_ = 0; int field_index_ = 0; @@ -1173,8 +1190,19 @@ namespace { // Common functions used in both the random-access file reader and the // asynchronous generator -inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { - return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()}; +Result FileBlockFromFlatbuffer(const flatbuf::Block* fb_block, + int64_t max_offset) { + auto block = + FileBlock{fb_block->offset(), fb_block->metaDataLength(), fb_block->bodyLength()}; + if (block.metadata_length < 0 || block.body_length < 0 || block.offset < 0) { + return Status::IOError("Invalid Block in IPC file footer"); + } + auto block_end = + AddWithOverflow({block.offset, block.metadata_length, block.body_length}); + if (!block_end.has_value() || block_end > max_offset) { + return Status::IOError("Invalid Block in IPC file footer"); + } + return block; } Status CheckAligned(const FileBlock& block) { @@ -1362,8 +1390,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { read_options, file, schema, &inclusion_mask); }; } - ARROW_ASSIGN_OR_RAISE(auto message, - ReadMessageFromBlock(GetRecordBatchBlock(i), fields_loader)); + ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i)); + ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(block, fields_loader)); CHECK_HAS_BODY(*message); ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); @@ -1379,8 +1407,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { Result CountRows() override { int64_t total = 0; for (int i = 0; i < num_record_batches(); i++) { - ARROW_ASSIGN_OR_RAISE(auto outer_message, - ReadMessageFromBlock(GetRecordBatchBlock(i))); + ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i)); + ARROW_ASSIGN_OR_RAISE(auto outer_message, ReadMessageFromBlock(block)); auto metadata = outer_message->metadata(); const flatbuf::Message* message = nullptr; RETURN_NOT_OK( @@ -1494,13 +1522,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { Status DoPreBufferMetadata(const std::vector& indices) { RETURN_NOT_OK(CacheMetadata(indices)); - EnsureDictionaryReadStarted(); + RETURN_NOT_OK(EnsureDictionaryReadStarted()); Future<> all_metadata_ready = WaitForMetadatas(indices); for (int index : indices) { Future> metadata_loaded = all_metadata_ready.Then([this, index]() -> Result> { stats_.num_messages.fetch_add(1, std::memory_order_relaxed); - FileBlock block = GetRecordBatchBlock(index); + ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index)); ARROW_ASSIGN_OR_RAISE( std::shared_ptr metadata, metadata_cache_->Read({block.offset, block.metadata_length})); @@ -1549,12 +1577,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { } }; - FileBlock GetRecordBatchBlock(int i) const { - return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); + Result GetRecordBatchBlock(int i) const { + return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i), footer_offset_); } - FileBlock GetDictionaryBlock(int i) const { - return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); + Result GetDictionaryBlock(int i) const { + return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i), footer_offset_); } Result> ReadMessageFromBlock( @@ -1567,16 +1595,26 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { Status ReadDictionaries() { // Read all the dictionaries + std::vector> messages(num_dictionaries()); + for (int i = 0; i < num_dictionaries(); ++i) { + ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i)); + ARROW_ASSIGN_OR_RAISE(messages[i], ReadMessageFromBlock(block)); + } + return ReadDictionaries(messages); + } + + Status ReadDictionaries( + const std::vector>& dictionary_messages) { + DCHECK_EQ(dictionary_messages.size(), static_cast(num_dictionaries())); IpcReadContext context(&dictionary_memo_, options_, swap_endian_); for (int i = 0; i < num_dictionaries(); ++i) { - ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(GetDictionaryBlock(i))); - RETURN_NOT_OK(ReadOneDictionary(message.get(), context)); - stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed); + RETURN_NOT_OK(ReadOneDictionary(i, dictionary_messages[i].get(), context)); } return Status::OK(); } - Status ReadOneDictionary(Message* message, const IpcReadContext& context) { + Status ReadOneDictionary(int dict_index, Message* message, + const IpcReadContext& context) { CHECK_HAS_BODY(*message); ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); DictionaryKind kind; @@ -1586,44 +1624,48 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { } else if (kind == DictionaryKind::Delta) { stats_.num_dictionary_deltas.fetch_add(1, std::memory_order_relaxed); } + stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed); return Status::OK(); } - void AddDictionaryRanges(std::vector* ranges) const { + Status AddDictionaryRanges(std::vector* ranges) const { // Adds all dictionaries to the range cache for (int i = 0; i < num_dictionaries(); ++i) { - FileBlock block = GetDictionaryBlock(i); + ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i)); ranges->push_back({block.offset, block.metadata_length + block.body_length}); } + return Status::OK(); } - void AddMetadataRanges(const std::vector& indices, - std::vector* ranges) { + Status AddMetadataRanges(const std::vector& indices, + std::vector* ranges) { for (int index : indices) { - FileBlock block = GetRecordBatchBlock(static_cast(index)); + ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index)); ranges->push_back({block.offset, block.metadata_length}); } + return Status::OK(); } Status CacheMetadata(const std::vector& indices) { std::vector ranges; if (!read_dictionaries_) { - AddDictionaryRanges(&ranges); + RETURN_NOT_OK(AddDictionaryRanges(&ranges)); } - AddMetadataRanges(indices, &ranges); + RETURN_NOT_OK(AddMetadataRanges(indices, &ranges)); return metadata_cache_->Cache(std::move(ranges)); } - void EnsureDictionaryReadStarted() { + Status EnsureDictionaryReadStarted() { if (!dictionary_load_finished_.is_valid()) { read_dictionaries_ = true; std::vector ranges; - AddDictionaryRanges(&ranges); + RETURN_NOT_OK(AddDictionaryRanges(&ranges)); dictionary_load_finished_ = metadata_cache_->WaitFor(std::move(ranges)).Then([this] { return ReadDictionaries(); }); } + return Status::OK(); } Status WaitForDictionaryReadFinished() { @@ -1641,7 +1683,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { Future<> WaitForMetadatas(const std::vector& indices) { std::vector ranges; - AddMetadataRanges(indices, &ranges); + RETURN_NOT_OK(AddMetadataRanges(indices, &ranges)); return metadata_cache_->WaitFor(std::move(ranges)); } @@ -1685,12 +1727,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { const flatbuf::RecordBatch* batch, IpcReadContext context, io::RandomAccessFile* file, std::shared_ptr owned_file, - int64_t block_data_offset) + int64_t block_data_offset, int64_t block_data_length) : schema(std::move(sch)), context(std::move(context)), file(file), owned_file(std::move(owned_file)), - loader(batch, context.metadata_version, context.options, block_data_offset), + loader(batch, context.metadata_version, context.options, block_data_offset, + block_data_length), columns(schema->num_fields()), cache(file, file->io_context(), io::CacheOptions::LazyDefaults()), length(batch->length()) {} @@ -1789,14 +1832,15 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return dictionary_load_finished_.Then([message_fut] { return message_fut; }) .Then([this, index](const std::shared_ptr& message_obj) -> Future> { - FileBlock block = GetRecordBatchBlock(index); + ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(index)); ARROW_ASSIGN_OR_RAISE(auto message, GetFlatbufMessage(message_obj)); ARROW_ASSIGN_OR_RAISE(auto batch, GetBatchFromMessage(message)); ARROW_ASSIGN_OR_RAISE(auto context, GetIpcReadContext(message, batch)); auto read_context = std::make_shared( schema_, batch, std::move(context), file_, owned_file_, - block.offset + static_cast(block.metadata_length)); + block.offset + static_cast(block.metadata_length), + block.body_length); RETURN_NOT_OK(read_context->CalculateLoadRequest()); return read_context->ReadAsync().Then( [read_context] { return read_context->CreateRecordBatch(); }); @@ -1915,25 +1959,31 @@ Future WholeIpcFileRecordBatchGenerator::operator()() { auto state = state_; if (!read_dictionaries_.is_valid()) { - std::vector>> messages(state->num_dictionaries()); - for (int i = 0; i < state->num_dictionaries(); i++) { - auto block = FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i)); - messages[i] = ReadBlock(block); - } - auto read_messages = All(std::move(messages)); - if (executor_) read_messages = executor_->Transfer(read_messages); - read_dictionaries_ = read_messages.Then( - [=](const std::vector>>& maybe_messages) - -> Status { - ARROW_ASSIGN_OR_RAISE(auto messages, - arrow::internal::UnwrapOrRaise(maybe_messages)); - return ReadDictionaries(state.get(), std::move(messages)); - }); + if (state->dictionary_load_finished_.is_valid()) { + // PreBufferMetadata has started reading dictionaries in the background + read_dictionaries_ = state->dictionary_load_finished_; + } else { + // Start reading dictionaries + std::vector>> messages(state->num_dictionaries()); + for (int i = 0; i < state->num_dictionaries(); i++) { + ARROW_ASSIGN_OR_RAISE(auto block, state->GetDictionaryBlock(i)); + messages[i] = ReadBlock(block); + } + auto read_messages = All(std::move(messages)); + if (executor_) read_messages = executor_->Transfer(read_messages); + read_dictionaries_ = read_messages.Then( + [=](const std::vector>>& maybe_messages) + -> Status { + ARROW_ASSIGN_OR_RAISE(auto messages, + arrow::internal::UnwrapOrRaise(maybe_messages)); + return state->ReadDictionaries(messages); + }); + } } if (index_ >= state_->num_record_batches()) { return Future::MakeFinished(IterationTraits::End()); } - auto block = FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++)); + ARROW_ASSIGN_OR_RAISE(auto block, state->GetRecordBatchBlock(index_++)); auto read_message = ReadBlock(block); auto read_messages = read_dictionaries_.Then([read_message]() { return read_message; }); // Force transfer. This may be wasteful in some cases, but ensures we get off the @@ -1969,16 +2019,6 @@ Future> WholeIpcFileRecordBatchGenerator::ReadBlock( } } -Status WholeIpcFileRecordBatchGenerator::ReadDictionaries( - RecordBatchFileReaderImpl* state, - std::vector> dictionary_messages) { - IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_); - for (const auto& message : dictionary_messages) { - RETURN_NOT_OK(state->ReadOneDictionary(message.get(), context)); - } - return Status::OK(); -} - Result> WholeIpcFileRecordBatchGenerator::ReadRecordBatch( RecordBatchFileReaderImpl* state, Message* message) { CHECK_HAS_BODY(*message); @@ -2630,6 +2670,14 @@ Status ValidateFuzzBatch(const RecordBatch& batch) { return st; } +Status ValidateFuzzBatch(const RecordBatchWithMetadata& batch) { + if (batch.batch) { + RETURN_NOT_OK(ValidateFuzzBatch(*batch.batch)); + } + // XXX do something with custom metadata? + return Status::OK(); +} + IpcReadOptions FuzzingOptions() { IpcReadOptions options; options.memory_pool = ::arrow::internal::fuzzing_memory_pool(); @@ -2648,12 +2696,12 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) { Status st; while (true) { - std::shared_ptr batch; - RETURN_NOT_OK(batch_reader->ReadNext(&batch)); - if (batch == nullptr) { + ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadNext()); + if (!batch.batch && !batch.custom_metadata) { + // EOS break; } - st &= ValidateFuzzBatch(*batch); + st &= ValidateFuzzBatch(batch); } return st; @@ -2661,20 +2709,36 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) { Status FuzzIpcFile(const uint8_t* data, int64_t size) { auto buffer = std::make_shared(data, size); - io::BufferReader buffer_reader(buffer); - std::shared_ptr batch_reader; - ARROW_ASSIGN_OR_RAISE(batch_reader, - RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions())); - Status st; + Status final_status; - const int n_batches = batch_reader->num_record_batches(); - for (int i = 0; i < n_batches; ++i) { - ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadRecordBatch(i)); - st &= ValidateFuzzBatch(*batch); + auto do_read = [&](bool pre_buffer) { + io::BufferReader buffer_reader(buffer); + ARROW_ASSIGN_OR_RAISE(auto batch_reader, + RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions())); + if (pre_buffer) { + // Pre-buffer all record batches + RETURN_NOT_OK(batch_reader->PreBufferMetadata(/*indices=*/{})); + } + + const int n_batches = batch_reader->num_record_batches(); + for (int i = 0; i < n_batches; ++i) { + RecordBatchWithMetadata batch; + auto st = batch_reader->ReadRecordBatchWithCustomMetadata(i).Value(&batch); + final_status &= st; + if (!st.ok()) { + continue; + } + final_status &= ValidateFuzzBatch(batch); + } + return Status::OK(); + }; + + for (const bool pre_buffer : {false, true}) { + final_status &= do_read(pre_buffer); } - return st; + return final_status; } Status FuzzIpcTensorStream(const uint8_t* data, int64_t size) { diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc index 02e6b816c0b..ceca6d9e434 100644 --- a/cpp/src/arrow/ipc/test_common.cc +++ b/cpp/src/arrow/ipc/test_common.cc @@ -16,6 +16,7 @@ // under the License. #include +#include #include #include #include @@ -368,19 +369,27 @@ Status MakeRandomStringArray(int64_t length, bool include_nulls, MemoryPool* poo return builder.Finish(out); } -template -static Status MakeBinaryArrayWithUniqueValues(int64_t length, bool include_nulls, - MemoryPool* pool, - std::shared_ptr* out) { - BuilderType builder(pool); +template BuilderType> +static Result> MakeBinaryArrayWithUniqueValues( + BuilderType builder, int64_t length, bool include_nulls) { + if constexpr (std::is_base_of_v) { + // Try to emit several variadic buffers by choosing a small block size. + builder.SetBlockSize(512); + } for (int64_t i = 0; i < length; ++i) { if (include_nulls && (i % 7 == 0)) { RETURN_NOT_OK(builder.AppendNull()); } else { - RETURN_NOT_OK(builder.Append(std::to_string(i))); + // Make sure that some strings are long enough to have non-inline binary views + const auto base = std::to_string(i); + std::string value; + for (int64_t j = 0; j < 3 * (i % 10); ++j) { + value += base; + } + RETURN_NOT_OK(builder.Append(value)); } } - return builder.Finish(out); + return builder.Finish(); } Status MakeStringTypesRecordBatch(std::shared_ptr* out, bool with_nulls, @@ -390,22 +399,22 @@ Status MakeStringTypesRecordBatch(std::shared_ptr* out, bool with_n ArrayVector arrays; FieldVector fields; - auto AppendColumn = [&](auto& MakeArray) { - arrays.emplace_back(); - RETURN_NOT_OK(MakeArray(length, with_nulls, default_memory_pool(), &arrays.back())); - - const auto& type = arrays.back()->type(); - fields.push_back(field(type->ToString(), type)); + auto AppendColumn = [&](auto builder) { + ARROW_ASSIGN_OR_RAISE(auto array, MakeBinaryArrayWithUniqueValues( + std::move(builder), length, with_nulls)); + arrays.push_back(array); + fields.push_back(field(array->type()->ToString(), array->type())); return Status::OK(); }; - RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues)); - RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues)); - RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues)); - RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues)); + auto pool = default_memory_pool(); + RETURN_NOT_OK(AppendColumn(StringBuilder(pool))); + RETURN_NOT_OK(AppendColumn(BinaryBuilder(pool))); + RETURN_NOT_OK(AppendColumn(LargeStringBuilder(pool))); + RETURN_NOT_OK(AppendColumn(LargeBinaryBuilder(pool))); if (with_view_types) { - RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues)); - RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues)); + RETURN_NOT_OK(AppendColumn(StringViewBuilder(pool))); + RETURN_NOT_OK(AppendColumn(BinaryViewBuilder(pool))); } *out = RecordBatch::Make(schema(std::move(fields)), length, std::move(arrays)); diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index f68d2dcb619..e3582056ead 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -2575,6 +2575,16 @@ constexpr bool may_have_validity_bitmap(Type::type id) { } } +constexpr bool has_variadic_buffers(Type::type id) { + switch (id) { + case Type::BINARY_VIEW: + case Type::STRING_VIEW: + return true; + default: + return false; + } +} + ARROW_DEPRECATED("Deprecated in 17.0.0. Use may_have_validity_bitmap() instead.") constexpr bool HasValidityBitmap(Type::type id) { return may_have_validity_bitmap(id); } diff --git a/cpp/src/arrow/util/int_util_overflow.h b/cpp/src/arrow/util/int_util_overflow.h index 93066fecafa..69714a935a4 100644 --- a/cpp/src/arrow/util/int_util_overflow.h +++ b/cpp/src/arrow/util/int_util_overflow.h @@ -18,7 +18,9 @@ #pragma once #include +#include #include +#include #include #include "arrow/status.h" @@ -162,6 +164,37 @@ NON_GENERIC_OPS_WITH_OVERFLOW(DivideWithOverflow) #undef NON_GENERIC_OPS_WITH_OVERFLOW #undef NON_GENERIC_OP_WITH_OVERFLOW +// Convenience functions over an arbitrary number of arguments +template +std::optional AddWithOverflow(std::initializer_list vs) { + if (vs.size() == 0) { + return {}; + } + auto it = vs.begin(); + Int v = *it++; + while (it != vs.end()) { + if (ARROW_PREDICT_FALSE(AddWithOverflowGeneric(v, *it++, &v))) { + return {}; + } + } + return v; +} + +template +std::optional MultiplyWithOverflow(std::initializer_list vs) { + if (vs.size() == 0) { + return {}; + } + auto it = vs.begin(); + Int v = *it++; + while (it != vs.end()) { + if (ARROW_PREDICT_FALSE(MultiplyWithOverflowGeneric(v, *it++, &v))) { + return {}; + } + } + return v; +} + // Define function NegateWithOverflow with the signature `bool(T u, T* out)` // where T is a signed integer type. On overflow, these functions return true. // Otherwise, false is returned and `out` is updated with the result of the diff --git a/cpp/src/arrow/util/int_util_test.cc b/cpp/src/arrow/util/int_util_test.cc index 7217c1097e4..cffa4e9d15e 100644 --- a/cpp/src/arrow/util/int_util_test.cc +++ b/cpp/src/arrow/util/int_util_test.cc @@ -649,5 +649,23 @@ TYPED_TEST(TestAddWithOverflow, Basics) { this->CheckOk(almost_min, almost_max + T{2}, T{1}); } +TEST(AddWithOverflow, Variadic) { + ASSERT_EQ(AddWithOverflow({}), std::nullopt); + ASSERT_EQ(AddWithOverflow({1, 2, 3}), 6); + ASSERT_EQ(AddWithOverflow({1, 2, 125}), std::nullopt); + ASSERT_EQ(AddWithOverflow({125, 2, 1}), std::nullopt); + ASSERT_EQ(AddWithOverflow({1, 2, 125}), 128); + ASSERT_EQ(AddWithOverflow({125, 2, 1}), 128); +} + +TEST(MultiplyWithOverflow, Variadic) { + ASSERT_EQ(MultiplyWithOverflow({}), std::nullopt); + ASSERT_EQ(MultiplyWithOverflow({1, 2, 3, 4}), 24); + ASSERT_EQ(MultiplyWithOverflow({2, 2, 32}), std::nullopt); + ASSERT_EQ(MultiplyWithOverflow({32, 4, 1}), std::nullopt); + ASSERT_EQ(MultiplyWithOverflow({2, 2, 32}), 128); + ASSERT_EQ(MultiplyWithOverflow({32, 4, 1}), 128); +} + } // namespace internal } // namespace arrow