Skip to content

Commit 82fdcc3

Browse files
authored
Update to match db semconv (#849)
1 parent 501db05 commit 82fdcc3

File tree

11 files changed

+216
-48
lines changed

11 files changed

+216
-48
lines changed

bpf/common/common.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ enum large_buf_action : u8 {
4848
};
4949

5050
enum {
51-
k_dns_max_len = 516,
51+
k_dns_max_len = 512, // must be a power of 2
5252
};
5353

5454
#define MAX_SPAN_NAME_LEN 64
@@ -246,17 +246,16 @@ typedef struct mongo_go_client_req {
246246

247247
typedef struct dns_req {
248248
u8 flags; // Must be first we use it to tell what kind of packet we have on the ring buffer
249-
u8 p_type;
250249
u8 dns_q;
251-
u8 _pad1[1];
250+
u8 _pad1[2];
252251
u32 len;
253252
connection_info_t conn;
254253
u16 id;
255254
u8 _pad2[2];
256255
tp_info_t tp;
257-
u64 ts;
258256
// we need this to filter traces from unsolicited processes that share the executable
259257
// with other instrumented processes
260258
pid_info pid;
261259
unsigned char buf[k_dns_max_len];
260+
u8 _pad3[4];
262261
} dns_req_t;

bpf/common/dns.h

Lines changed: 75 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,36 @@ static __always_inline u8 is_dns(connection_info_t *conn) {
9797
return is_dns_port(conn->s_port) || is_dns_port(conn->d_port);
9898
}
9999

100+
static __always_inline void populate_dns_record(dns_req_t *req,
101+
const pid_connection_info_t *p_conn,
102+
const u16 orig_dport,
103+
const u32 size,
104+
const u8 qr,
105+
const u16 id,
106+
const conn_pid_t *conn_pid) {
107+
__builtin_memcpy(&req->conn, &p_conn->conn, sizeof(connection_info_t));
108+
109+
req->flags = EVENT_DNS_REQUEST;
110+
req->len = size;
111+
req->dns_q = qr;
112+
req->id = bpf_ntohs(id);
113+
req->tp.ts = bpf_ktime_get_ns();
114+
req->pid = conn_pid->p_info;
115+
116+
trace_key_t t_key = {0};
117+
trace_key_from_pid_tid_with_p_key(&t_key, &conn_pid->p_key, conn_pid->id);
118+
119+
const u8 found = find_trace_for_client_request_with_t_key(
120+
p_conn, orig_dport, &t_key, conn_pid->id, &req->tp);
121+
122+
bpf_dbg_printk("handle_dns: looking up client trace info, found %d", found);
123+
if (found) {
124+
urand_bytes(req->tp.span_id, SPAN_ID_SIZE_BYTES);
125+
} else {
126+
init_new_trace(&req->tp);
127+
}
128+
}
129+
100130
static __always_inline u8 handle_dns(struct __sk_buff *skb,
101131
connection_info_t *conn,
102132
protocol_info_t *p_info) {
@@ -158,30 +188,51 @@ static __always_inline u8 handle_dns(struct __sk_buff *skb,
158188
dns_req_t *req = bpf_ringbuf_reserve(&events, sizeof(dns_req_t), 0);
159189

160190
if (req) {
161-
__builtin_memcpy(&req->conn, conn, sizeof(connection_info_t));
162-
163-
req->flags = EVENT_DNS_REQUEST;
164-
req->p_type = skb->pkt_type;
165-
req->len = skb->len;
166-
req->dns_q = qr;
167-
req->id = bpf_ntohs(hdr.id);
168-
req->ts = bpf_ktime_get_ns();
169-
req->tp.ts = bpf_ktime_get_ns();
170-
req->pid = conn_pid->p_info;
171-
172-
trace_key_t t_key = {0};
173-
trace_key_from_pid_tid_with_p_key(&t_key, &conn_pid->p_key, conn_pid->id);
174-
175-
const u8 found = find_trace_for_client_request_with_t_key(
176-
&p_conn, orig_dport, &t_key, conn_pid->id, &req->tp);
177-
178-
bpf_dbg_printk("handle_dns: looking up client trace info, found %d", found);
179-
if (found) {
180-
urand_bytes(req->tp.span_id, SPAN_ID_SIZE_BYTES);
181-
} else {
182-
init_new_trace(&req->tp);
183-
}
184-
read_skb_bytes(skb, dns_off, req->buf, sizeof(req->buf));
191+
u32 len = skb->len - dns_off;
192+
bpf_clamp_umax(len, 512);
193+
populate_dns_record(req, &p_conn, orig_dport, len, qr, hdr.id, conn_pid);
194+
195+
read_skb_bytes(skb, dns_off, req->buf, len);
196+
bpf_d_printk("sending dns trace");
197+
bpf_ringbuf_submit(req, get_flags());
198+
}
199+
200+
return 1;
201+
}
202+
203+
return 0;
204+
}
205+
206+
static __always_inline u8 handle_dns_buf(const unsigned char *buf,
207+
const int size,
208+
pid_connection_info_t *p_conn,
209+
u16 orig_dport) {
210+
211+
if (size < sizeof(struct dnshdr)) {
212+
bpf_d_printk("dns packet too small");
213+
return 0;
214+
}
215+
216+
struct dnshdr hdr;
217+
bpf_probe_read_user(&hdr, sizeof(struct dnshdr), buf);
218+
219+
const u16 flags = bpf_ntohs(hdr.flags);
220+
const u8 qr = dns_qr(flags);
221+
222+
bpf_d_printk("QR type: %d", qr);
223+
224+
if (qr == k_dns_qr_query || qr == k_dns_qr_resp) {
225+
conn_pid_t *conn_pid = bpf_map_lookup_elem(&sock_pids, &p_conn->conn);
226+
if (!conn_pid) {
227+
bpf_d_printk("can't find connection info for dns call");
228+
return 0;
229+
}
230+
231+
dns_req_t *req = bpf_ringbuf_reserve(&events, sizeof(dns_req_t), 0);
232+
if (req) {
233+
populate_dns_record(req, p_conn, orig_dport, size, qr, hdr.id, conn_pid);
234+
235+
bpf_probe_read(req->buf, sizeof(req->buf), buf);
185236
bpf_d_printk("sending dns trace");
186237
bpf_ringbuf_submit(req, get_flags());
187238
}

bpf/common/trace_common.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ static __always_inline void trace_key_from_pid_tid(trace_key_t *t_key) {
5858
}
5959

6060
static __always_inline void
61-
trace_key_from_pid_tid_with_p_key(trace_key_t *t_key, pid_key_t *p_key, u64 id) {
61+
trace_key_from_pid_tid_with_p_key(trace_key_t *t_key, const pid_key_t *p_key, u64 id) {
6262
t_key->p_key = *p_key;
6363

6464
u64 extra_id = extra_runtime_id_with_task_id(id);

bpf/generictracer/k_tracer.c

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ int BPF_KPROBE(obi_kprobe_tcp_connect, struct sock *sk) {
210210
}
211211

212212
SEC("kprobe/udp_sendmsg")
213-
int BPF_KPROBE(obi_kprobe_udp_sendmsg, struct sock *sk) {
213+
int BPF_KPROBE(obi_kprobe_udp_sendmsg, struct sock *sk, struct msghdr *msg, size_t len) {
214214
(void)ctx;
215215

216216
u64 id = bpf_get_current_pid_tgid();
@@ -219,10 +219,31 @@ int BPF_KPROBE(obi_kprobe_udp_sendmsg, struct sock *sk) {
219219
return 0;
220220
}
221221

222-
bpf_dbg_printk("=== udp_sendmsg %llx sock %llx ===", id, sk);
222+
bpf_dbg_printk("=== udp_sendmsg %llx sock %llx len %d ===", id, sk, len);
223223

224224
store_sock_pid(sk);
225225

226+
send_args_t s_args = {.size = len};
227+
228+
if (parse_sock_info(sk, &s_args.p_conn.conn)) {
229+
u16 orig_dport = s_args.p_conn.conn.d_port;
230+
dbg_print_http_connection_info(&s_args.p_conn.conn);
231+
if (is_dns(&s_args.p_conn.conn)) {
232+
sort_connection_info(&s_args.p_conn.conn);
233+
s_args.p_conn.pid = pid_from_pid_tgid(id);
234+
s_args.orig_dport = orig_dport;
235+
236+
unsigned char *buf = iovec_memory();
237+
if (buf) {
238+
len = read_msghdr_buf(msg, buf, len);
239+
if (len) {
240+
bpf_dbg_printk("Got buffer with len %d", len);
241+
handle_dns_buf(buf, len, &s_args.p_conn, orig_dport);
242+
}
243+
}
244+
}
245+
}
246+
226247
return 0;
227248
}
228249

@@ -806,9 +827,33 @@ int BPF_KRETPROBE(obi_kretprobe_sock_recvmsg, int copied_len) {
806827
info.pid = pid_from_pid_tgid(id);
807828
setup_cp_support_conn_info(&info, false);
808829
setup_connection_to_pid_mapping(id, &info, orig_dport);
830+
831+
if (is_dns(&info.conn)) {
832+
sort_connection_info(&info.conn);
833+
834+
iovec_iter_ctx *iov_ctx = (iovec_iter_ctx *)&args->iovec_ctx;
835+
836+
if (!iov_ctx->iov && !iov_ctx->ubuf) {
837+
bpf_dbg_printk("iovec_ptr found in kprobe is NULL, ignoring this sock_recvmsg");
838+
839+
goto done;
840+
}
841+
842+
unsigned char *buf = iovec_memory();
843+
if (buf) {
844+
copied_len = read_iovec_ctx(iov_ctx, buf, copied_len);
845+
if (!copied_len) {
846+
bpf_dbg_printk("Not copied anything");
847+
} else {
848+
bpf_d_printk("Got potential dns buffer with len %d", copied_len);
849+
handle_dns_buf(buf, copied_len, &info, orig_dport);
850+
}
851+
}
852+
}
809853
}
810854
}
811855

856+
done:
812857
bpf_map_delete_elem(&active_recv_args, &id);
813858

814859
return 0;

internal/test/oats/sql/docker-compose-beyla-sql.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ services:
4646
OTEL_EBPF_BPF_DEBUG: "true"
4747
OTEL_EXPORTER_OTLP_ENDPOINT: "http://collector:4318"
4848
OTEL_EBPF_BPF_BUFFER_SIZE_POSTGRES: 512
49+
OTEL_EBPF_NAME_RESOLVER_SOURCES: "rdns"
4950
depends_on:
5051
testserver:
5152
condition: service_started

internal/test/oats/sql/yaml/oats_sql_other_langs.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ docker-compose:
33
files:
44
- ../docker-compose-beyla-sql.yml
55
input:
6-
- path: '/query'
6+
- path: "/query"
77

88
interval: 500ms
99
expected:
1010
traces:
1111
- traceql: '{ .db.operation.name = "SELECT" && .db.system.name = "postgresql"}'
1212
spans:
13-
- name: 'SELECT accounting.contacts'
13+
- name: "SELECT accounting.contacts"
1414
attributes:
1515
db.operation.name: SELECT
1616
db.collection.name: accounting.contacts
@@ -23,5 +23,5 @@ expected:
2323
value: "== 0"
2424
- promql: 'db_client_operation_duration_bucket{le="10", db_system_name="postgresql"}'
2525
value: "> 0"
26-
- promql: 'db_client_operation_duration_count{db_system_name="postgresql"}'
26+
- promql: 'db_client_operation_duration_count{db_system_name="postgresql", server_address="sqlserver"}'
2727
value: "> 0"

pkg/ebpf/common/dns_request_transform.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@ func readDNSEventIntoSpan(parseCtx *EBPFParseContext, record *ringbuf.Record) (r
6464
Host: hostname,
6565
HostPort: hostPort,
6666
ContentLength: 0,
67-
RequestStart: int64(event.Ts),
68-
Start: int64(event.Ts),
69-
End: int64(event.Ts + 1),
67+
RequestStart: int64(event.Tp.Ts),
68+
Start: int64(event.Tp.Ts),
69+
End: int64(event.Tp.Ts + 1),
7070
TraceID: trace.TraceID(event.Tp.TraceId),
7171
SpanID: trace.SpanID(event.Tp.SpanId),
7272
ParentSpanID: trace.SpanID(event.Tp.ParentId),
@@ -122,7 +122,7 @@ func readDNSEventIntoSpan(parseCtx *EBPFParseContext, record *ringbuf.Record) (r
122122
if msg.Response {
123123
responseCode = uint16(msg.RCode)
124124
span.Status = int(responseCode)
125-
span.End = int64(event.Ts)
125+
span.End = int64(event.Tp.Ts)
126126
} else {
127127
return *span, true, nil // ignore until we get a response or never hear back
128128
}

pkg/export/attributes/attr_defs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ func getDefinitions(
229229
map[attr.Name]Default{
230230
attr.MessagingSystem: true,
231231
attr.MessagingDestination: true,
232+
attr.ServerAddr: true,
232233
},
233234
extraGroupAttributes[GroupMessaging],
234235
)
@@ -277,6 +278,7 @@ func getDefinitions(
277278
DBClientDuration.Section: {
278279
SubGroups: []*AttrReportGroup{&appAttributes, &appKubeAttributes},
279280
Attributes: map[attr.Name]Default{
281+
attr.ServerAddr: true,
280282
attr.DBOperation: true,
281283
attr.DBSystemName: true,
282284
attr.ErrorType: true,

pkg/internal/netolly/flow/reverse_dns.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,10 @@ func ReverseDNSProvider(cfg *ReverseDNS, input, output *msg.Queue[[]*ebpf.Record
9191
func checkEBPFReverseDNS(ctx context.Context, cfg *ReverseDNS) error {
9292
if cfg.Type == ReverseDNSEBPF {
9393
// overriding netLookupAddr by an eBPF-based alternative
94-
ipToHosts := store.NewInMemory()
94+
ipToHosts, err := store.NewInMemory(cfg.CacheLen)
95+
if err != nil {
96+
return fmt.Errorf("initializing eBPF-based reverse DNS cache: %w", err)
97+
}
9598
if err := xdp.StartDNSPacketInspector(ctx, ipToHosts); err != nil {
9699
return fmt.Errorf("starting eBPF-based reverse DNS: %w", err)
97100
}

pkg/internal/rdns/store/memory.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ package store
55

66
import (
77
"sync"
8+
9+
"github.com/hashicorp/golang-lru/v2/simplelru"
810
)
911

1012
type DNSEntry struct {
@@ -18,26 +20,37 @@ type InMemory struct {
1820
access sync.RWMutex
1921
// key: IP address, values: hostname
2022
// TODO: address scenarios where different hostnames point to a same IP
21-
entries map[string][]string
23+
entries *simplelru.LRU[string, []string]
2224
}
2325

24-
func NewInMemory() *InMemory {
25-
return &InMemory{
26-
entries: map[string][]string{},
26+
func NewInMemory(cacheSize int) (*InMemory, error) {
27+
cache, err := simplelru.NewLRU[string, []string](cacheSize, nil)
28+
if err != nil {
29+
return nil, err
2730
}
31+
return &InMemory{
32+
entries: cache,
33+
}, nil
2834
}
2935

3036
func (im *InMemory) Store(entry *DNSEntry) {
3137
im.access.Lock()
3238
defer im.access.Unlock()
3339
for _, ip := range entry.IPs {
3440
// TODO: store IPv4 also with its IPv6 representation
35-
im.entries[ip] = []string{entry.HostName}
41+
im.entries.Add(ip, []string{entry.HostName})
3642
}
3743
}
3844

45+
func (im *InMemory) StorePair(ip, name string) {
46+
im.access.Lock()
47+
defer im.access.Unlock()
48+
im.entries.Add(ip, []string{name})
49+
}
50+
3951
func (im *InMemory) GetHostnames(ip string) ([]string, error) {
4052
im.access.RLock()
4153
defer im.access.RUnlock()
42-
return im.entries[ip], nil
54+
r, _ := im.entries.Get(ip)
55+
return r, nil
4356
}

0 commit comments

Comments
 (0)