@@ -5,10 +5,12 @@ package discover
55
66import (
77 "context"
8+ "fmt"
89 "log/slog"
910 "os"
1011
1112 "github.com/cilium/ebpf/link"
13+ "github.com/cilium/ebpf/rlimit"
1214
1315 "go.opentelemetry.io/obi/pkg/app/request"
1416 "go.opentelemetry.io/obi/pkg/components/ebpf"
@@ -22,6 +24,7 @@ import (
2224 "go.opentelemetry.io/obi/pkg/obi"
2325 "go.opentelemetry.io/obi/pkg/pipe/msg"
2426 "go.opentelemetry.io/obi/pkg/pipe/swarm"
27+ "go.opentelemetry.io/obi/pkg/pipe/swarm/swarms"
2528)
2629
2730// TraceAttacher creates the available trace.Tracer implementations (Go HTTP tracer, GRPC tracer, Generic tracer...)
@@ -91,49 +94,36 @@ func (ta *TraceAttacher) attacherLoop(_ context.Context) (swarm.RunFunc, error)
9194 in := ta .InputInstrumentables .Subscribe ()
9295 return func (ctx context.Context ) {
9396 defer ta .OutputTracerEvents .Close ()
94-
95- for {
96- select {
97- case <- ctx .Done ():
98- ta .log .Debug ("context done. terminating process attacher" )
99- ta .close ()
100- return
101- case instrumentables , ok := <- in :
102- if ! ok {
103- ta .log .Debug ("input channel closed. terminating process attacher" )
104- ta .close ()
105- return
106- }
107- for _ , instr := range instrumentables {
108- ta .log .Debug ("Instrumentable" , "created" , instr .Type , "type" , instr .Obj .Type ,
109- "exec" , instr .Obj .FileInfo .CmdExePath , "pid" , instr .Obj .FileInfo .Pid )
110- switch instr .Type {
111- case EventCreated :
112- sdkInstrumented := false
113- if ta .sdkInjectionPossible (& instr .Obj ) {
114- if err := ta .sdkInjector .NewExecutable (& instr .Obj ); err == nil {
115- sdkInstrumented = true
116- }
97+ swarms .ForEachInput (ctx , in , ta .log .Debug , func (instrumentables []Event [ebpf.Instrumentable ]) {
98+ for _ , instr := range instrumentables {
99+ ta .log .Debug ("Instrumentable" , "created" , instr .Type , "type" , instr .Obj .Type ,
100+ "exec" , instr .Obj .FileInfo .CmdExePath , "pid" , instr .Obj .FileInfo .Pid )
101+ switch instr .Type {
102+ case EventCreated :
103+ sdkInstrumented := false
104+ if ta .sdkInjectionPossible (& instr .Obj ) {
105+ if err := ta .sdkInjector .NewExecutable (& instr .Obj ); err == nil {
106+ sdkInstrumented = true
117107 }
108+ }
118109
119- if ! sdkInstrumented {
120- ta .nodeInjector .NewExecutable (& instr .Obj )
110+ if ! sdkInstrumented {
111+ ta .nodeInjector .NewExecutable (& instr .Obj )
121112
122- ta .processInstances .Inc (instr .Obj .FileInfo .Ino )
123- if ok := ta .getTracer (& instr .Obj ); ok {
124- ta .OutputTracerEvents .Send (Event [* ebpf.Instrumentable ]{Type : EventCreated , Obj : & instr .Obj })
125- }
113+ ta .processInstances .Inc (instr .Obj .FileInfo .Ino )
114+ if ok := ta .getTracer (& instr .Obj ); ok {
115+ ta .OutputTracerEvents .Send (Event [* ebpf.Instrumentable ]{Type : EventCreated , Obj : & instr .Obj })
116+ }
126117
127- if instr .Obj .FileInfo .ELF != nil {
128- _ = instr .Obj .FileInfo .ELF .Close ()
129- }
118+ if instr .Obj .FileInfo .ELF != nil {
119+ _ = instr .Obj .FileInfo .ELF .Close ()
130120 }
131- case EventDeleted :
132- ta .notifyProcessDeletion (& instr .Obj )
133121 }
122+ case EventDeleted :
123+ ta .notifyProcessDeletion (& instr .Obj )
134124 }
135125 }
136- }
126+ })
137127 }, nil
138128}
139129
@@ -397,3 +387,10 @@ func (ta *TraceAttacher) notifyProcessDeletion(ie *ebpf.Instrumentable) {
397387func (ta * TraceAttacher ) sdkInjectionPossible (ie * ebpf.Instrumentable ) bool {
398388 return ta .sdkInjector .Enabled () && ie .Type == svc .InstrumentableJava
399389}
390+
391+ func (ta * TraceAttacher ) init () error {
392+ if err := rlimit .RemoveMemlock (); err != nil {
393+ return fmt .Errorf ("removing memory lock: %w" , err )
394+ }
395+ return nil
396+ }
0 commit comments