Skip to content

Commit 12d08bd

Browse files
authored
bpf: classify postgres in ebpf, add large buffers and errors support (#369)
1 parent fd743b7 commit 12d08bd

23 files changed

+605
-47
lines changed

bpf/bpfcore/utils.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,8 @@
5959
: "+r"(VAR) \
6060
: [max] "i"(UMAX))
6161

62+
static __always_inline bool is_pow2(u32 n) {
63+
return n != 0UL && (n & (n - 1)) == 0UL;
64+
}
65+
6266
#endif /* __UTILS_H__ */

bpf/common/common.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,7 @@
3939
#define HTTP_CONTENT_TYPE_MAX_LEN 16
4040

4141
volatile const u32 mysql_buffer_size = 0;
42-
43-
enum {
44-
k_mysql_query_max = 8192,
45-
k_mysql_query_max_mask = k_mysql_query_max - 1,
46-
k_mysql_error_message_max = 512,
47-
k_mysql_error_message_max_mask = k_mysql_error_message_max - 1
48-
};
42+
volatile const u32 postgres_buffer_size = 0;
4943

5044
enum large_buf_action : u8 {
5145
k_large_buf_action_init = 0,

bpf/common/connection_info.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ enum protocol_type : u8 {
1414
// in userspace.
1515
k_protocol_type_unknown = 0,
1616
k_protocol_type_mysql = 1,
17+
k_protocol_type_postgres = 2,
1718
};
1819

1920
// Struct to keep information on the connections in flight

bpf/generictracer/k_tracer.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <generictracer/protocol_http.h>
1919
#include <generictracer/protocol_http2.h>
2020
#include <generictracer/protocol_mysql.h>
21+
#include <generictracer/protocol_postgres.h>
2122
#include <generictracer/protocol_tcp.h>
2223
#include <generictracer/ssl_defs.h>
2324

@@ -953,10 +954,15 @@ int obi_handle_buf_with_args(void *ctx) {
953954
} else if (is_mysql(&args->pid_conn.conn,
954955
(const unsigned char *)args->u_buf,
955956
args->bytes_len,
956-
&args->packet_type,
957957
&args->protocol_type)) {
958958
bpf_dbg_printk("Found mysql connection");
959959
bpf_tail_call(ctx, &jump_table, k_tail_protocol_tcp);
960+
} else if (is_postgres(&args->pid_conn.conn,
961+
(const unsigned char *)args->u_buf,
962+
args->bytes_len,
963+
&args->protocol_type)) {
964+
bpf_dbg_printk("Found postgres connection");
965+
bpf_tail_call(ctx, &jump_table, k_tail_protocol_tcp);
960966
} else { // large request tracking and generic TCP
961967
http_info_t *info = bpf_map_lookup_elem(&ongoing_http, &args->pid_conn);
962968

bpf/generictracer/protocol_mysql.h

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <bpfcore/vmlinux.h>
44
#include <bpfcore/bpf_helpers.h>
5+
#include <bpfcore/utils.h>
56

67
#include <common/common.h>
78
#include <common/connection_info.h>
@@ -54,8 +55,8 @@ enum {
5455
k_mysql_com_stmt_execute = 0x17,
5556

5657
// Large buffer
57-
k_large_buf_max_size = 1 << 14, // 16K
58-
k_large_buf_max_size_mask = k_large_buf_max_size - 1,
58+
k_mysql_large_buf_max_size = 1 << 14, // 16K
59+
k_mysql_large_buf_max_size_mask = k_mysql_large_buf_max_size - 1,
5960

6061
// Sanity checks
6162
k_mysql_payload_length_max = 1 << 13, // 8K
@@ -68,7 +69,7 @@ struct {
6869
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
6970
} mysql_state SEC(".maps");
7071

71-
SCRATCH_MEM_SIZED(mysql_large_buffers, k_large_buf_max_size);
72+
SCRATCH_MEM_SIZED(mysql_large_buffers, k_mysql_large_buf_max_size);
7273

7374
// This function is used to store the MySQL header if it comes in split packets
7475
// from double send.
@@ -124,8 +125,12 @@ static __always_inline int mysql_read_fixup_buffer(const connection_info_t *conn
124125
const unsigned char *data,
125126
u32 data_len) {
126127
u8 offset = 0;
127-
const u8 buf_len_mask =
128-
mysql_buffer_size - 1; // mysql_buffer_size is guaranteed to be a power of 2
128+
129+
if (!is_pow2(mysql_buffer_size)) {
130+
bpf_dbg_printk("mysql_read_fixup_buffer: bug: mysql_buffer_size is not a power of 2");
131+
return -1;
132+
}
133+
const u8 buf_len_mask = mysql_buffer_size - 1;
129134

130135
struct mysql_state_data *state_data = bpf_map_lookup_elem(&mysql_state, conn_info);
131136
if (state_data != NULL) {
@@ -185,7 +190,7 @@ static __always_inline int mysql_send_large_buffer(tcp_req_t *req,
185190
req->has_large_buffers = true;
186191
bpf_ringbuf_output(&events,
187192
large_buf,
188-
(sizeof(tcp_large_buffer_t) + written) & k_large_buf_max_size_mask,
193+
(sizeof(tcp_large_buffer_t) + written) & k_mysql_large_buf_max_size_mask,
189194
get_flags());
190195
return 0;
191196
}
@@ -202,8 +207,12 @@ static __always_inline u32 mysql_command_offset(struct mysql_hdr *hdr) {
202207
static __always_inline u8 is_mysql(connection_info_t *conn_info,
203208
const unsigned char *data,
204209
u32 data_len,
205-
u8 *packet_type,
206210
enum protocol_type *protocol_type) {
211+
if (*protocol_type != k_protocol_type_mysql && *protocol_type != k_protocol_type_unknown) {
212+
// Already classified, not mysql.
213+
return 0;
214+
}
215+
207216
if (mysql_store_state_data(conn_info, data, (size_t)data_len) < 0) {
208217
bpf_dbg_printk("is_mysql: 4 bytes packet, storing state data");
209218
return 0;
@@ -246,7 +255,6 @@ static __always_inline u8 is_mysql(connection_info_t *conn_info,
246255
"is_mysql: COM_QUERY or COM_PREPARE found, but buf doesn't contain a sql query");
247256
return 0;
248257
}
249-
*packet_type = PACKET_TYPE_REQUEST;
250258
break;
251259
case k_mysql_com_stmt_execute:
252260
// COM_STMT_EXECUTE packet structure:
@@ -259,7 +267,6 @@ static __always_inline u8 is_mysql(connection_info_t *conn_info,
259267
// Already identified, mark this as a request.
260268
// NOTE: Trying to classify the connection based on this command
261269
// would be unreliable, as the check is too shallow.
262-
*packet_type = PACKET_TYPE_REQUEST;
263270
break;
264271
}
265272
return 0;
@@ -270,7 +277,6 @@ static __always_inline u8 is_mysql(connection_info_t *conn_info,
270277
// If the request came in split packets, the sequence ID will be 2 (hdr->hdr_arrived == false) or 3 (hdr->hdr_arrived == true).
271278
bpf_dbg_printk("is_mysql: already identified as MySQL protocol");
272279
if ((hdr.sequence_id == 1 && !hdr.hdr_arrived) || hdr.sequence_id > 1) {
273-
*packet_type = PACKET_TYPE_RESPONSE;
274280
break;
275281
}
276282
bpf_dbg_printk(
@@ -285,6 +291,6 @@ static __always_inline u8 is_mysql(connection_info_t *conn_info,
285291
*protocol_type = k_protocol_type_mysql;
286292
bpf_map_update_elem(&protocol_cache, conn_info, protocol_type, BPF_ANY);
287293

288-
bpf_dbg_printk("is_mysql: mysql! command_id=%d packet_type=%d", hdr.command_id, *packet_type);
294+
bpf_dbg_printk("is_mysql: mysql! command_id=%d", hdr.command_id);
289295
return 1;
290296
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
#pragma once
2+
3+
#include <bpfcore/vmlinux.h>
4+
#include <bpfcore/bpf_endian.h>
5+
#include <bpfcore/bpf_helpers.h>
6+
#include <bpfcore/utils.h>
7+
8+
#include <common/common.h>
9+
#include <common/connection_info.h>
10+
#include <common/http_types.h>
11+
#include <common/pin_internal.h>
12+
#include <common/ringbuf.h>
13+
#include <common/runtime.h>
14+
#include <common/scratch_mem.h>
15+
#include <common/sql.h>
16+
#include <common/tp_info.h>
17+
#include <common/trace_common.h>
18+
19+
#include <generictracer/protocol_common.h>
20+
#include <generictracer/k_tracer_tailcall.h>
21+
22+
#include <generictracer/maps/protocol_cache.h>
23+
24+
#include <maps/active_ssl_connections.h>
25+
26+
struct postgres_hdr {
27+
u32 message_len;
28+
u8 message_type;
29+
u8 _pad[3];
30+
};
31+
32+
enum {
33+
// Postgres header
34+
k_pg_hdr_size = 5,
35+
k_pg_messages_in_packet_max = 10,
36+
37+
// Postgres frontend message types
38+
k_pg_msg_bind = 'B', // Bind a named portal to a prepared statement
39+
k_pg_msg_execute = 'E', // Execute a portal
40+
k_pg_msg_parse = 'P', // Parses a query and creates a prepared statement
41+
k_pg_msg_query = 'Q', // Executes a simple SQL query
42+
43+
// Large buffer
44+
k_pg_large_buf_max_size = 1 << 14, // 16K
45+
k_pg_large_buf_max_size_mask = k_pg_large_buf_max_size - 1,
46+
};
47+
48+
SCRATCH_MEM_SIZED(postgres_large_buffers, k_pg_large_buf_max_size);
49+
50+
// Emit a large buffer event for Postgres protocol.
51+
// The return value is used to control the flow for this specific protocol.
52+
// -1: wait additional data; 0: continue, regardless of errors.
53+
static __always_inline int postgres_send_large_buffer(tcp_req_t *req,
54+
pid_connection_info_t *pid_conn,
55+
const void *u_buf,
56+
u32 bytes_len,
57+
u8 direction,
58+
enum large_buf_action action) {
59+
if (!is_pow2(postgres_buffer_size)) {
60+
bpf_dbg_printk("postgres_send_large_buffer: bug: postgres_buffer_size is not a power of 2");
61+
return -1;
62+
}
63+
const u8 buf_len_mask = postgres_buffer_size - 1;
64+
65+
tcp_large_buffer_t *large_buf = (tcp_large_buffer_t *)postgres_large_buffers_mem();
66+
if (!large_buf) {
67+
bpf_dbg_printk(
68+
"postgres_send_large_buffer: failed to reserve space for Postgres large buffer");
69+
return 0;
70+
}
71+
72+
large_buf->type = EVENT_TCP_LARGE_BUFFER;
73+
large_buf->direction = direction;
74+
large_buf->action = action;
75+
__builtin_memcpy((void *)&large_buf->tp, (void *)&req->tp, sizeof(tp_info_t));
76+
77+
large_buf->len = bytes_len;
78+
if (large_buf->len >= postgres_buffer_size) {
79+
large_buf->len = postgres_buffer_size;
80+
bpf_dbg_printk("WARN: postgres_send_large_buffer: buffer is full, truncating data");
81+
}
82+
bpf_probe_read(large_buf->buf, large_buf->len & buf_len_mask, u_buf);
83+
84+
req->has_large_buffers = true;
85+
bpf_ringbuf_output(&events,
86+
large_buf,
87+
(sizeof(tcp_large_buffer_t) + large_buf->len) & k_pg_large_buf_max_size_mask,
88+
get_flags());
89+
return 0;
90+
}
91+
92+
static __always_inline struct postgres_hdr postgres_parse_hdr(const unsigned char *data) {
93+
struct postgres_hdr hdr = {};
94+
95+
u8 header[k_pg_hdr_size] = {};
96+
bpf_probe_read(header, k_pg_hdr_size, data);
97+
98+
u32 message_len_le;
99+
__builtin_memcpy(&message_len_le, header + 1, sizeof(message_len_le));
100+
101+
hdr.message_type = header[0];
102+
hdr.message_len = bpf_ntohl(message_len_le);
103+
104+
return hdr;
105+
}
106+
107+
static __always_inline u8 is_postgres(connection_info_t *conn_info,
108+
const unsigned char *data,
109+
u32 data_len,
110+
enum protocol_type *protocol_type) {
111+
if (*protocol_type != k_protocol_type_postgres && *protocol_type != k_protocol_type_unknown) {
112+
// Already classified, not postgres.
113+
return 0;
114+
}
115+
116+
if (data_len < k_pg_hdr_size) {
117+
bpf_dbg_printk("is_postgres: data_len is too short: %d", data_len);
118+
return 0;
119+
}
120+
121+
size_t message_size = 0;
122+
struct postgres_hdr hdr;
123+
bool includes_known_command = false;
124+
125+
for (u8 i = 0; i < k_pg_messages_in_packet_max; i++) {
126+
if (message_size + k_pg_hdr_size > data_len) {
127+
break;
128+
}
129+
130+
hdr = postgres_parse_hdr(data + message_size);
131+
132+
message_size += hdr.message_len + 1;
133+
if (hdr.message_len == 0) {
134+
break;
135+
}
136+
137+
switch (hdr.message_type) {
138+
case k_pg_msg_query:
139+
case k_pg_msg_parse:
140+
case k_pg_msg_bind:
141+
case k_pg_msg_execute:
142+
includes_known_command = true;
143+
break;
144+
default:
145+
bpf_dbg_printk("postgres_detect: unhandled message type 0x%x", hdr.message_type);
146+
return 0;
147+
}
148+
}
149+
150+
if (message_size != data_len) {
151+
bpf_dbg_printk("is_postgres: message length mismatch: message_size=%d data_len=%u",
152+
message_size,
153+
data_len);
154+
return 0;
155+
}
156+
157+
if (!includes_known_command) {
158+
bpf_dbg_printk("is_postgres: no known command found");
159+
return 0;
160+
}
161+
162+
*protocol_type = k_protocol_type_postgres;
163+
bpf_map_update_elem(&protocol_cache, conn_info, protocol_type, BPF_ANY);
164+
165+
bpf_dbg_printk("is_postgres: postgres! message_type=%u", hdr.message_type);
166+
return 1;
167+
}

bpf/generictracer/protocol_tcp.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#include <generictracer/protocol_common.h>
1313
#include <generictracer/protocol_mysql.h>
14+
#include <generictracer/protocol_postgres.h>
1415

1516
#include <generictracer/maps/ongoing_tcp_req.h>
1617
#include <generictracer/maps/tcp_req_mem.h>
@@ -111,6 +112,11 @@ static __always_inline int tcp_send_large_buffer(tcp_req_t *req,
111112
ret = mysql_send_large_buffer(req, pid_conn, u_buf, bytes_len, direction, action);
112113
}
113114
break;
115+
case k_protocol_type_postgres:
116+
if (postgres_buffer_size > 0) {
117+
ret = postgres_send_large_buffer(req, pid_conn, u_buf, bytes_len, direction, action);
118+
}
119+
break;
114120
case k_protocol_type_unknown:
115121
break;
116122
}

pkg/app/request/span.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -291,16 +291,23 @@ func (s *Span) SQLErrorDescription() string {
291291
return ""
292292
}
293293

294+
var codeString string
295+
if s.SQLError.Code == 0 {
296+
codeString = "NA"
297+
} else {
298+
codeString = strconv.FormatUint(uint64(s.SQLError.Code), 10)
299+
}
300+
294301
if s.SQLCommand == "" {
295302
return fmt.Sprintf(
296-
"SQL Server errored: error_code=%d sql_state=%s message=%s",
297-
s.SQLError.Code, s.SQLError.SQLState, s.SQLError.Message,
303+
"SQL Server errored: error_code=%s sql_state=%s message=%s",
304+
codeString, s.SQLError.SQLState, s.SQLError.Message,
298305
)
299306
}
300307

301308
return fmt.Sprintf(
302-
"SQL Server errored for command 'COM_%s': error_code=%d sql_state=%s message=%s",
303-
s.SQLCommand, s.SQLError.Code, s.SQLError.SQLState, s.SQLError.Message,
309+
"SQL Server errored for command 'COM_%s': error_code=%s sql_state=%s message=%s",
310+
s.SQLCommand, codeString, s.SQLError.SQLState, s.SQLError.Message,
304311
)
305312
}
306313

pkg/components/ebpf/common/common.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ const (
6363
const (
6464
ProtocolTypeUnknown uint8 = iota
6565
ProtocolTypeMySQL
66+
ProtocolTypePostgres
6667
)
6768

6869
var IntegrityModeOverride = false

pkg/components/ebpf/common/sql_detect_mysql.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,7 @@ func handleMySQL(parseCtx *EBPFParseContext, event *TCPRequestInfo, requestBuffe
100100
}
101101

102102
sqlCommand := sqlprune.SQLParseCommandID(request.DBMySQL, requestBuffer)
103-
if sqlCommand == "" {
104-
return span, errIgnore
105-
}
106-
107-
sqlError := sqlprune.SQLParseError(responseBuffer)
103+
sqlError := sqlprune.SQLParseError(request.DBMySQL, responseBuffer)
108104

109105
switch sqlCommand {
110106
case "STMT_PREPARE":

0 commit comments

Comments
 (0)