2 changes: 2 additions & 0 deletions .github/actions/spelling/expect.txt
@@ -216,6 +216,7 @@ fns
foobarfoobarfoo
footgunning
Forcepoint
fpp
freelist
fuzzcheck
GC'ing
@@ -380,6 +381,7 @@ myvalue
Namazu
nats
ndjson
ndv
nearline
nextest
ngx
56 changes: 56 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -586,6 +586,7 @@ enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"]

# Codecs
codecs-arrow = ["dep:arrow", "dep:arrow-schema", "vector-lib/arrow"]
codecs-parquet = ["vector-lib/parquet"]
codecs-opentelemetry = ["vector-lib/opentelemetry"]
codecs-syslog = ["vector-lib/syslog"]

@@ -845,7 +846,7 @@ sinks-aws_cloudwatch_logs = ["aws-core", "dep:aws-sdk-cloudwatchlogs", "dep:aws-
sinks-aws_cloudwatch_metrics = ["aws-core", "dep:aws-sdk-cloudwatch"]
sinks-aws_kinesis_firehose = ["aws-core", "dep:aws-sdk-firehose"]
sinks-aws_kinesis_streams = ["aws-core", "dep:aws-sdk-kinesis"]
sinks-aws_s3 = ["dep:base64", "dep:md-5", "aws-core", "dep:aws-sdk-s3"]
sinks-aws_s3 = ["dep:base64", "dep:md-5", "aws-core", "dep:aws-sdk-s3", "codecs-parquet"]
sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"]
sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"]
sinks-axiom = ["sinks-http"]
18 changes: 18 additions & 0 deletions changelog.d/parquet_encoder_aws_s3.feature.md
@@ -0,0 +1,18 @@
The `aws_s3` sink now supports [Apache Parquet](https://parquet.apache.org/) encoding, enabling
Vector to write columnar Parquet files optimized for analytics workloads.

Parquet is a columnar storage format with efficient compression and encoding schemes, making it
well suited to long-term storage and to querying with tools such as AWS Athena, Apache Spark, and
Presto. Users can now configure Parquet encoding with a custom schema defined directly in YAML as
a simple map of field names to data types.

Features include:
- Support for all common data types: strings (utf8), integers (int8-int64), unsigned integers,
floats (float32, float64), timestamps (second/millisecond/microsecond/nanosecond precision),
booleans, binary data, and decimals
- Configurable compression algorithms: snappy (default), gzip, zstd, lz4, brotli, or uncompressed

Each batch of events becomes one Parquet file in S3, with batch size controlled by the standard
`batch.max_events`, `batch.max_bytes`, and `batch.timeout_secs` settings.

authors: rorylshanks
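
As a rough illustration of the flow described in the changelog entry above (one batch of events becoming one Parquet object), the sketch below builds an Arrow `RecordBatch` and writes it to an in-memory buffer with the `parquet` crate's `ArrowWriter`, using the snappy compression named as the default. This is not the PR's serializer code: the schema, field names, and values are illustrative assumptions, and the real sink would derive the `RecordBatch` from buffered events (the diff makes `build_record_batch` in `arrow.rs` public for that kind of reuse).

```rust
// Hedged sketch only: not the PR's implementation, just the general shape of
// turning one batch into one Parquet file using the `arrow` and `parquet`
// crates added as dependencies in this change.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

fn encode_batch_to_parquet() -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    // Schema corresponding to a YAML map such as `message: utf8, status: int64`
    // (the exact YAML option names are an assumption; they are not shown in this diff).
    let schema = Arc::new(Schema::new(vec![
        Field::new("message", DataType::Utf8, true),
        Field::new("status", DataType::Int64, true),
    ]));

    // In the sink, the columns would be built from the batched events;
    // the values here are made up for illustration.
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["GET /index.html", "GET /health"])),
        Arc::new(Int64Array::from(vec![200_i64, 204])),
    ];
    let batch = RecordBatch::try_new(Arc::clone(&schema), columns)?;

    // Snappy is the default compression named in the changelog entry.
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();

    // Write a complete Parquet file into an in-memory buffer; the aws_s3 sink
    // would upload these bytes as a single object per batch.
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(buf)
}
```

Closing the writer finalizes the Parquet footer, so using one writer per upload mirrors the one-file-per-batch behavior described above.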
2 changes: 2 additions & 0 deletions lib/codecs/Cargo.toml
@@ -15,6 +15,7 @@ path = "tests/bin/generate-avro-fixtures.rs"
[dependencies]
apache-avro = { version = "0.20.0", default-features = false }
arrow = { version = "56.2.0", default-features = false, features = ["ipc"] }
parquet = { version = "56.2.0", default-features = false, features = ["arrow", "snap", "zstd", "lz4", "brotli", "flate2", "flate2-rust_backened"], optional = true }
async-trait.workspace = true
bytes.workspace = true
chrono.workspace = true
@@ -61,5 +62,6 @@ vrl.workspace = true

[features]
arrow = []
parquet = ["dep:parquet", "arrow"]
opentelemetry = ["dep:opentelemetry-proto"]
syslog = ["dep:syslog_loose"]
4 changes: 2 additions & 2 deletions lib/codecs/src/encoding/format/arrow.rs
@@ -235,7 +235,7 @@ pub fn encode_events_to_arrow_ipc_stream(
}

/// Recursively makes a Field and all its nested fields nullable
fn make_field_nullable(field: &arrow::datatypes::Field) -> arrow::datatypes::Field {
pub fn make_field_nullable(field: &arrow::datatypes::Field) -> arrow::datatypes::Field {
let new_data_type = match field.data_type() {
DataType::List(inner_field) => DataType::List(Arc::new(make_field_nullable(inner_field))),
DataType::Struct(fields) => {
@@ -254,7 +254,7 @@ fn make_field_nullable(field: &arrow::datatypes::Fie
}

/// Builds an Arrow RecordBatch from events
fn build_record_batch(
pub fn build_record_batch(
schema: Arc<Schema>,
events: &[Event],
) -> Result<RecordBatch, ArrowEncodingError> {
10 changes: 10 additions & 0 deletions lib/codecs/src/encoding/format/mod.rs
@@ -16,8 +16,12 @@ mod native;
mod native_json;
#[cfg(feature = "opentelemetry")]
mod otlp;
#[cfg(feature = "parquet")]
mod parquet;
mod protobuf;
mod raw_message;
#[cfg(any(feature = "arrow", feature = "parquet"))]
mod schema_definition;
mod text;

use std::fmt::Debug;
@@ -36,8 +40,14 @@ pub use native::{NativeSerializer, NativeSerializerConfig};
pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig};
#[cfg(feature = "opentelemetry")]
pub use otlp::{OtlpSerializer, OtlpSerializerConfig};
#[cfg(feature = "parquet")]
pub use parquet::{
ParquetCompression, ParquetEncodingError, ParquetSerializer, ParquetSerializerConfig,
};
pub use protobuf::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions};
pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig};
#[cfg(any(feature = "arrow", feature = "parquet"))]
pub use schema_definition::{SchemaDefinition, SchemaDefinitionError};
pub use text::{TextSerializer, TextSerializerConfig};
use vector_core::event::Event;
