diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 0bc5dc06472..e988ed3c5cc 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -173,6 +173,7 @@ set(PARQUET_SRCS encryption/internal_file_encryptor.cc exception.cc file_reader.cc + file_rewriter.cc file_writer.cc geospatial/statistics.cc geospatial/util_internal.cc @@ -412,6 +413,8 @@ add_parquet_test(arrow-reader-writer-test add_parquet_test(arrow-index-test SOURCES arrow/index_test.cc) +add_parquet_test(arrow-rewriter-test SOURCES arrow/arrow_rewriter_test.cc) + add_parquet_test(arrow-internals-test SOURCES arrow/path_internal_test.cc arrow/reconstruct_internal_test.cc) diff --git a/cpp/src/parquet/arrow/arrow_rewriter_test.cc b/cpp/src/parquet/arrow/arrow_rewriter_test.cc new file mode 100644 index 00000000000..32c7df67d4c --- /dev/null +++ b/cpp/src/parquet/arrow/arrow_rewriter_test.cc @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/io/memory.h" +#include "arrow/testing/gtest_util.h" +#include "parquet/arrow/reader.h" +#include "parquet/file_reader.h" +#include "parquet/file_rewriter.h" +#ifdef _MSC_VER +# pragma warning(push) +// Disable forcing value to bool warnings +# pragma warning(disable : 4800) +#endif + +#include + +#include "gtest/gtest.h" + +#include "parquet/arrow/test_util.h" +#include "parquet/platform.h" +#include "parquet/properties.h" + +using arrow::Table; +using arrow::io::BufferReader; + +namespace parquet::arrow { + +TEST(ParquetRewriterTest, SimpleRoundTrip) { + auto rewriter_properties = + RewriterProperties::Builder() + .writer_properties( + WriterProperties::Builder().enable_write_page_index()->build()) + ->build(); + + auto schema = ::arrow::schema( + {::arrow::field("a", ::arrow::int32()), ::arrow::field("b", ::arrow::utf8())}); + + std::shared_ptr buffer; + + WriteFile(rewriter_properties->writer_properties(), + ::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"]])"}), buffer); + + auto sink = CreateOutputStream(); + auto rewriter = + ParquetFileRewriter::Open({std::make_shared(buffer)}, sink, {NULLPTR}, + NULLPTR, rewriter_properties); + rewriter->Rewrite(); + rewriter->Close(); + + ASSERT_OK_AND_ASSIGN(auto out_buffer, sink->Finish()); + auto file_reader = ParquetFileReader::Open(std::make_shared(out_buffer)); + ASSERT_OK_AND_ASSIGN(auto reader, FileReader::Make(::arrow::default_memory_pool(), + std::move(file_reader))); + + std::shared_ptr table; + ASSERT_OK(reader->ReadTable(&table)); + ASSERT_OK(table->ValidateFull()); + + auto expected_table = ::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"]])"}); + AssertTablesEqual(*expected_table, *table); +} + +TEST(ParquetRewriterTest, ConcatRoundTrip) { + auto rewriter_properties = + RewriterProperties::Builder() + .writer_properties( + WriterProperties::Builder().enable_write_page_index()->build()) + ->build(); + + auto schema = ::arrow::schema( + {::arrow::field("a", ::arrow::int32()), ::arrow::field("b", ::arrow::utf8())}); + + std::shared_ptr buffer_up; + std::shared_ptr buffer_down; + + WriteFile(rewriter_properties->writer_properties(), + ::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"]])"}), buffer_up); + WriteFile(rewriter_properties->writer_properties(), + ::arrow::TableFromJSON(schema, {R"([[3, "c"]])"}), buffer_down); + + auto sink = CreateOutputStream(); + auto rewriter = + ParquetFileRewriter::Open({std::make_shared(buffer_up), + std::make_shared(buffer_down)}, + sink, {NULLPTR, NULLPTR}, NULLPTR, rewriter_properties); + rewriter->Rewrite(); + rewriter->Close(); + + ASSERT_OK_AND_ASSIGN(auto out_buffer, sink->Finish()); + auto file_reader = ParquetFileReader::Open(std::make_shared(out_buffer)); + ASSERT_OK_AND_ASSIGN(auto reader, FileReader::Make(::arrow::default_memory_pool(), + std::move(file_reader))); + + std::shared_ptr
table; + ASSERT_OK(reader->ReadTable(&table)); + ASSERT_OK(table->ValidateFull()); + + auto expected_table = + ::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"], [3, "c"]])"}); + AssertTablesEqual(*expected_table, *table); +} + +} // namespace parquet::arrow diff --git a/cpp/src/parquet/arrow/test_util.h b/cpp/src/parquet/arrow/test_util.h index 05f6fd24ac0..30d75b58e0a 100644 --- a/cpp/src/parquet/arrow/test_util.h +++ b/cpp/src/parquet/arrow/test_util.h @@ -28,18 +28,23 @@ #include "arrow/array/builder_binary.h" #include "arrow/array/builder_decimal.h" #include "arrow/array/builder_primitive.h" +#include "arrow/table.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" #include "arrow/type_fwd.h" #include "arrow/type_traits.h" #include "arrow/util/decimal.h" #include "arrow/util/float16.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/writer.h" #include "parquet/column_reader.h" +#include "parquet/file_writer.h" #include "parquet/test_util.h" namespace parquet { using internal::RecordReader; +using schema::GroupNode; namespace arrow { @@ -482,6 +487,29 @@ void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) { EXPECT_TRUE(result->Equals(*expected_array)); } +void WriteFile(const std::shared_ptr& writer_properties, + const std::shared_ptr<::arrow::Table>& table, + std::shared_ptr& buffer) { + // Get schema from table. + auto schema = table->schema(); + std::shared_ptr parquet_schema; + auto arrow_writer_properties = default_arrow_writer_properties(); + ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties, + *arrow_writer_properties, &parquet_schema)); + auto schema_node = std::static_pointer_cast(parquet_schema->schema_root()); + + // Write table to buffer. + auto sink = CreateOutputStream(); + auto pool = ::arrow::default_memory_pool(); + auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties); + std::unique_ptr arrow_writer; + ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties, + &arrow_writer)); + ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table)); + ASSERT_OK_NO_THROW(arrow_writer->Close()); + ASSERT_OK_AND_ASSIGN(buffer, sink->Finish()); +} + } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/file_rewriter.cc b/cpp/src/parquet/file_rewriter.cc new file mode 100644 index 00000000000..23cc5c7bbc6 --- /dev/null +++ b/cpp/src/parquet/file_rewriter.cc @@ -0,0 +1,429 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/file_rewriter.h" + +#include +#include +#include +#include + +#include "arrow/util/logging.h" +#include "parquet/bloom_filter.h" // IWYU pragma: keep +#include "parquet/bloom_filter_reader.h" +#include "parquet/column_reader.h" +#include "parquet/exception.h" +#include "parquet/file_reader.h" +#include "parquet/file_writer.h" +#include "parquet/index_location.h" +#include "parquet/metadata.h" +#include "parquet/page_index.h" +#include "parquet/platform.h" +#include "parquet/properties.h" + +namespace parquet { + +namespace { +void CopyStream(std::shared_ptr from, + std::shared_ptr to, int64_t size, + ::arrow::MemoryPool* pool) { + int64_t bytes_copied = 0; + if (from->supports_zero_copy()) { + while (bytes_copied < size) { + PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied)); + if (buffer->size() == 0) { + throw ParquetException("Unexpected end of stream at ", bytes_copied); + } + PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size())); + bytes_copied += buffer->size(); + } + return; + } + + std::shared_ptr buffer = + AllocateBuffer(pool, kDefaultOutputStreamSize); + while (bytes_copied < size) { + PARQUET_ASSIGN_OR_THROW(auto read_size, from->Read(size - bytes_copied, &buffer)); + if (read_size == 0) { + throw ParquetException("Unexpected end of stream at ", bytes_copied); + } + PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size)); + bytes_copied += read_size; + } +} +} // namespace + +const std::shared_ptr& default_rewriter_properties() { + static std::shared_ptr default_rewriter_properties = + RewriterProperties::Builder().build(); + return default_rewriter_properties; +} + +class RowGroupRewriter { + public: + RowGroupRewriter(std::shared_ptr source, + std::shared_ptr sink, + const RewriterProperties* props, + std::shared_ptr row_group_reader, + std::shared_ptr page_index_reader, + std::shared_ptr bloom_filter_reader) + : source_(std::move(source)), + sink_(std::move(sink)), + props_(props), + row_group_reader_(std::move(row_group_reader)), + page_index_reader_(std::move(page_index_reader)), + bloom_filter_reader_(std::move(bloom_filter_reader)), + metadata_(row_group_reader_->metadata()) {} + + void WriteRowGroupData(RowGroupMetaDataBuilder* rg_metadata_builder, + PageIndexBuilder* page_index_builder, + int64_t& total_bytes_written) { + rg_metadata_builder->set_num_rows(metadata_->num_rows()); + + bool fast_copy = metadata_->file_offset() != 0; + if (fast_copy) { + PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell()); + int64_t shift = sink_offset - metadata_->file_offset(); + + auto stream = props_->reader_properties().GetStream( + source_, metadata_->file_offset(), metadata_->total_compressed_size()); + CopyStream(stream, sink_, metadata_->total_compressed_size(), + props_->memory_pool()); + PARQUET_THROW_NOT_OK(stream->Close()); + + for (int i = 0; i < metadata_->num_columns(); ++i) { + auto cc_metadata = metadata_->ColumnChunk(i); + rg_metadata_builder->NextColumnChunk(std::move(cc_metadata), shift); + + // TODO(HuaHuaY): copy column index and bloom filter instead of reading and + // writing it. + auto column_index = page_index_reader_->GetColumnIndex(i); + auto offset_index = page_index_reader_->GetOffsetIndex(i); + // TODO(HuaHuaY): support bloom filter writer + [[maybe_unused]] auto bloom_filter = + bloom_filter_reader_->GetColumnBloomFilter(i); + + if (column_index != nullptr) { + page_index_builder->SetColumnIndex(i, column_index); + } + if (offset_index != nullptr) { + page_index_builder->SetOffsetIndex(i, offset_index, shift); + } + } + + total_bytes_written += metadata_->total_byte_size(); + } else { + for (int i = 0; i < metadata_->num_columns(); ++i) { + auto cc_metadata = metadata_->ColumnChunk(i); + + PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell()); + int64_t shift = sink_offset - cc_metadata->start_offset(); + + // TODO(HuaHuaY): add else branch to rewrite column chunk with new encoding, + // compression, etc. + if (true) { + // TODO(HuaHuaY): copy column index and bloom filter instead of reading and + // writing it. + auto column_index = page_index_reader_->GetColumnIndex(i); + auto offset_index = page_index_reader_->GetOffsetIndex(i); + // TODO(HuaHuaY): support bloom filter writer + [[maybe_unused]] auto bloom_filter = + bloom_filter_reader_->GetColumnBloomFilter(i); + + auto stream = props_->reader_properties().GetStream( + source_, cc_metadata->start_offset(), cc_metadata->total_compressed_size()); + CopyStream(stream, sink_, cc_metadata->total_compressed_size(), + props_->memory_pool()); + PARQUET_THROW_NOT_OK(stream->Close()); + + rg_metadata_builder->NextColumnChunk(std::move(cc_metadata), shift); + + page_index_builder->SetColumnIndex(i, column_index); + page_index_builder->SetOffsetIndex(i, offset_index, shift); + + total_bytes_written += cc_metadata->total_uncompressed_size(); + } + } + } + } + + private: + std::shared_ptr source_; + std::shared_ptr sink_; + const RewriterProperties* props_; + std::shared_ptr row_group_reader_; + std::shared_ptr page_index_reader_; + std::shared_ptr bloom_filter_reader_; + const RowGroupMetaData* metadata_; +}; + +class SingleFileRewriter { + public: + SingleFileRewriter(std::shared_ptr source, + std::shared_ptr sink, + std::shared_ptr source_metadata, + const RewriterProperties* props) + : source_(source), + sink_(std::move(sink)), + props_(props), + parquet_file_reader_(ParquetFileReader::Open( + std::move(source), props->reader_properties(), std::move(source_metadata))), + page_index_reader_(parquet_file_reader_->GetPageIndexReader()), + bloom_filter_reader_(parquet_file_reader_->GetBloomFilterReader()), + metadata_(parquet_file_reader_->metadata()) { + std::vector row_group_indices(metadata_->num_row_groups()); + std::iota(row_group_indices.begin(), row_group_indices.end(), 0); + std::vector column_indices(metadata_->num_columns()); + std::iota(column_indices.begin(), column_indices.end(), 0); + page_index_reader_->WillNeed(row_group_indices, column_indices, + {/*column_index=*/true, /*offset_index=*/true}); + } + + void WriteRowGroupData(RowGroupMetaDataBuilder* rg_metadata_builder, + PageIndexBuilder* page_index_builder, + int64_t& total_bytes_written) { + if (current_row_group_index_ >= metadata_->num_row_groups()) { + std::stringstream ss; + ss << "Trying to read row group " << current_row_group_index_ + << " but file only has " << metadata_->num_row_groups() << " row groups"; + throw ParquetException(ss.str()); + } + auto row_group_metadata = metadata_->RowGroup(current_row_group_index_); + auto row_group_reader = parquet_file_reader_->RowGroup(current_row_group_index_); + auto page_index_reader = page_index_reader_->RowGroup(current_row_group_index_); + auto bloom_filter_reader = bloom_filter_reader_.RowGroup(current_row_group_index_); + RowGroupRewriter rewriter(source_, sink_, props_, std::move(row_group_reader), + std::move(page_index_reader), + std::move(bloom_filter_reader)); + rewriter.WriteRowGroupData(rg_metadata_builder, page_index_builder, + total_bytes_written); + ++current_row_group_index_; + } + + bool HasMoreRowGroup() { + return current_row_group_index_ < metadata_->num_row_groups(); + } + + void Close() { parquet_file_reader_->Close(); } + + const SchemaDescriptor* schema() const { return metadata_->schema(); } + + std::vector row_group_row_counts() const { + int num_row_groups = metadata_->num_row_groups(); + std::vector row_counts; + row_counts.reserve(num_row_groups); + for (int i = 0; i < num_row_groups; ++i) { + row_counts.emplace_back(metadata_->RowGroup(i)->num_rows()); + } + return row_counts; + } + + private: + std::shared_ptr source_; + std::shared_ptr sink_; + const RewriterProperties* props_; + std::unique_ptr parquet_file_reader_; + std::shared_ptr page_index_reader_; + BloomFilterReader& bloom_filter_reader_; + std::shared_ptr metadata_; + int current_row_group_index_{}; +}; + +class ConcatRewriter { + public: + explicit ConcatRewriter(std::vector> rewriters) + : file_rewriters_(std::move(rewriters)) { + auto* schema = file_rewriters_[0]->schema(); + for (size_t i = 1; i < file_rewriters_.size(); ++i) { + if (!schema->Equals(*file_rewriters_[i]->schema())) { + throw ParquetException("Input files have different schemas, current index: ", i, + ", schema:", file_rewriters_[i]->schema()->ToString()); + } + } + } + + void WriteRowGroupData(RowGroupMetaDataBuilder* rg_metadata_builder, + PageIndexBuilder* page_index_builder, + int64_t& total_bytes_written) { + file_rewriters_[current_rewriter_index_]->WriteRowGroupData( + rg_metadata_builder, page_index_builder, total_bytes_written); + } + + bool HasMoreRowGroup() { + while (current_rewriter_index_ < file_rewriters_.size() && + !file_rewriters_[current_rewriter_index_]->HasMoreRowGroup()) { + file_rewriters_[current_rewriter_index_]->Close(); + ARROW_LOG(DEBUG) << "Finished rewriting file index " << current_rewriter_index_; + ++current_rewriter_index_; + } + return current_rewriter_index_ < file_rewriters_.size(); + } + + void Close() { + for (size_t i = current_rewriter_index_; i < file_rewriters_.size(); ++i) { + file_rewriters_[i]->Close(); + } + } + + const SchemaDescriptor* schema() const { return file_rewriters_[0]->schema(); } + + std::vector row_group_row_counts() const { + std::vector row_counts; + for (auto& rewriter : file_rewriters_) { + auto count = rewriter->row_group_row_counts(); + row_counts.insert(row_counts.end(), count.begin(), count.end()); + } + return row_counts; + } + + private: + std::vector> file_rewriters_; + size_t current_rewriter_index_{}; +}; + +// ---------------------------------------------------------------------- +// GeneratedFile + +class GeneratedFile : public ParquetFileRewriter::Contents { + public: + static std::unique_ptr Open( + std::vector> sources, + std::shared_ptr sink, + std::vector> sources_metadata, + std::shared_ptr sink_metadata, + std::shared_ptr props) { + if (sources.size() != sources_metadata.size()) { + throw ParquetException( + "The number of sources and sources_metadata must be the same."); + } + std::unique_ptr result(new GeneratedFile( + std::move(sources), std::move(sink), std::move(sources_metadata), + std::move(sink_metadata), std::move(props))); + return result; + } + + void Close() override { + if (rewriter_) { + rewriter_->Close(); + rewriter_.reset(); + } + } + + void Rewrite() override { + while (rewriter_->HasMoreRowGroup()) { + auto* rg_metadata_builder = metadata_builder_->AppendRowGroup(); + page_index_builder_->AppendRowGroup(); + int64_t total_bytes_written = 0; + rewriter_->WriteRowGroupData(rg_metadata_builder, page_index_builder_.get(), + total_bytes_written); + rg_metadata_builder->Finish(total_bytes_written); + } + page_index_builder_->Finish(); + + auto [column_index_locations, offset_index_locations] = + page_index_builder_->WriteTo(sink_.get()); + metadata_builder_->SetIndexLocations(IndexKind::kColumnIndex, column_index_locations); + metadata_builder_->SetIndexLocations(IndexKind::kOffsetIndex, offset_index_locations); + + auto file_metadata = metadata_builder_->Finish(sink_metadata_); + WriteFileMetaData(*file_metadata, sink_.get()); + } + + private: + GeneratedFile(std::vector> sources, + std::shared_ptr sink, + std::vector> sources_metadata, + std::shared_ptr sink_metadata, + std::shared_ptr props) + : sink_(std::move(sink)), + props_(std::move(props)), + sink_metadata_(std::move(sink_metadata)) { + std::vector> rewriters; + rewriters.reserve(sources.size()); + for (size_t i = 0; i < sources.size(); ++i) { + rewriters.emplace_back(std::make_unique( + std::move(sources[i]), sink_, std::move(sources_metadata[i]), props_.get())); + } + rewriter_ = std::make_unique(std::move(rewriters)); + + if (props_->writer_properties()->file_encryption_properties() == nullptr) { + // Unencrypted parquet files always start with PAR1 + PARQUET_THROW_NOT_OK(sink_->Write(kParquetMagic, 4)); + } else { + throw ParquetException( + "NotImplemented, rewriter does not support to write encrypted files."); + } + + auto new_schema = rewriter_->schema()->schema_root(); + new_schema_.Init(new_schema); + metadata_builder_ = + FileMetaDataBuilder::Make(&new_schema_, props_->writer_properties()); + if (props_->writer_properties()->page_index_enabled()) { + page_index_builder_ = PageIndexBuilder::Make(&new_schema_, nullptr); + } + } + + std::shared_ptr sink_; + std::shared_ptr props_; + std::shared_ptr sink_metadata_; + std::unique_ptr rewriter_; + + SchemaDescriptor new_schema_; + std::unique_ptr metadata_builder_; + std::unique_ptr page_index_builder_; +}; + +// ---------------------------------------------------------------------- +// ParquetFilesRewriter public API + +ParquetFileRewriter::ParquetFileRewriter() = default; + +ParquetFileRewriter::~ParquetFileRewriter() { + try { + Close(); + } catch (...) { + } +} + +std::unique_ptr ParquetFileRewriter::Open( + std::vector> sources, + std::shared_ptr sink, + std::vector> sources_metadata, + std::shared_ptr sink_metadata, + std::shared_ptr props) { + auto contents = GeneratedFile::Open(std::move(sources), std::move(sink), + std::move(sources_metadata), + std::move(sink_metadata), std::move(props)); + std::unique_ptr result(new ParquetFileRewriter()); + result->Open(std::move(contents)); + return result; +} + +void ParquetFileRewriter::Open(std::unique_ptr contents) { + contents_ = std::move(contents); +} + +void ParquetFileRewriter::Close() { + if (contents_) { + contents_->Close(); + contents_.reset(); + } +} + +void ParquetFileRewriter::Rewrite() { contents_->Rewrite(); } + +} // namespace parquet diff --git a/cpp/src/parquet/file_rewriter.h b/cpp/src/parquet/file_rewriter.h new file mode 100644 index 00000000000..f904c5c3b84 --- /dev/null +++ b/cpp/src/parquet/file_rewriter.h @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "parquet/platform.h" +#include "parquet/properties.h" +#include "parquet/type_fwd.h" + +namespace parquet { + +class PARQUET_EXPORT ParquetFileRewriter { + public: + struct PARQUET_EXPORT Contents { + virtual ~Contents() = default; + virtual void Close() = 0; + virtual void Rewrite() = 0; + }; + + ParquetFileRewriter(); + ~ParquetFileRewriter(); + + static std::unique_ptr Open( + std::vector> sources, + std::shared_ptr sink, + std::vector> sources_metadata, + std::shared_ptr sink_metadata = NULLPTR, + std::shared_ptr props = default_rewriter_properties()); + + void Open(std::unique_ptr contents); + void Close(); + + void Rewrite(); + + private: + std::unique_ptr contents_; +}; + +} // namespace parquet diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index 5def0b8329e..260c1f70d39 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -413,6 +413,10 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { inline int64_t data_page_offset() const { return column_metadata_->data_page_offset; } + inline int64_t start_offset() const { + return has_dictionary_page() ? dictionary_page_offset() : data_page_offset(); + } + inline bool has_index_page() const { return column_metadata_->__isset.index_page_offset; } @@ -454,6 +458,8 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { return key_value_metadata_; } + const void* to_thrift() const { return column_metadata_; } + private: void InitKeyValueMetadata() { key_value_metadata_ = FromThriftKeyValueMetadata(*column_metadata_); @@ -550,6 +556,8 @@ int64_t ColumnChunkMetaData::data_page_offset() const { return impl_->data_page_offset(); } +int64_t ColumnChunkMetaData::start_offset() const { return impl_->start_offset(); } + bool ColumnChunkMetaData::has_index_page() const { return impl_->has_index_page(); } int64_t ColumnChunkMetaData::index_page_offset() const { @@ -601,6 +609,8 @@ const std::shared_ptr& ColumnChunkMetaData::key_value_me return impl_->key_value_metadata(); } +const void* ColumnChunkMetaData::to_thrift() const { return impl_->to_thrift(); } + // row-group metadata class RowGroupMetaData::RowGroupMetaDataImpl { public: @@ -1888,6 +1898,23 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl { return column_builder_ptr; } + void NextColumnChunk(std::unique_ptr cc_metadata, int64_t shift) { + auto* column_chunk = &row_group_->columns[next_column_++]; + column_chunk->__set_file_offset(0); + column_chunk->__isset.meta_data = true; + column_chunk->meta_data = + *static_cast(cc_metadata->to_thrift()); + column_chunk->meta_data.__set_dictionary_page_offset( + column_chunk->meta_data.dictionary_page_offset + shift); + column_chunk->meta_data.__set_data_page_offset( + column_chunk->meta_data.data_page_offset + shift); + column_chunk->meta_data.__set_index_page_offset( + column_chunk->meta_data.index_page_offset + shift); + column_chunk->meta_data.__set_bloom_filter_offset( + column_chunk->meta_data.bloom_filter_offset + shift); + column_builders_.push_back(NULLPTR); + } + int current_column() { return next_column_ - 1; } void Finish(int64_t total_bytes_written, int16_t row_group_ordinal) { @@ -1919,6 +1946,10 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl { } // sometimes column metadata is encrypted and not available to read, // so we must get total_compressed_size from column builder + if (column_builders_[i] == NULLPTR) { + total_compressed_size += row_group_->columns[i].meta_data.total_compressed_size; + continue; + } total_compressed_size += column_builders_[i]->total_compressed_size(); } @@ -1973,6 +2004,11 @@ ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() { return impl_->NextColumnChunk(); } +void RowGroupMetaDataBuilder::NextColumnChunk( + std::unique_ptr cc_metadata, int64_t shift) { + return impl_->NextColumnChunk(std::move(cc_metadata), shift); +} + int RowGroupMetaDataBuilder::current_column() const { return impl_->current_column(); } int RowGroupMetaDataBuilder::num_columns() { return impl_->num_columns(); } diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index 6d9deee106b..045d14e9a10 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -150,6 +150,7 @@ class PARQUET_EXPORT ColumnChunkMetaData { bool has_dictionary_page() const; int64_t dictionary_page_offset() const; int64_t data_page_offset() const; + int64_t start_offset() const; bool has_index_page() const; int64_t index_page_offset() const; int64_t total_compressed_size() const; @@ -159,6 +160,8 @@ class PARQUET_EXPORT ColumnChunkMetaData { std::optional GetOffsetIndexLocation() const; const std::shared_ptr& key_value_metadata() const; + const void* to_thrift() const; + private: explicit ColumnChunkMetaData( const void* metadata, const ColumnDescriptor* descr, int16_t row_group_ordinal, @@ -479,6 +482,7 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder { ~RowGroupMetaDataBuilder(); ColumnChunkMetaDataBuilder* NextColumnChunk(); + void NextColumnChunk(std::unique_ptr cc_metadata, int64_t shift); int num_columns(); int64_t num_rows(); int current_column() const; diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index c06fc77dc53..71f2349927c 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -31,6 +31,7 @@ #include "arrow/util/unreachable.h" #include +#include #include namespace parquet { @@ -770,6 +771,11 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { column_index_builders_.back().resize(num_columns); offset_index_builders_.back().resize(num_columns); + column_indices_.emplace_back(); + offset_indices_.emplace_back(); + column_indices_.back().resize(num_columns); + offset_indices_.back().resize(num_columns); + DCHECK_EQ(column_index_builders_.size(), offset_index_builders_.size()); DCHECK_EQ(column_index_builders_.back().size(), num_columns); DCHECK_EQ(offset_index_builders_.back().size(), num_columns); @@ -793,6 +799,23 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { return builder.get(); } + void SetColumnIndex(int32_t i, + const std::shared_ptr& column_index) override { + CheckState(i); + column_indices_.back()[i] = + std::make_unique(ToThrift(*column_index)); + } + + void SetOffsetIndex(int32_t i, const std::shared_ptr& offset_index, + int64_t shift) override { + CheckState(i); + auto index = std::make_unique(ToThrift(*offset_index)); + for (auto& page_location : index->page_locations) { + page_location.__set_offset(page_location.offset + shift); + } + offset_indices_.back()[i] = std::move(index); + } + void Finish() override { finished_ = true; } WriteResult WriteTo(::arrow::io::OutputStream* sink) const override { @@ -802,11 +825,13 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { WriteResult result; - // Serialize column index ordered by row group ordinal and then column ordinal. - result.column_index_locations = SerializeIndex(column_index_builders_, sink); + /// Serialize column index ordered by row group ordinal and then column ordinal. + result.column_index_locations = + SerializeIndex(column_index_builders_, column_indices_, sink); - // Serialize offset index ordered by row group ordinal and then column ordinal. - result.offset_index_locations = SerializeIndex(offset_index_builders_, sink); + /// Serialize offset index ordered by row group ordinal and then column ordinal. + result.offset_index_locations = + SerializeIndex(offset_index_builders_, offset_indices_, sink); return result; } @@ -841,9 +866,14 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { return encryptor; } + template + using Index = std::conditional_t, + format::ColumnIndex, format::OffsetIndex>; + template IndexLocations SerializeIndex( const std::vector>>& page_index_builders, + const std::vector>>>& page_indices, ::arrow::io::OutputStream* sink) const { IndexLocations locations; @@ -854,20 +884,28 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { // Serialize the same kind of page index row group by row group. for (size_t row_group = 0; row_group < page_index_builders.size(); ++row_group) { + const auto& row_group_page_indices = page_indices[row_group]; + DCHECK_EQ(row_group_page_indices.size(), num_columns); + const auto& row_group_page_index_builders = page_index_builders[row_group]; DCHECK_EQ(row_group_page_index_builders.size(), num_columns); // In the same row group, serialize the same kind of page index column by column. for (size_t column = 0; column < num_columns; ++column) { + const auto& column_page_index = row_group_page_indices[column]; const auto& column_page_index_builder = row_group_page_index_builders[column]; - if (column_page_index_builder != nullptr) { + if (column_page_index != nullptr || column_page_index_builder != nullptr) { /// Get encryptor if encryption is enabled. std::shared_ptr encryptor = GetColumnMetaEncryptor( static_cast(row_group), static_cast(column), module_type); // Try serializing the page index. PARQUET_ASSIGN_OR_THROW(int64_t pos_before_write, sink->Tell()); - column_page_index_builder->WriteTo(sink, encryptor.get()); + if (column_page_index != nullptr) { + ThriftSerializer{}.Serialize(column_page_index.get(), sink, encryptor.get()); + } else { + column_page_index_builder->WriteTo(sink, encryptor.get()); + } PARQUET_ASSIGN_OR_THROW(int64_t pos_after_write, sink->Tell()); int64_t len = pos_after_write - pos_before_write; @@ -895,6 +933,8 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { InternalFileEncryptor* file_encryptor_; std::vector>> column_index_builders_; std::vector>> offset_index_builders_; + std::vector>> column_indices_; + std::vector>> offset_indices_; bool finished_ = false; }; diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index 67e68288532..c2433de9c64 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -23,6 +23,7 @@ #include "parquet/type_fwd.h" #include "parquet/types.h" +#include #include #include @@ -370,6 +371,12 @@ class PARQUET_EXPORT PageIndexBuilder { /// the PageIndexBuilder. virtual OffsetIndexBuilder* GetOffsetIndexBuilder(int32_t i) = 0; + virtual void SetColumnIndex(int32_t i, + const std::shared_ptr& column_index) = 0; + + virtual void SetOffsetIndex(int32_t i, const std::shared_ptr& offset_index, + int64_t shift) = 0; + /// \brief Complete the page index builder and no more write is allowed. virtual void Finish() = 0; diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc index 94024ad403b..b45ff4b50a4 100644 --- a/cpp/src/parquet/properties.cc +++ b/cpp/src/parquet/properties.cc @@ -28,7 +28,7 @@ namespace parquet { std::shared_ptr ReaderProperties::GetStream( - std::shared_ptr source, int64_t start, int64_t num_bytes) { + std::shared_ptr source, int64_t start, int64_t num_bytes) const { if (buffered_stream_enabled_) { // ARROW-6180 / PARQUET-1636 Create isolated reader that references segment // of source diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index ee829c4dbc5..e43e6d1129d 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -77,7 +77,7 @@ class PARQUET_EXPORT ReaderProperties { MemoryPool* memory_pool() const { return pool_; } std::shared_ptr GetStream(std::shared_ptr source, - int64_t start, int64_t num_bytes); + int64_t start, int64_t num_bytes) const; /// Buffered stream reading allows the user to control the memory usage of /// parquet readers. This ensure that all `RandomAccessFile::ReadAt` calls are @@ -1526,4 +1526,74 @@ struct ArrowWriteContext { PARQUET_EXPORT std::shared_ptr default_arrow_writer_properties(); +class PARQUET_EXPORT RewriterProperties { + public: + class Builder { + public: + Builder() + : pool_(::arrow::default_memory_pool()), + writer_properties_(default_writer_properties()), + reader_properties_(default_reader_properties()) {} + + explicit Builder(const RewriterProperties& properties) + : pool_(properties.memory_pool()), + writer_properties_(properties.writer_properties()), + reader_properties_(properties.reader_properties()) {} + + virtual ~Builder() = default; + + /// Specify the memory pool for the rewriter. Default default_memory_pool. + Builder* memory_pool(MemoryPool* pool) { + pool_ = pool; + return this; + } + + /// Set the writer properties. + Builder* writer_properties(std::shared_ptr properties) { + writer_properties_ = std::move(properties); + return this; + } + + /// Set the reader properties. + Builder* reader_properties(ReaderProperties properties) { + reader_properties_ = std::move(properties); + return this; + } + + /// Build the RewriterProperties with the builder parameters. + std::shared_ptr build() { + return std::shared_ptr(new RewriterProperties( + pool_, std::move(writer_properties_), std::move(reader_properties_))); + } + + private: + MemoryPool* pool_; + std::shared_ptr writer_properties_; + ReaderProperties reader_properties_; + }; + + MemoryPool* memory_pool() const { return pool_; } + + const std::shared_ptr& writer_properties() const { + return writer_properties_; + } + + const ReaderProperties& reader_properties() const { return reader_properties_; } + + private: + explicit RewriterProperties(MemoryPool* pool, + std::shared_ptr writer_properties, + ReaderProperties reader_properties) + : pool_(pool), + writer_properties_(std::move(writer_properties)), + reader_properties_(std::move(reader_properties)) {} + + MemoryPool* pool_; + std::shared_ptr writer_properties_; + ReaderProperties reader_properties_; +}; + +PARQUET_EXPORT +const std::shared_ptr& default_rewriter_properties(); + } // namespace parquet diff --git a/cpp/src/parquet/thrift_internal.h b/cpp/src/parquet/thrift_internal.h index 1ffe99eb3c9..81d38ae4d2a 100644 --- a/cpp/src/parquet/thrift_internal.h +++ b/cpp/src/parquet/thrift_internal.h @@ -42,6 +42,7 @@ #include "parquet/encryption/internal_file_encryptor.h" #include "parquet/exception.h" #include "parquet/geospatial/statistics.h" +#include "parquet/page_index.h" #include "parquet/platform.h" #include "parquet/properties.h" #include "parquet/size_statistics.h" @@ -544,6 +545,49 @@ static inline format::SizeStatistics ToThrift(const SizeStatistics& size_stats) return size_statistics; } +static inline format::PageLocation ToThrift(const PageLocation& page_location) { + format::PageLocation thrift_page_location; + thrift_page_location.__set_offset(page_location.offset); + thrift_page_location.__set_compressed_page_size(page_location.compressed_page_size); + thrift_page_location.__set_first_row_index(page_location.first_row_index); + return thrift_page_location; +} + +static inline format::ColumnIndex ToThrift(const ColumnIndex& column_index) { + format::ColumnIndex thrift_column_index; + thrift_column_index.__set_null_pages(column_index.null_pages()); + thrift_column_index.__set_min_values(column_index.encoded_min_values()); + thrift_column_index.__set_max_values(column_index.encoded_max_values()); + thrift_column_index.__set_boundary_order(ToThrift(column_index.boundary_order())); + if (column_index.has_null_counts()) { + thrift_column_index.__set_null_counts(column_index.null_counts()); + } + if (column_index.has_definition_level_histograms()) { + thrift_column_index.__set_definition_level_histograms( + column_index.definition_level_histograms()); + } + if (column_index.has_repetition_level_histograms()) { + thrift_column_index.__set_repetition_level_histograms( + column_index.repetition_level_histograms()); + } + return thrift_column_index; +} + +static inline format::OffsetIndex ToThrift(const OffsetIndex& offset_index) { + format::OffsetIndex thrift_offset_index; + std::vector thrift_page_locations; + thrift_page_locations.reserve(offset_index.page_locations().size()); + for (const auto& page_location : offset_index.page_locations()) { + thrift_page_locations.push_back(ToThrift(page_location)); + } + thrift_offset_index.__set_page_locations(std::move(thrift_page_locations)); + if (!offset_index.unencoded_byte_array_data_bytes().empty()) { + thrift_offset_index.__set_unencoded_byte_array_data_bytes( + offset_index.unencoded_byte_array_data_bytes()); + } + return thrift_offset_index; +} + // ---------------------------------------------------------------------- // Thrift struct serialization / deserialization utilities