Skip to content

Commit 8dbbaea

Browse files
c1lyjmacdutpilla
authored
[otap-dataflow]basic filter processor logs followup (#1341)
Add a basic filter processor that is based on the Golang version, minus the OTTL support. Update to #1279: - fixed edge case where optional record batches are not present - fixed edge case where logs with no resource attributes/record attributes are not being filtered out - fixed string column not guaranteed to be a string array when downcast to apply regex filter - utilized BooleanBuilder to optimize building the id filter. Add support for log filtering. Defined the following Rust structs that will allow us to filter on: - resource attributes - log record attributes - log record bodies - log record severity text - log record severity number (optional) ```rust /// struct that describes the overall requirements to use in order to filter logs #[derive(Debug, Clone, Deserialize)] pub struct LogFilter { // Include match properties describe logs that should be included in the Collector Service pipeline, // all other logs should be dropped from further processing. // If both Include and Exclude are specified, Include filtering occurs first. include: LogMatchProperties, // Exclude match properties describe logs that should be excluded from the Collector Service pipeline, // all other logs should be included. // If both Include and Exclude are specified, Include filtering occurs first. exclude: LogMatchProperties, // LogConditions is a list of OTTL conditions for an ottllog context. // If any condition resolves to true, the log event will be dropped. // Supports `and`, `or`, and `()` #[allow(dead_code)] log_record: Vec<String>, } /// LogMatchProperties specifies the set of properties in a log to match against and the type of string pattern matching to use. #[derive(Debug, Clone, Deserialize)] pub struct LogMatchProperties { // LogMatchType specifies the type of matching desired match_type: LogMatchType, // ResourceAttributes defines a list of possible resource attributes to match logs against. // A match occurs if any resource attribute matches all expressions in this given list. 
resource_attributes: Vec<KeyValue>, // RecordAttributes defines a list of possible record attributes to match logs against. // A match occurs if any record attribute matches at least one expression in this given list. record_attributes: Vec<KeyValue>, // SeverityTexts is a list of strings that the LogRecord's severity text field must match // against. severity_texts: Vec<String>, // SeverityNumberProperties defines how to match against a log record's SeverityNumber, if defined. severity_number: Option<LogSeverityNumberMatchProperties>, // LogBodies is a list of values that the LogRecord's body field must match // against. bodies: Vec<AnyValue>, } /// LogSeverityNumberMatchProperties specifies the requirements needed to match on the log severity field #[derive(Debug, Clone, Deserialize)] pub struct LogSeverityNumberMatchProperties { // Min is the minimum severity needed for the log record to match. // This corresponds to the short names specified here: // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#displaying-severity // this field is case-insensitive ("INFO" == "info") min: i32, // MatchUndefined lets log records with "unknown" severity match. // If MinSeverity is not set, this field is ignored, as fields are not matched based on severity. match_undefined: bool, } ``` Future PR: add trace and metric support. Part of #1213 --------- Co-authored-by: Joshua MacDonald <[email protected]> Co-authored-by: Utkarsh Umesan Pillai <[email protected]>
1 parent 7da17f2 commit 8dbbaea

File tree

11 files changed

+2014
-13
lines changed

11 files changed

+2014
-13
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
settings:
2+
default_pipeline_ctrl_msg_channel_size: 100
3+
default_node_ctrl_msg_channel_size: 100
4+
default_pdata_channel_size: 100
5+
6+
nodes:
7+
receiver:
8+
kind: receiver
9+
plugin_urn: "urn:otel:otap:fake_data_generator:receiver"
10+
out_ports:
11+
out_port:
12+
destinations:
13+
- filter
14+
dispatch_strategy: round_robin
15+
config:
16+
traffic_config:
17+
max_signal_count: 1000
18+
max_batch_size: 100
19+
signals_per_second: 100
20+
log_weight: 100
21+
registry_path: https://github.com/open-telemetry/semantic-conventions.git[model]
22+
filter:
23+
kind: processor
24+
plugin_urn: "urn:otel:filter:processor"
25+
out_ports:
26+
out_port:
27+
destinations:
28+
- debug
29+
dispatch_strategy: round_robin
30+
config:
31+
logs:
32+
exclude:
33+
match_type: strict
34+
resource_attributes:
35+
- key: fake_data_generator
36+
value: v0
37+
record_attributes:
38+
- key: fake_data_generator
39+
value: v0
40+
severity_texts:
41+
- WARN
42+
severity_number: null
43+
bodies:
44+
- body_text
45+
include:
46+
match_type: strict
47+
resource_attributes:
48+
- key: fake_data_generator
49+
value: v1
50+
record_attributes:
51+
- key: gen_ai.system
52+
value: openai
53+
severity_texts: []
54+
severity_number: null
55+
bodies: []
56+
log_record: []
57+
debug:
58+
kind: processor
59+
plugin_urn: "urn:otel:debug:processor"
60+
out_ports:
61+
out_port:
62+
destinations:
63+
- noop
64+
dispatch_strategy: round_robin
65+
config:
66+
verbosity: detailed
67+
mode: signal
68+
noop:
69+
kind: exporter
70+
plugin_urn: "urn:otel:noop:exporter"
71+
config:

rust/otap-dataflow/crates/otap/src/fake_data_generator/fake_signal.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::fake_data_generator::fake_data::{
1111
current_time, delay, gen_span_id, gen_trace_id, get_scope_name, get_scope_version,
1212
};
1313
use otel_arrow_rust::proto::opentelemetry::{
14-
common::v1::{AnyValue, InstrumentationScope},
14+
common::v1::{AnyValue, InstrumentationScope, KeyValue},
1515
logs::v1::{LogRecord, LogsData, ResourceLogs, ScopeLogs, SeverityNumber},
1616
metrics::v1::{
1717
AggregationTemporality, Gauge, Histogram, HistogramDataPoint, Metric, MetricsData,
@@ -37,9 +37,12 @@ pub fn fake_otlp_traces(signal_count: usize, registry: &ResolvedRegistry) -> Tra
3737
];
3838

3939
let resources: Vec<ResourceSpans> = vec![
40-
ResourceSpans::build(Resource::default())
41-
.scope_spans(scopes)
42-
.finish(),
40+
ResourceSpans::build(Resource::build(vec![KeyValue::new(
41+
"fake_data_generator",
42+
AnyValue::new_string("v1"),
43+
)]))
44+
.scope_spans(scopes)
45+
.finish(),
4346
];
4447
TracesData::new(resources)
4548
}
@@ -58,9 +61,12 @@ pub fn fake_otlp_logs(signal_count: usize, registry: &ResolvedRegistry) -> LogsD
5861
];
5962

6063
let resources: Vec<ResourceLogs> = vec![
61-
ResourceLogs::build(Resource::default())
62-
.scope_logs(scopes)
63-
.finish(),
64+
ResourceLogs::build(Resource::build(vec![KeyValue::new(
65+
"fake_data_generator",
66+
AnyValue::new_string("v1"),
67+
)]))
68+
.scope_logs(scopes)
69+
.finish(),
6470
];
6571

6672
LogsData::new(resources)
@@ -80,9 +86,12 @@ pub fn fake_otlp_metrics(signal_count: usize, registry: &ResolvedRegistry) -> Me
8086
];
8187

8288
let resources: Vec<ResourceMetrics> = vec![
83-
ResourceMetrics::build(Resource::default())
84-
.scope_metrics(scopes)
85-
.finish(),
89+
ResourceMetrics::build(Resource::build(vec![KeyValue::new(
90+
"fake_data_generator",
91+
AnyValue::new_string("v1"),
92+
)]))
93+
.scope_metrics(scopes)
94+
.finish(),
8695
];
8796

8897
MetricsData::new(resources)

0 commit comments

Comments
 (0)