Skip to content

Commit 32a31c3

Browse files
authored
Max transaction time (#786)
1 parent 1263490 commit 32a31c3

File tree

13 files changed

+157
-26
lines changed

13 files changed

+157
-26
lines changed

bpf/common/trace_common.h

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ enum { k_bpf_traceparent_enabled = 1 };
3232
enum { k_bpf_traceparent_enabled = 0 };
3333
#endif
3434

35+
volatile const u64 max_transaction_time;
36+
3537
static __always_inline unsigned char *tp_char_buf() {
3638
int zero = 0;
3739
return bpf_map_lookup_elem(&tp_char_buf_mem, &zero);
@@ -109,8 +111,8 @@ static __always_inline unsigned char *bpf_strstr_tp_loop__legacy(unsigned char *
109111
return NULL;
110112
}
111113

112-
static __always_inline const tp_info_pid_t *
113-
find_nginx_parent_trace(const pid_connection_info_t *p_conn, u16 orig_dport) {
114+
static __always_inline tp_info_pid_t *find_nginx_parent_trace(const pid_connection_info_t *p_conn,
115+
u16 orig_dport) {
114116
connection_info_part_t client_part = {};
115117
populate_ephemeral_info(&client_part, &p_conn->conn, orig_dport, p_conn->pid, FD_CLIENT);
116118
fd_info_t *fd_info = fd_info_for_conn(&client_part);
@@ -127,8 +129,8 @@ find_nginx_parent_trace(const pid_connection_info_t *p_conn, u16 orig_dport) {
127129
return NULL;
128130
}
129131

130-
static __always_inline const tp_info_pid_t *
131-
find_nodejs_parent_trace(const pid_connection_info_t *p_conn, u16 orig_dport) {
132+
static __always_inline tp_info_pid_t *find_nodejs_parent_trace(const pid_connection_info_t *p_conn,
133+
u16 orig_dport) {
132134
connection_info_part_t client_part = {};
133135
populate_ephemeral_info(&client_part, &p_conn->conn, orig_dport, p_conn->pid, FD_CLIENT);
134136
fd_info_t *fd_info = fd_info_for_conn(&client_part);
@@ -161,12 +163,12 @@ find_nodejs_parent_trace(const pid_connection_info_t *p_conn, u16 orig_dport) {
161163
return trace_info_for_connection(conn, TRACE_TYPE_SERVER);
162164
}
163165

164-
static __always_inline const tp_info_pid_t *find_parent_process_trace(trace_key_t *t_key) {
166+
static __always_inline tp_info_pid_t *find_parent_process_trace(trace_key_t *t_key) {
165167
// Up to 5 levels of thread nesting allowed
166168
enum { k_max_depth = 5 };
167169

168170
for (u8 i = 0; i < k_max_depth; ++i) {
169-
const tp_info_pid_t *server_tp = bpf_map_lookup_elem(&server_traces, t_key);
171+
tp_info_pid_t *server_tp = bpf_map_lookup_elem(&server_traces, t_key);
170172

171173
if (server_tp) {
172174
bpf_dbg_printk("Found parent trace for pid=%d, ns=%lx, extra_id=%llx",
@@ -191,9 +193,9 @@ static __always_inline const tp_info_pid_t *find_parent_process_trace(trace_key_
191193
return NULL;
192194
}
193195

194-
static __always_inline const tp_info_pid_t *find_parent_trace(const pid_connection_info_t *p_conn,
195-
u16 orig_dport) {
196-
const tp_info_pid_t *node_tp = find_nodejs_parent_trace(p_conn, orig_dport);
196+
static __always_inline tp_info_pid_t *find_parent_trace(const pid_connection_info_t *p_conn,
197+
u16 orig_dport) {
198+
tp_info_pid_t *node_tp = find_nodejs_parent_trace(p_conn, orig_dport);
197199

198200
if (node_tp) {
199201
return node_tp;
@@ -208,13 +210,13 @@ static __always_inline const tp_info_pid_t *find_parent_trace(const pid_connecti
208210
t_key.p_key.ns,
209211
t_key.extra_id);
210212

211-
const tp_info_pid_t *nginx_parent = find_nginx_parent_trace(p_conn, orig_dport);
213+
tp_info_pid_t *nginx_parent = find_nginx_parent_trace(p_conn, orig_dport);
212214

213215
if (nginx_parent) {
214216
return nginx_parent;
215217
}
216218

217-
const tp_info_pid_t *proc_parent = find_parent_process_trace(&t_key);
219+
tp_info_pid_t *proc_parent = find_parent_process_trace(&t_key);
218220

219221
if (proc_parent) {
220222
return proc_parent;
@@ -382,13 +384,33 @@ static __always_inline u8 find_trace_for_server_request(connection_info_t *conn,
382384
return found_tp;
383385
}
384386

387+
static __always_inline u8 should_be_in_same_transaction(const tp_info_t *parent_tp,
388+
const tp_info_t *child_tp) {
389+
if (child_tp->ts < parent_tp->ts) {
390+
return 0;
391+
}
392+
393+
u64 diff = child_tp->ts - parent_tp->ts;
394+
395+
return diff < max_transaction_time;
396+
}
397+
385398
static __always_inline u8 find_trace_for_client_request(const pid_connection_info_t *p_conn,
386399
u16 orig_dport,
387400
tp_info_t *tp) {
388-
const tp_info_pid_t *server_tp = find_parent_trace(p_conn, orig_dport);
401+
tp_info_pid_t *server_tp = find_parent_trace(p_conn, orig_dport);
389402

390403
if (server_tp && server_tp->valid && valid_trace(server_tp->tp.trace_id)) {
391404
bpf_dbg_printk("Found existing server tp for client call");
405+
406+
if (!should_be_in_same_transaction(&server_tp->tp, tp)) {
407+
bpf_dbg_printk("Parent and child are too far apart, marking server trace as invalid");
408+
bpf_dbg_printk(
409+
"%lld >>> %lld (max: %lld)", tp->ts, server_tp->tp.ts, max_transaction_time);
410+
server_tp->valid = 0;
411+
return 0;
412+
}
413+
392414
__builtin_memcpy(tp->trace_id, server_tp->tp.trace_id, sizeof(tp->trace_id));
393415
__builtin_memcpy(tp->parent_id, server_tp->tp.span_id, sizeof(tp->parent_id));
394416
return 1;

bpf/logger/bpf_dbg.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ typedef struct log_info {
3131

3232
struct {
3333
__uint(type, BPF_MAP_TYPE_RINGBUF);
34-
__uint(max_entries, 1 << 12);
34+
__uint(max_entries, 1 << 15);
3535
__uint(pinning, OBI_PIN_INTERNAL);
3636
} debug_events SEC(".maps");
3737

pkg/config/ebpf_tracer.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ type EBPFTracer struct {
106106

107107
// Configure data extraction/parsing based on protocol
108108
PayloadExtraction PayloadExtraction `yaml:"payload_extraction"`
109+
110+
// Maximum time allowed for two requests to be correlated as parent -> child
111+
// Some programs (e.g. load generators) keep on generating requests from the same thread in perpetuity,
112+
// which can generate very large traces. We want to mark the parent trace as invalid if this happens.
113+
MaxTransactionTime time.Duration `yaml:"max_transaction_time" env:"OTEL_EBPF_BPF_MAX_TRANSACTION_TIME"`
109114
}
110115

111116
// Per-protocol data buffer size in bytes.

pkg/internal/ebpf/generictracer/generictracer.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -178,47 +178,52 @@ func (p *Tracer) SetupTailCalls() {
178178
}
179179
}
180180

181-
func (p *Tracer) Constants() map[string]any {
181+
func GenericTracerConstants(cfg *obi.Config) map[string]any {
182182
m := make(map[string]any, 2)
183183

184-
m["wakeup_data_bytes"] = uint32(p.cfg.EBPF.WakeupLen) * uint32(unsafe.Sizeof(ebpfcommon.HTTPRequestTrace{}))
184+
m["wakeup_data_bytes"] = uint32(cfg.EBPF.WakeupLen) * uint32(unsafe.Sizeof(ebpfcommon.HTTPRequestTrace{}))
185185

186186
// The eBPF side does some basic filtering of events that do not belong to
187187
// processes which we monitor. We filter more accurately in the userspace, but
188188
// for performance reasons we enable the PID based filtering in eBPF.
189189
// This must match httpfltr.go, otherwise we get partial events in userspace.
190-
if p.cfg.Discovery.BPFPidFilterOff {
190+
if cfg.Discovery.BPFPidFilterOff {
191191
m["filter_pids"] = int32(0)
192192
} else {
193193
m["filter_pids"] = int32(1)
194194
}
195195

196-
if p.cfg.EBPF.TrackRequestHeaders ||
197-
p.cfg.EBPF.ContextPropagation != config.ContextPropagationDisabled {
196+
if cfg.EBPF.TrackRequestHeaders ||
197+
cfg.EBPF.ContextPropagation != config.ContextPropagationDisabled {
198198
m["capture_header_buffer"] = int32(1)
199199
} else {
200200
m["capture_header_buffer"] = int32(0)
201201
}
202202

203-
if p.cfg.EBPF.HighRequestVolume {
203+
if cfg.EBPF.HighRequestVolume {
204204
m["high_request_volume"] = uint32(1)
205205
} else {
206206
m["high_request_volume"] = uint32(0)
207207
}
208208

209-
if p.cfg.EBPF.DisableBlackBoxCP {
209+
if cfg.EBPF.DisableBlackBoxCP {
210210
m["disable_black_box_cp"] = uint32(1)
211211
} else {
212212
m["disable_black_box_cp"] = uint32(0)
213213
}
214214

215-
m["http_buffer_size"] = p.cfg.EBPF.BufferSizes.HTTP
216-
m["mysql_buffer_size"] = p.cfg.EBPF.BufferSizes.MySQL
217-
m["postgres_buffer_size"] = p.cfg.EBPF.BufferSizes.Postgres
215+
m["http_buffer_size"] = cfg.EBPF.BufferSizes.HTTP
216+
m["mysql_buffer_size"] = cfg.EBPF.BufferSizes.MySQL
217+
m["postgres_buffer_size"] = cfg.EBPF.BufferSizes.Postgres
218+
m["max_transaction_time"] = uint64(cfg.EBPF.MaxTransactionTime.Nanoseconds())
218219

219220
return m
220221
}
221222

223+
func (p *Tracer) Constants() map[string]any {
224+
return GenericTracerConstants(p.cfg)
225+
}
226+
222227
func (p *Tracer) RegisterOffsets(_ *exec.FileInfo, _ *goexec.Offsets) {}
223228

224229
func (p *Tracer) ProcessBinary(_ *exec.FileInfo) {}

pkg/internal/ebpf/gpuevent/gpuevent.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ func (p *Tracer) Constants() map[string]any {
121121
m["filter_pids"] = int32(1)
122122
}
123123

124+
m["max_transaction_time"] = uint64(p.cfg.EBPF.MaxTransactionTime.Nanoseconds())
125+
124126
return m
125127
}
126128

pkg/internal/ebpf/tctracer/tctracer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ func (p *Tracer) SetupTailCalls() {
7474
}
7575

7676
func (p *Tracer) Constants() map[string]any {
77-
return nil
77+
return map[string]any{
78+
"max_transaction_time": uint64(p.cfg.EBPF.MaxTransactionTime.Nanoseconds()),
79+
}
7880
}
7981

8082
func (p *Tracer) RegisterOffsets(_ *exec.FileInfo, _ *goexec.Offsets) {}

pkg/internal/ebpf/tpinjector/tpinjector.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (p *Tracer) SetupTailCalls() {
7070
}
7171

7272
func (p *Tracer) Constants() map[string]any {
73-
m := make(map[string]any, 1)
73+
m := make(map[string]any, 2)
7474

7575
// The eBPF side does some basic filtering of events that do not belong to
7676
// processes which we monitor. We filter more accurately in the userspace, but
@@ -82,6 +82,8 @@ func (p *Tracer) Constants() map[string]any {
8282
m["filter_pids"] = int32(1)
8383
}
8484

85+
m["max_transaction_time"] = uint64(p.cfg.EBPF.MaxTransactionTime.Nanoseconds())
86+
8587
return m
8688
}
8789

pkg/obi/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ var DefaultConfig = Config{
9999
},
100100
},
101101
},
102+
MaxTransactionTime: 5 * time.Minute,
102103
},
103104
NameResolver: &transform.NameResolverConfig{
104105
Sources: []string{"k8s"},

pkg/obi/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ discovery:
125125
BatchLength: 100,
126126
BatchTimeout: time.Second,
127127
HTTPRequestTimeout: 0,
128+
MaxTransactionTime: 5 * time.Minute,
128129
TCBackend: config.TCBackendAuto,
129130
ContextPropagationEnabled: false,
130131
ContextPropagation: config.ContextPropagationDisabled,

test/integration/components/pythonselfserver/main.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from flask import Flask, request, jsonify, Response
22
import requests
33
import ssl
4+
import time
45
from threading import Thread
56

67
# Create four Flask applications for different ports
@@ -48,6 +49,28 @@ def api2():
4849
def smoke():
4950
return Response(status=200)
5051

52+
@app3.route("/smoke1")
53+
def smoke1():
54+
return Response(status=200)
55+
56+
# API for the first application (Port 7771)
57+
@app3.route('/slow', methods=['GET'])
58+
def slow():
59+
try:
60+
# Forward all incoming headers to the internal HTTPS call
61+
headers = dict(request.headers)
62+
63+
time.sleep(2)
64+
65+
# Internal HTTPS call to the second API
66+
response = requests.get('http://localhost:7773/smoke1', headers=headers, verify=False)
67+
return jsonify({
68+
"message": "Internal call to smoke succeeded",
69+
}), response.status_code
70+
except requests.exceptions.RequestException as e:
71+
return jsonify({"error": str(e)}), 500
72+
73+
5174
# API for the first application (Port 7773)
5275
@app3.route('/api3', methods=['GET'])
5376
def api3():

0 commit comments

Comments
 (0)