Skip to content

Commit 2e0f4bd

Browse files
ulysses-youzhejiangxiaomai
authored andcommitted
Support empty relation write (#370)
Co-authored-by: youxiduo <[email protected]>
1 parent ff311fc commit 2e0f4bd

File tree

2 files changed

+27
-10
lines changed

2 files changed

+27
-10
lines changed

velox/dwio/parquet/writer/Writer.cpp

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,25 @@ Writer::Writer(
132132
std::move(sink),
133133
*generalPool_,
134134
options.bufferGrowRatio)),
135-
arrowContext_(std::make_shared<ArrowContext>()),
136-
schema_(schema) {
135+
arrowContext_(std::make_shared<ArrowContext>()) {
137136
arrowContext_->properties = getArrowParquetWriterOptions(options);
137+
arrowContext_->schema = schema;
138+
139+
if (arrowContext_->schema) {
140+
// If the input iterator is empty, the writer will do nothing and build a
141+
// empty file. We should at least write the parquet magic header so the
142+
// reader can regonize it is a valid parquet file. So, we initialize the
143+
// writer at first even there is no data.
144+
auto arrowProperties = ::parquet::ArrowWriterProperties::Builder().build();
145+
PARQUET_ASSIGN_OR_THROW(
146+
arrowContext_->writer,
147+
::parquet::arrow::FileWriter::Open(
148+
*arrowContext_->schema.get(),
149+
arrow::default_memory_pool(),
150+
stream_,
151+
arrowContext_->properties,
152+
arrowProperties));
153+
}
138154
}
139155

140156
Writer::Writer(
@@ -196,20 +212,23 @@ void Writer::flush() {
196212
*/
197213
void Writer::write(const VectorPtr& data) {
198214
ArrowArray array;
199-
ArrowSchema schema;
200215
exportToArrow(data, array, generalPool_.get());
201-
exportToArrow(data, schema);
202216
std::shared_ptr<arrow::RecordBatch> recordBatch;
203-
if (schema_) {
217+
if (arrowContext_->schema) {
204218
PARQUET_ASSIGN_OR_THROW(
205-
recordBatch, arrow::ImportRecordBatch(&array, schema_));
219+
recordBatch, arrow::ImportRecordBatch(&array, arrowContext_->schema));
206220
} else {
221+
ArrowSchema schema;
222+
exportToArrow(data, schema);
207223
PARQUET_ASSIGN_OR_THROW(
208224
recordBatch, arrow::ImportRecordBatch(&array, &schema));
209225
}
210226

211-
if (!arrowContext_->schema) {
212-
arrowContext_->schema = recordBatch->schema();
227+
if (arrowContext_->stagingChunks.empty()) {
228+
if (!arrowContext_->schema) {
229+
arrowContext_->schema = recordBatch->schema();
230+
}
231+
213232
for (int colIdx = 0; colIdx < arrowContext_->schema->num_fields();
214233
colIdx++) {
215234
arrowContext_->stagingChunks.push_back(

velox/dwio/parquet/writer/Writer.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,6 @@ class Writer : public dwio::common::Writer {
9595
std::shared_ptr<ArrowDataBufferSink> stream_;
9696

9797
std::shared_ptr<ArrowContext> arrowContext_;
98-
99-
std::shared_ptr<arrow::Schema> schema_;
10098
};
10199

102100
class ParquetWriterFactory : public dwio::common::WriterFactory {

0 commit comments

Comments
 (0)