Skip to content

Commit c457ff4

Browse files
authored
Merge pull request #119 from caibirdme/dev
feat: support ck level case insensitive
2 parents 82e9d24 + 5bcf975 commit c457ff4

File tree

8 files changed

+87
-18
lines changed

8 files changed

+87
-18
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ prost = { version = "0.13.1" }
5555
reqwest = { version = "0.12.7", features = ["json", "native-tls-vendored", "gzip"], default-features = false }
5656
reqwest-middleware = "0.3.3"
5757
serde = { version = "1.0.209", features = ["derive"] }
58-
serde_json = { version = "1.0.125" }
58+
serde_json = { version = "1.0.127" }
5959
serde_with = { version = "3.9.0", features = ["json"] }
6060
sqlbuilder = { path = "sqlbuilder" }
6161
thiserror = { version = "1.0.63" }

config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ log_source:
2020
attributes: ["quantity", "code.function"]
2121
replace_dash_to_dot: true
2222
default_log_level: debug
23+
level_case_sensitive: false
2324

2425
# quickwit:
2526
# domain: http://127.0.0.1:7280

sqlbuilder/src/visit.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,25 @@ impl IRVisitor for DefaultIRVisitor {
5757
cmp: Cmp::Equal(PlaceValue::String(p.value.to_string())),
5858
};
5959
}
60+
if matches!(p.label.to_lowercase().as_str(), "level" | "severitytext") {
61+
return Condition {
62+
column: Column::Level,
63+
cmp: match p.op {
64+
Operator::NotEqual => {
65+
Cmp::NotEqual(PlaceValue::String(p.value.to_string()))
66+
}
67+
Operator::RegexMatch => {
68+
Cmp::RegexMatch(p.value.to_string())
69+
}
70+
Operator::RegexNotMatch => {
71+
Cmp::RegexNotMatch(p.value.to_string())
72+
}
73+
Operator::Equal => {
74+
Cmp::Equal(PlaceValue::String(p.value.to_string()))
75+
}
76+
},
77+
};
78+
}
6079
Condition {
6180
column: maybe_nested_key(&p.label),
6281
cmp: match p.op {

src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ pub struct ClickhouseLog {
9090
pub replace_dash_to_dot: Option<bool>,
9191
#[serde(default = "default_log_level")]
9292
pub default_log_level: String,
93+
pub level_case_sensitive: Option<bool>,
9394
}
9495

9596
fn default_log_level() -> String {
@@ -228,6 +229,7 @@ mod tests {
228229
},
229230
replace_dash_to_dot: None,
230231
default_log_level: "info".to_string(),
232+
level_case_sensitive: None,
231233
});
232234
assert_eq!(expect, actual);
233235
}
@@ -289,6 +291,7 @@ mod tests {
289291
},
290292
replace_dash_to_dot: Some(true),
291293
default_log_level: "debug".to_string(),
294+
level_case_sensitive: Some(false),
292295
};
293296
assert_eq!(
294297
cfg.log_source,

src/storage/ck/converter.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,31 @@ use sqlbuilder::builder::*;
66
pub struct CKLogConverter<T: TableSchema> {
77
table: T,
88
replace_dash_to_dot: bool,
9+
level_insenstive: bool,
910
}
1011

1112
impl<T: TableSchema> CKLogConverter<T> {
12-
pub fn new(table: T, replace_dash_to_dot: bool) -> Self {
13+
pub fn new(
14+
table: T,
15+
replace_dash_to_dot: bool,
16+
level_insenstive: bool,
17+
) -> Self {
1318
Self {
1419
table,
1520
replace_dash_to_dot,
21+
level_insenstive,
1622
}
1723
}
1824
}
1925

2026
impl<T: TableSchema> QueryConverter for CKLogConverter<T> {
2127
fn convert_condition(&self, c: &Condition) -> String {
28+
// special case for level
29+
if matches!(c.column, Column::Level) {
30+
if let Some(s) = self.convert_level(&c.cmp) {
31+
return s;
32+
}
33+
}
2234
let col_name = self.column_name(&c.column);
2335
match &c.cmp {
2436
Cmp::Equal(v) => format!("{} = {}", col_name, v),
@@ -62,6 +74,31 @@ impl<T: TableSchema> QueryConverter for CKLogConverter<T> {
6274
}
6375

6476
impl<T: TableSchema> CKLogConverter<T> {
77+
fn convert_level(&self, cmp: &Cmp) -> Option<String> {
78+
let insensitive = self.level_insenstive;
79+
let key = self.table.level_key();
80+
match cmp {
81+
Cmp::Equal(v) => {
82+
if insensitive {
83+
Some(format!("{} ILIKE {}", key, v))
84+
} else {
85+
Some(format!("{} = {}", key, v))
86+
}
87+
}
88+
Cmp::NotEqual(v) => {
89+
if insensitive {
90+
Some(format!("{} NOT ILIKE {}", key, v))
91+
} else {
92+
Some(format!("{} != {}", key, v))
93+
}
94+
}
95+
Cmp::RegexMatch(v) => Some(format!("match({}, '{}')", key, v)),
96+
Cmp::RegexNotMatch(v) => {
97+
Some(format!("NOT match({}, '{}')", key, v))
98+
}
99+
_ => None,
100+
}
101+
}
65102
fn column_name(&self, c: &Column) -> String {
66103
match c {
67104
Column::Message => self.table.msg_key().to_string(),

src/storage/ck/log.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use logql::parser::{LogQuery, MetricQuery};
88
use reqwest::Client;
99
use serde_json::Value as JSONValue;
1010
use sqlbuilder::{
11-
builder::{time_range_into_timing, QueryPlan, TableSchema},
11+
builder::{time_range_into_timing, QueryConverter, QueryPlan, TableSchema},
1212
visit::{DefaultIRVisitor, LogQLVisitor},
1313
};
1414
use std::{
@@ -48,6 +48,13 @@ impl CKLogQuerier {
4848
tx,
4949
}
5050
}
51+
fn new_converter(&self) -> CKLogConverter<LogTable> {
52+
CKLogConverter::new(
53+
self.schema.clone(),
54+
self.ck_cfg.replace_dash_to_dot.unwrap_or(false),
55+
!self.ck_cfg.level_case_sensitive.unwrap_or(false),
56+
)
57+
}
5158
}
5259

5360
#[async_trait]
@@ -57,12 +64,7 @@ impl LogStorage for CKLogQuerier {
5764
q: &LogQuery,
5865
opt: QueryLimits,
5966
) -> Result<Vec<LogItem>> {
60-
let sql = logql_to_sql(
61-
q,
62-
opt,
63-
&self.schema,
64-
self.ck_cfg.replace_dash_to_dot.unwrap_or(false),
65-
);
67+
let sql = logql_to_sql(q, opt, &self.schema, self.new_converter());
6668
let mut results = vec![];
6769
let rows =
6870
send_query(self.cli.clone(), self.ck_cfg.common.clone(), sql)
@@ -86,7 +88,12 @@ impl LogStorage for CKLogQuerier {
8688
q: &MetricQuery,
8789
opt: QueryLimits,
8890
) -> Result<Vec<MetricItem>> {
89-
let sql = new_from_metricquery(q, opt, self.schema.clone());
91+
let sql = new_from_metricquery(
92+
q,
93+
opt,
94+
self.schema.clone(),
95+
self.new_converter(),
96+
);
9097
let mut results = vec![];
9198
let rows =
9299
send_query(self.cli.clone(), self.ck_cfg.common.clone(), sql)
@@ -274,12 +281,13 @@ fn new_from_metricquery(
274281
q: &MetricQuery,
275282
limits: QueryLimits,
276283
schema: LogTable,
284+
converter: impl QueryConverter,
277285
) -> String {
278286
let v = LogQLVisitor::new(DefaultIRVisitor {});
279287
let selection = v.visit(&q.log_query);
280288
let step = limits.step.unwrap_or(DEFAULT_STEP);
281289
let qp = QueryPlan::new(
282-
CKLogConverter::new(schema.clone(), false),
290+
converter,
283291
schema.clone(),
284292
vec![
285293
to_start_interval(step).to_string(),
@@ -299,12 +307,12 @@ fn logql_to_sql(
299307
q: &LogQuery,
300308
limits: QueryLimits,
301309
schema: &LogTable,
302-
replace_dash: bool,
310+
converter: impl QueryConverter,
303311
) -> String {
304312
let v = LogQLVisitor::new(DefaultIRVisitor {});
305313
let selection = v.visit(q);
306314
let qp = QueryPlan::new(
307-
CKLogConverter::new(schema.clone(), replace_dash),
315+
converter,
308316
schema.clone(),
309317
schema.projection(),
310318
selection,
@@ -435,7 +443,7 @@ impl TableSchema for LogTable {
435443
self.table.as_str()
436444
}
437445
fn level_key(&self) -> &str {
438-
"SeverityNumber"
446+
"SeverityText"
439447
}
440448
fn trace_key(&self) -> &str {
441449
"TraceId"

src/storage/ck/trace.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ impl TraceStorage for CKTraceQuerier {
7373
return Ok(vec![]);
7474
}
7575
Expression::SpanSet(sp) => {
76-
let converter = CKLogConverter::new(self.schema.clone(), true);
76+
let converter =
77+
CKLogConverter::new(self.schema.clone(), true, true);
7778
let sql = single_spanset_query(
7879
sp,
7980
self.schema.clone(),
@@ -413,7 +414,7 @@ mod tests {
413414
for (name, tc) in cases {
414415
let expr = parse_traceql(&tc.input).unwrap();
415416
if let Expression::SpanSet(sp) = expr {
416-
let converter = CKLogConverter::new(schema.clone(), true);
417+
let converter = CKLogConverter::new(schema.clone(), true, true);
417418
let sql = single_spanset_query(
418419
&sp,
419420
schema.clone(),

0 commit comments

Comments
 (0)