Skip to content

Commit 8c997d2

Browse files
authored
bpf: implement large buffer action, improve mysql protocol inference (#318)
1 parent be7fc18 commit 8c997d2

File tree

12 files changed

+185
-144
lines changed

12 files changed

+185
-144
lines changed

bpf/common/common.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ enum {
4747
k_mysql_error_message_max_mask = k_mysql_error_message_max - 1
4848
};
4949

50+
enum large_buf_action : u8 {
51+
k_large_buf_action_init = 0,
52+
k_large_buf_action_append = 1,
53+
};
54+
5055
#define MAX_SPAN_NAME_LEN 64
5156
#define MAX_STATUS_DESCRIPTION_LEN 64
5257

@@ -147,7 +152,8 @@ typedef struct tcp_req {
147152
typedef struct tcp_large_buffer {
148153
u8 type; // Must be first
149154
u8 direction;
150-
u8 _pad[2];
155+
enum large_buf_action action;
156+
u8 _pad;
151157
u32 len;
152158
tp_info_t tp;
153159
u8 buf[];
@@ -223,4 +229,4 @@ typedef struct otel_span {
223229
pid_info pid;
224230
otel_attributes_t span_attrs;
225231
u8 _epad[6];
226-
} otel_span_t;
232+
} otel_span_t;

bpf/common/sql.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
#include <common/strings.h>
77

88
enum {
9-
k_max_query_offset = 4,
9+
k_max_query_offset = 8,
1010

1111
k_max_sql_op_len = 6, // Maximum length of SQL operation names (e.g., "SELECT")
1212
};

bpf/generictracer/k_tracer.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -956,7 +956,7 @@ int obi_handle_buf_with_args(void *ctx) {
956956
&args->packet_type,
957957
&args->protocol_type)) {
958958
bpf_dbg_printk("Found mysql connection");
959-
bpf_tail_call(ctx, &jump_table, k_tail_protocol_mysql);
959+
bpf_tail_call(ctx, &jump_table, k_tail_protocol_tcp);
960960
} else { // large request tracking and generic TCP
961961
http_info_t *info = bpf_map_lookup_elem(&ongoing_http, &args->pid_conn);
962962

@@ -1013,4 +1013,4 @@ int obi_handle_buf_with_args(void *ctx) {
10131013
}
10141014

10151015
return 0;
1016-
}
1016+
}

bpf/generictracer/k_tracer_tailcall.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,5 @@ enum {
1717
k_tail_protocol_http2_grpc_frames = 3,
1818
k_tail_protocol_http2_grpc_handle_start_frame = 4,
1919
k_tail_protocol_http2_grpc_handle_end_frame = 5,
20-
k_tail_protocol_mysql = 6,
21-
k_tail_handle_buf_with_args = 7,
20+
k_tail_handle_buf_with_args = 6,
2221
};

bpf/generictracer/protocol_mysql.h

Lines changed: 50 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ enum {
5656
// Large buffer
5757
k_large_buf_max_size = 1 << 14, // 16K
5858
k_large_buf_max_size_mask = k_large_buf_max_size - 1,
59+
60+
// Sanity checks
61+
k_mysql_payload_length_max = 1 << 13, // 8K
5962
};
6063

6164
struct {
@@ -67,43 +70,50 @@ struct {
6770

6871
SCRATCH_MEM_SIZED(mysql_large_buffers, k_large_buf_max_size);
6972

73+
// This function is used to store the MySQL header if it comes in split packets
74+
// from double send.
75+
// Because this state must be kept for the duration of the full request
76+
// (split in potentially multiple packets), we will **not** process or preserve
77+
// any actual payloads that are exactly 4 bytes long — they are intentionally
78+
// dropped in favor of state storage.
7079
static __always_inline int mysql_store_state_data(const connection_info_t *conn_info,
7180
const unsigned char *data,
7281
size_t data_len) {
7382
if (data_len != k_mysql_hdr_without_command_size) {
7483
return 0;
7584
}
7685

77-
struct mysql_state_data *state_data = bpf_map_lookup_elem(&mysql_state, conn_info);
78-
if (state_data == NULL) {
79-
// State data not found, treat this data as a header.
80-
struct mysql_state_data new_state_data = {};
81-
bpf_probe_read(&new_state_data, k_mysql_hdr_without_command_size, (const void *)data);
82-
bpf_map_update_elem(&mysql_state, conn_info, &new_state_data, BPF_ANY);
83-
return -1;
84-
}
86+
struct mysql_state_data new_state_data = {};
87+
bpf_probe_read(&new_state_data, k_mysql_hdr_without_command_size, (const void *)data);
88+
bpf_map_update_elem(&mysql_state, conn_info, &new_state_data, BPF_ANY);
8589

86-
// This is a payload.
87-
return 0;
90+
return -1;
8891
}
8992

9093
static __always_inline int mysql_parse_fixup_header(const connection_info_t *conn_info,
9194
struct mysql_hdr *hdr,
9295
const unsigned char *data,
9396
size_t data_len) {
97+
// Try to parse and validate the header first.
98+
bpf_probe_read(hdr, k_mysql_hdr_size, (const void *)data);
99+
if (mysql_payload_length(hdr->payload_length) ==
100+
(data_len - k_mysql_hdr_without_command_size)) {
101+
// Header is valid and we have the full data, we can proceed.
102+
hdr->hdr_arrived = false;
103+
return 0;
104+
}
105+
106+
// Prepend the header from state data.
94107
struct mysql_state_data *state_data = bpf_map_lookup_elem(&mysql_state, conn_info);
95108
if (state_data != NULL) {
96109
__builtin_memcpy(hdr, state_data, k_mysql_hdr_without_command_size);
97110
bpf_probe_read(&hdr->command_id, k_mysql_hdr_command_id_size, (const void *)data);
98111
hdr->hdr_arrived = true;
99-
} else {
100-
if (data_len < k_mysql_hdr_size) {
101-
bpf_dbg_printk("mysql_parse_fixup_header: data_len is too short: %d", data_len);
102-
return -1;
103-
}
104-
bpf_probe_read(hdr, k_mysql_hdr_size, (const void *)data);
112+
return 0;
105113
}
106-
return 0;
114+
115+
bpf_dbg_printk("mysql_parse_fixup_header: failed to parse mysql header");
116+
return -1;
107117
}
108118

109119
// This is an alternative version of mysql_parse_fixup_header that fills the buffer
@@ -140,43 +150,44 @@ static __always_inline int mysql_read_fixup_buffer(const connection_info_t *conn
140150
return *buf_len;
141151
}
142152

143-
static __always_inline void mysql_send_large_buffer(tcp_req_t *req,
144-
pid_connection_info_t *pid_conn,
145-
const void *u_buf,
146-
u32 bytes_len,
147-
u8 direction) {
153+
// Emit a large buffer event for MySQL protocol.
154+
// The return value is used to control the flow for this specific protocol.
155+
// -1: wait for additional data; 0: continue, regardless of errors.
156+
static __always_inline int mysql_send_large_buffer(tcp_req_t *req,
157+
pid_connection_info_t *pid_conn,
158+
const void *u_buf,
159+
u32 bytes_len,
160+
u8 direction,
161+
enum large_buf_action action) {
148162
if (mysql_store_state_data(&pid_conn->conn, u_buf, bytes_len) < 0) {
149163
bpf_dbg_printk("mysql_send_large_buffer: 4 bytes packet, storing state data");
150-
return;
151-
}
152-
153-
if (bytes_len < (k_mysql_hdr_size + 1)) {
154-
bpf_dbg_printk("mysql_send_large_buffer: bytes_len is too short: %d", bytes_len);
155-
return;
164+
return -1;
156165
}
157166

158167
tcp_large_buffer_t *large_buf = (tcp_large_buffer_t *)mysql_large_buffers_mem();
159168
if (!large_buf) {
160169
bpf_dbg_printk("mysql_send_large_buffer: failed to reserve space for MySQL large buffer");
161-
return;
170+
return 0;
162171
}
163172

164173
large_buf->type = EVENT_TCP_LARGE_BUFFER;
165174
large_buf->direction = direction;
175+
large_buf->action = action;
166176
__builtin_memcpy((void *)&large_buf->tp, (void *)&req->tp, sizeof(tp_info_t));
167177

168178
int written =
169179
mysql_read_fixup_buffer(&pid_conn->conn, large_buf->buf, &large_buf->len, u_buf, bytes_len);
170180
if (written < 0) {
171181
bpf_dbg_printk("mysql_send_large_buffer: failed to read buffer, not sending large buffer");
172-
return;
182+
return 0;
173183
}
174184

175185
req->has_large_buffers = true;
176186
bpf_ringbuf_output(&events,
177187
large_buf,
178188
(sizeof(tcp_large_buffer_t) + written) & k_large_buf_max_size_mask,
179189
get_flags());
190+
return 0;
180191
}
181192

182193
static __always_inline u32 data_offset(struct mysql_hdr *hdr) {
@@ -188,32 +199,6 @@ static __always_inline u32 mysql_command_offset(struct mysql_hdr *hdr) {
188199
return data_offset(hdr) - k_mysql_hdr_command_id_size;
189200
}
190201

191-
// k_tail_protocol_mysql
192-
SEC("kprobe/mysql")
193-
int obi_protocol_mysql(void *ctx) {
194-
call_protocol_args_t *args = protocol_args();
195-
if (!args) {
196-
return 0;
197-
}
198-
199-
bpf_dbg_printk("=== tcp_mysql_event len=%d pid=%d ===",
200-
args->bytes_len,
201-
pid_from_pid_tgid(bpf_get_current_pid_tgid()));
202-
203-
if (mysql_store_state_data(
204-
&args->pid_conn.conn, (const unsigned char *)args->u_buf, args->bytes_len) < 0) {
205-
bpf_dbg_printk("mysql: 4 bytes packet, storing state data");
206-
return 0;
207-
}
208-
209-
// Tail call back into generic TCP handler.
210-
// Once the header is fixed up, we can use the generic TCP handling code
211-
// in order to reuse all the common logic.
212-
bpf_tail_call(ctx, &jump_table, k_tail_protocol_tcp);
213-
214-
return 0;
215-
}
216-
217202
static __always_inline u8 is_mysql(connection_info_t *conn_info,
218203
const unsigned char *data,
219204
u32 data_len,
@@ -224,19 +209,20 @@ static __always_inline u8 is_mysql(connection_info_t *conn_info,
224209
return 0;
225210
}
226211

227-
if (data_len < (k_mysql_hdr_size + 1)) {
228-
bpf_dbg_printk("is_mysql: data_len is too short: %d", data_len);
229-
return 0;
230-
}
231-
232212
struct mysql_hdr hdr = {};
233213
if (mysql_parse_fixup_header(conn_info, &hdr, data, data_len) != 0) {
234214
bpf_dbg_printk("is_mysql: failed to parse mysql header");
235215
return 0;
236216
}
217+
const u32 payload_len = mysql_payload_length(hdr.payload_length);
218+
219+
if (payload_len > k_mysql_payload_length_max) {
220+
bpf_dbg_printk("is_mysql: payload length is too large: %d", payload_len);
221+
return 0;
222+
}
237223

238224
bpf_dbg_printk("is_mysql: payload_length=%d sequence_id=%d command_id=%d",
239-
mysql_payload_length(hdr.payload_length),
225+
payload_len,
240226
hdr.sequence_id,
241227
hdr.command_id);
242228

@@ -274,8 +260,9 @@ static __always_inline u8 is_mysql(connection_info_t *conn_info,
274260
// NOTE: Trying to classify the connection based on this command
275261
// would be unreliable, as the check is too shallow.
276262
*packet_type = PACKET_TYPE_REQUEST;
263+
break;
277264
}
278-
break;
265+
return 0;
279266
default:
280267
if (*protocol_type == k_protocol_type_mysql) {
281268
// Check sequence ID and make sure we are processing a response.

bpf/generictracer/protocol_tcp.h

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -96,21 +96,26 @@ static __always_inline void cleanup_tcp_trace_info_if_needed(pid_connection_info
9696
}
9797
}
9898

99-
static __always_inline void tcp_send_large_buffer(tcp_req_t *req,
100-
pid_connection_info_t *pid_conn,
101-
void *u_buf,
102-
int bytes_len,
103-
u8 direction,
104-
enum protocol_type protocol_type) {
99+
static __always_inline int tcp_send_large_buffer(tcp_req_t *req,
100+
pid_connection_info_t *pid_conn,
101+
void *u_buf,
102+
int bytes_len,
103+
u8 direction,
104+
enum protocol_type protocol_type,
105+
enum large_buf_action action) {
106+
int ret = 0;
107+
105108
switch (protocol_type) {
106109
case k_protocol_type_mysql:
107110
if (mysql_buffer_size > 0) {
108-
mysql_send_large_buffer(req, pid_conn, u_buf, bytes_len, direction);
111+
ret = mysql_send_large_buffer(req, pid_conn, u_buf, bytes_len, direction, action);
109112
}
110113
break;
111114
case k_protocol_type_unknown:
112115
break;
113116
}
117+
118+
return ret;
114119
}
115120

116121
static __always_inline void handle_unknown_tcp_connection(pid_connection_info_t *pid_conn,
@@ -183,12 +188,22 @@ static __always_inline void handle_unknown_tcp_connection(pid_connection_info_t
183188

184189
tcp_get_or_set_trace_info(req, pid_conn, ssl, orig_dport);
185190

186-
tcp_send_large_buffer(req, pid_conn, u_buf, bytes_len, direction, protocol_type);
191+
tcp_send_large_buffer(
192+
req, pid_conn, u_buf, bytes_len, direction, protocol_type, k_large_buf_action_init);
187193

188194
bpf_map_update_elem(&ongoing_tcp_req, pid_conn, req, BPF_ANY);
189195
}
190196
} else if (existing->direction != direction) {
191-
tcp_send_large_buffer(existing, pid_conn, u_buf, bytes_len, direction, protocol_type);
197+
if (tcp_send_large_buffer(existing,
198+
pid_conn,
199+
u_buf,
200+
bytes_len,
201+
direction,
202+
protocol_type,
203+
k_large_buf_action_init) < 0) {
204+
bpf_dbg_printk("handle_unknown_tcp_connection: waiting additional response data");
205+
return;
206+
}
192207

193208
if (existing->end_monotime_ns == 0) {
194209
bpf_clamp_umax(bytes_len, K_TCP_RES_LEN);
@@ -220,7 +235,14 @@ static __always_inline void handle_unknown_tcp_connection(pid_connection_info_t
220235
existing->len += bytes_len;
221236
existing->req_len = existing->len;
222237
existing->protocol_type = protocol_type;
223-
tcp_send_large_buffer(existing, pid_conn, u_buf, bytes_len, direction, protocol_type);
238+
239+
tcp_send_large_buffer(existing,
240+
pid_conn,
241+
u_buf,
242+
bytes_len,
243+
direction,
244+
protocol_type,
245+
k_large_buf_action_append);
224246
} else {
225247
existing->req_len += bytes_len;
226248
}

pkg/components/ebpf/common/common.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ type MisclassifiedEvent struct {
118118
type EBPFParseContext struct {
119119
h2c *lru.Cache[uint64, h2Connection]
120120
redisDBCache *simplelru.LRU[BpfConnectionInfoT, int]
121-
largeBuffers *expirable.LRU[largeBufferKey, largeBuffer]
121+
largeBuffers *expirable.LRU[largeBufferKey, *largeBuffer]
122122
mongoRequestCache *PendingMongoDBRequests
123123
}
124124

@@ -138,7 +138,7 @@ func ptlog() *slog.Logger { return slog.With("component", "ebpf.ProcessTracer")
138138
func NewEBPFParseContext(cfg *config.EBPFTracer) *EBPFParseContext {
139139
var redisDBCache *simplelru.LRU[BpfConnectionInfoT, int]
140140
h2c, _ := lru.New[uint64, h2Connection](1024 * 10)
141-
largeBuffers := expirable.NewLRU[largeBufferKey, largeBuffer](1024, nil, 5*time.Minute)
141+
largeBuffers := expirable.NewLRU[largeBufferKey, *largeBuffer](1024, nil, 5*time.Minute)
142142

143143
if cfg != nil && cfg.RedisDBCache.Enabled {
144144
var err error
@@ -189,7 +189,7 @@ func ReadBPFTraceAsSpan(parseCtx *EBPFParseContext, cfg *config.EBPFTracer, reco
189189
case EventTypeGoKafkaGo:
190190
return ReadGoKafkaGoRequestIntoSpan(record)
191191
case EventTypeTCPLargeBuffer:
192-
return setTCPLargeBuffer(parseCtx, record)
192+
return appendTCPLargeBuffer(parseCtx, record)
193193
case EventOTelSDKGo:
194194
return ReadGoOTelEventIntoSpan(record)
195195
}

pkg/components/ebpf/common/tcp_detect_transform.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@ func ReadTCPRequestIntoSpan(parseCtx *EBPFParseContext, cfg *config.EBPFTracer,
4141
responseBuffer = event.Rbuf[:l]
4242

4343
if event.HasLargeBuffers == 1 {
44-
if b, ok := getTCPLargeBuffer(parseCtx, event.Tp.TraceId, event.Tp.SpanId, 0); ok {
44+
if b, ok := extractTCPLargeBuffer(parseCtx, event.Tp.TraceId, event.Tp.SpanId, 0); ok {
4545
requestBuffer = b
4646
}
47-
if b, ok := getTCPLargeBuffer(parseCtx, event.Tp.TraceId, event.Tp.SpanId, 1); ok {
47+
if b, ok := extractTCPLargeBuffer(parseCtx, event.Tp.TraceId, event.Tp.SpanId, 1); ok {
4848
responseBuffer = b
4949
}
5050
}

0 commit comments

Comments
 (0)