Skip to content

Commit 4a87bc0

Browse files
jasondellalucepoiana
authored andcommitted
test(pkg/sdk/symbols/extract): build test suite for async extractor
Signed-off-by: Jason Dellaluce <[email protected]>
1 parent 9284cdd commit 4a87bc0

File tree

1 file changed

+202
-3
lines changed

1 file changed

+202
-3
lines changed

pkg/sdk/symbols/extract/async_test.go

Lines changed: 202 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,213 @@ limitations under the License.
1616

1717
package extract
1818

19-
import "testing"
19+
import (
20+
"fmt"
21+
"sync"
22+
"sync/atomic"
23+
"testing"
24+
"unsafe"
2025

21-
func TestGetMaxWorkers(t *testing.T) {
26+
"github.com/falcosecurity/plugin-sdk-go/pkg/cgo"
27+
"github.com/falcosecurity/plugin-sdk-go/pkg/sdk"
28+
)
29+
30+
type sampleAsyncExtract struct {
31+
sampleExtract
32+
counter uint64
33+
}
34+
35+
func (s *sampleAsyncExtract) Extract(req sdk.ExtractRequest, evt sdk.EventReader) error {
36+
req.SetValue(s.counter)
37+
s.counter++
38+
return nil
39+
}
40+
41+
func testAllocAsyncBatch() []_Ctype_async_extractor_info {
42+
return make([]_Ctype_async_extractor_info, asyncBatchSize)
43+
}
44+
45+
func testReleaseAsyncBatch(c []_Ctype_async_extractor_info) {}
46+
47+
func TestSetAsync(t *testing.T) {
48+
a := asyncContext{}
49+
if !a.Async() {
50+
t.Fatalf("Async returned %v but expected %v", false, true)
51+
}
52+
53+
a.SetAsync(false)
54+
if a.Async() {
55+
t.Fatalf("Async returned %v but expected %v", true, false)
56+
}
57+
58+
a.SetAsync(true)
59+
if !a.Async() {
60+
t.Fatalf("Async returned %v but expected %v", false, true)
61+
}
62+
}
63+
64+
func TestAsyncGetMaxWorkers(t *testing.T) {
65+
a := asyncContext{}
2266
expected := []int32{0, 1, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 5}
2367
for i, ex := range expected {
24-
v := getMaxWorkers(i + 1)
68+
v := a.getMaxWorkers(i + 1)
2569
if v != ex {
2670
t.Fatalf("getMaxWorkers returned %d but expected %d", v, ex)
2771
}
2872
}
2973
}
74+
75+
func TestAsyncBatchIdxWorkerIdx(t *testing.T) {
76+
for maxWorkers := 1; maxWorkers < 100; maxWorkers++ {
77+
a := asyncContext{maxWorkers: int32(maxWorkers)}
78+
for i := int32(0); i < 10; i++ {
79+
if a.batchIdxToWorkerIdx(i) != i%a.maxWorkers {
80+
t.Fatalf("batchIdxToWorkerIdx returned %d but expected %d", a.batchIdxToWorkerIdx(i), i%a.maxWorkers)
81+
}
82+
}
83+
for i := int32(0); i < a.maxWorkers; i++ {
84+
expectedIdx := int32(i)
85+
for _, v := range a.workerIdxToBatchIdxs(i) {
86+
if v != expectedIdx {
87+
t.Fatalf("workerIdxToBatchIdxs returned %d but expected %d", v, expectedIdx)
88+
}
89+
expectedIdx += a.maxWorkers
90+
}
91+
}
92+
}
93+
}
94+
95+
func testWithMockPlugins(n int, f func([]cgo.Handle)) {
96+
plugins := make([]sampleAsyncExtract, n)
97+
handles := make([]cgo.Handle, n)
98+
for i := 0; i < n; i++ {
99+
handles[i] = cgo.NewHandle(&plugins[i])
100+
plugins[i].SetExtractRequests(sdk.NewExtractRequestPool())
101+
}
102+
f(handles)
103+
for i := 0; i < n; i++ {
104+
handles[i].Delete()
105+
plugins[i].ExtractRequests().Free()
106+
}
107+
}
108+
109+
// this simulates a C consumer as in extract.c
110+
func testSimulateAsyncRequest(t testing.TB, a *asyncContext, h cgo.Handle, r *_Ctype_ss_plugin_extract_field) {
111+
i := a.handleToBatchIdx(h)
112+
a.batch[i].s = unsafe.Pointer(h)
113+
a.batch[i].evt = nil
114+
a.batch[i].num_fields = 1
115+
a.batch[i].fields = r
116+
117+
atomic.StoreInt32((*int32)(&a.batch[i].lock), state_data_req)
118+
for atomic.LoadInt32((*int32)(&a.batch[i].lock)) != state_wait {
119+
// spin
120+
}
121+
if int32(a.batch[i].rc) != sdk.SSPluginSuccess {
122+
t.Fatalf("extraction failed with rc %v", int32(a.batch[i].rc))
123+
}
124+
}
125+
126+
func TestAsyncExtract(t *testing.T) {
127+
a := asyncContext{}
128+
workload := func(nPlugins, nExtractions int) {
129+
testWithMockPlugins(nPlugins, func(handles []cgo.Handle) {
130+
var wg sync.WaitGroup
131+
for _, h := range handles {
132+
wg.Add(1)
133+
go func(h cgo.Handle) {
134+
counter := uint64(0)
135+
field, freeField := allocSSPluginExtractField(1, sdk.FieldTypeUint64, "", "")
136+
defer freeField()
137+
138+
// note: StartAsync/StopAsync are not invoked concurrently
139+
// in the plugin framework, however we want to test them to
140+
// be thread-safe as they are designed
141+
a.StartAsync(h, testAllocAsyncBatch)
142+
for e := 0; e < nExtractions; e++ {
143+
testSimulateAsyncRequest(t, &a, h, field)
144+
value := **((**uint64)(unsafe.Pointer(&field.res[0])))
145+
if value != counter {
146+
panic(fmt.Sprintf("extracted %d but expected %d", value, counter))
147+
}
148+
counter++
149+
}
150+
a.StopAsync(h, testReleaseAsyncBatch)
151+
wg.Done()
152+
}(h)
153+
}
154+
wg.Wait()
155+
})
156+
}
157+
158+
// run with increasing number of concurrent consumers
159+
for i := 1; i <= cgo.MaxHandle; i *= 2 {
160+
// run with increasing number of extractions
161+
for j := 1; j < 10000; j *= 10 {
162+
workload(i, j)
163+
}
164+
}
165+
}
166+
167+
func TestStartStopAsync(t *testing.T) {
168+
nPlugins := cgo.MaxHandle
169+
testWithMockPlugins(nPlugins, func(handles []cgo.Handle) {
170+
// test unbalanced start/stop calls
171+
assertPanic(t, func() {
172+
a := asyncContext{}
173+
a.StopAsync(handles[0], testReleaseAsyncBatch)
174+
})
175+
assertPanic(t, func() {
176+
a := asyncContext{}
177+
a.StartAsync(handles[0], testAllocAsyncBatch)
178+
a.StopAsync(handles[0], testReleaseAsyncBatch)
179+
a.StopAsync(handles[0], testReleaseAsyncBatch)
180+
})
181+
182+
// test with bad start/stop-handle pair
183+
assertPanic(t, func() {
184+
a := asyncContext{}
185+
a.StartAsync(handles[0], testAllocAsyncBatch)
186+
a.StartAsync(handles[1], testAllocAsyncBatch)
187+
a.StopAsync(handles[0], testReleaseAsyncBatch)
188+
a.StopAsync(handles[0], testReleaseAsyncBatch)
189+
})
190+
191+
// test with inconsistent enabled values
192+
a := asyncContext{}
193+
enabled := true
194+
for i := 0; i < nPlugins; i++ {
195+
a.SetAsync(enabled)
196+
a.StartAsync(handles[i], testAllocAsyncBatch)
197+
enabled = !enabled
198+
}
199+
for i := 0; i < nPlugins; i++ {
200+
a.StopAsync(handles[i], testReleaseAsyncBatch)
201+
}
202+
203+
// test workload after already having started/stopped the same context
204+
var wg sync.WaitGroup
205+
for _, h := range handles {
206+
wg.Add(1)
207+
a.StartAsync(h, testAllocAsyncBatch)
208+
go func(h cgo.Handle) {
209+
counter := uint64(0)
210+
field, freeField := allocSSPluginExtractField(1, sdk.FieldTypeUint64, "", "")
211+
defer freeField()
212+
for e := 0; e < 1000; e++ {
213+
testSimulateAsyncRequest(t, &a, h, field)
214+
value := **((**uint64)(unsafe.Pointer(&field.res[0])))
215+
if value != counter {
216+
panic(fmt.Sprintf("extracted %d but expected %d", value, counter))
217+
}
218+
counter++
219+
}
220+
wg.Done()
221+
}(h)
222+
}
223+
wg.Wait()
224+
for _, h := range handles {
225+
a.StopAsync(h, testReleaseAsyncBatch)
226+
}
227+
})
228+
}

0 commit comments

Comments
 (0)