Skip to content

Commit 7c00bb1

Browse files
committed
Add support for _msearch, _bulk and _doc operations with minor improvements
Signed-off-by: Giuseppe Ognibene <[email protected]>
1 parent 2dd80df commit 7c00bb1

File tree

6 files changed

+297
-73
lines changed

6 files changed

+297
-73
lines changed

devdocs/features.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@
1111
| Kafka | All | All | produce, fetch | Yes | No | Might fail to get the topic name for fetch requests in newer versions of Kafka (where the Fetch API version is >= 13) |
1212
| JsonRPC | Go | All | - | Yes | No | N/A |
1313
| GraphQL | All but Go | All | All | Yes | No | N/A |
14-
| Elasticsearch | All but Go | 7.14+ | /_search | Yes | No | N/A |
14+
| Elasticsearch | All but Go | 7.14+ | /_search, /_msearch, /_bulk, /_doc | Yes | No | N/A |
1515
| AWS S3 | All but Go | | CreateBucket, DeleteBucket, PutObject, DeleteObject, ListBuckets, ListObjects, GetObject | Yes | No | N/A |
1616
| AWS SQS | All but Go | | All | Yes | No | N/A |

internal/test/integration/components/elasticsearch/main.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,85 @@ async def search():
6565
sys.exit(1)
6666
return {"status": "OK"}
6767

68+
@app.get("/msearch")
69+
async def msearch():
70+
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/_msearch"
71+
searches = [
72+
{},
73+
{
74+
"query": {
75+
"match": {
76+
"message": "this is a test"
77+
}
78+
}
79+
},
80+
{
81+
"index": "my-index-000002"
82+
},
83+
{
84+
"query": {
85+
"match_all": {}
86+
}
87+
}
88+
]
89+
try:
90+
response = requests.post(ELASTICSEARCH_URL, json=searches, headers=HEADERS)
91+
92+
except Exception as e:
93+
print(json.dumps({"error": str(e)}))
94+
sys.exit(1)
95+
return {"status": "OK"}
96+
97+
98+
@app.get("/bulk")
99+
async def bulk():
100+
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/_bulk"
101+
actions=[
102+
{
103+
"index": {
104+
"_index": "test",
105+
"_id": "1"
106+
}
107+
},
108+
{
109+
"field1": "value1"
110+
},
111+
{
112+
"delete": {
113+
"_index": "test",
114+
"_id": "2"
115+
}
116+
},
117+
{
118+
"create": {
119+
"_index": "test",
120+
"_id": "3"
121+
}
122+
},
123+
{
124+
"field1": "value3"
125+
},
126+
{
127+
"update": {
128+
"_id": "1",
129+
"_index": "test"
130+
}
131+
},
132+
{
133+
"doc": {
134+
"field2": "value2"
135+
}
136+
}
137+
]
138+
try:
139+
response = requests.post(ELASTICSEARCH_URL, json=actions, headers=HEADERS)
140+
141+
except Exception as e:
142+
print(json.dumps({"error": str(e)}))
143+
sys.exit(1)
144+
return {"status": "OK"}
145+
146+
68147
if __name__ == "__main__":
69148
print(f"Server running: port={8080} process_id={os.getpid()}")
70149
uvicorn.run(app, host="0.0.0.0", port=8080)

internal/test/integration/docker-compose-elasticsearch.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ services:
5050
OTEL_EBPF_LOG_CONFIG: "yaml"
5151
OTEL_EBPF_BPF_DEBUG: "TRUE"
5252
OTEL_EBPF_HOSTNAME: "beyla"
53-
OTEL_EBPF_BPF_HTTP_REQUEST_TIMEOUT: "5s"
53+
OTEL_EBPF_BPF_HTTP_REQUEST_TIMEOUT: "30s"
5454
OTEL_EBPF_PROCESSES_INTERVAL: "100ms"
5555
OTEL_EBPF_METRICS_FEATURES: "application"
5656
OTEL_EBPF_BPF_BUFFER_SIZE_HTTP: 1024

internal/test/integration/test_python_elasticsearchclient.go

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ func testPythonElasticsearch(t *testing.T) {
3030
// populate elasticsearch with a custom value
3131
populate(t, url)
3232
testElasticsearchSearch(t, comm, url, index)
33+
// populating is optional here: the Elasticsearch request will fail,
34+
// but the span is still produced
35+
testElasticsearchMsearch(t, comm, url)
36+
testElasticsearchBulk(t, comm, url)
37+
//testElasticsearchDoc(t, comm, index)
3338
}
3439

3540
func populate(t *testing.T, url string) {
@@ -38,7 +43,7 @@ func populate(t *testing.T, url string) {
3843
}
3944

4045
func testElasticsearchSearch(t *testing.T, comm, url, index string) {
41-
queryText := "{\"query\":{\"match\":{\"name\":\"OBI\"}}}"
46+
queryText := "{\"query\": {\"match\": {\"name\": \"OBI\"}}}"
4247
urlPath := "/search"
4348
ti.DoHTTPGet(t, url+urlPath, 200)
4449

@@ -48,8 +53,13 @@ func testElasticsearchSearch(t *testing.T, comm, url, index string) {
4853
func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index string) {
4954
params := neturl.Values{}
5055
params.Add("service", comm)
51-
operatioName := op + " " + index
52-
params.Add("operationName", operatioName)
56+
var operationName string
57+
if index != "" {
58+
operationName = op + " " + index
59+
} else {
60+
operationName = op
61+
}
62+
params.Add("operationName", operationName)
5363
fullJaegerURL := fmt.Sprintf("%s?%s", jaegerQueryURL, params.Encode())
5464

5565
test.Eventually(t, testTimeout, func(t require.TestingT) {
@@ -67,11 +77,11 @@ func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index strin
6777
lastTrace := traces[len(traces)-1]
6878
span := lastTrace.Spans[0]
6979

70-
assert.Equal(t, operatioName, span.OperationName)
80+
assert.Contains(t, span.OperationName, operationName)
7181

7282
tag, found := jaeger.FindIn(span.Tags, "db.query.text")
7383
assert.True(t, found)
74-
assert.JSONEq(t, queryText, tag.Value.(string))
84+
assert.Equal(t, queryText, tag.Value.(string))
7585

7686
tag, found = jaeger.FindIn(span.Tags, "db.collection.name")
7787
assert.True(t, found)
@@ -90,3 +100,25 @@ func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index strin
90100
assert.Empty(t, tag.Value)
91101
}, test.Interval(100*time.Millisecond))
92102
}
103+
104+
func testElasticsearchMsearch(t *testing.T, comm, url string) {
105+
queryText := "[{}, {\"query\": {\"match\": {\"message\": \"this is a test\"}}}, {\"index\": \"my-index-000002\"}, {\"query\": {\"match_all\": {}}}]"
106+
urlPath := "/msearch"
107+
ti.DoHTTPGet(t, url+urlPath, 200)
108+
109+
assertElasticsearchOperation(t, comm, "msearch", queryText, "")
110+
}
111+
112+
func testElasticsearchBulk(t *testing.T, comm, url string) {
113+
queryText := "[{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}, {\"field1\": \"value1\"}, {\"delete\": {\"_index\": \"test\", \"_id\": \"2\"}}, {\"create\": {\"_index\": \"test\", \"_id\": \"3\"}}, {\"field1\": \"value3\"}, {\"update\": {\"_id\": \"1\", \"_index\": \"test\"}}, {\"doc\": {\"field2\": \"value2\"}}]"
114+
urlPath := "/bulk"
115+
ti.DoHTTPGet(t, url+urlPath, 200)
116+
117+
assertElasticsearchOperation(t, comm, "bulk", queryText, "")
118+
}
119+
120+
func testElasticsearchDoc(t *testing.T, comm, index string) {
121+
queryText := "{\"name\": \"OBI\", \"description\": \"very cool\"}"
122+
123+
assertElasticsearchOperation(t, comm, "doc", queryText, index)
124+
}

pkg/ebpf/common/http/elasticsearch.go

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package ebpfcommon
55

66
import (
77
"bytes"
8-
"encoding/json"
98
"errors"
109
"fmt"
1110
"io"
@@ -20,19 +19,34 @@ import (
2019
type elasticsearchOperation struct {
2120
NodeName string
2221
DBQueryText string
23-
DBOperationName string
2422
DBCollectionName string
2523
}
2624

27-
const (
28-
pathSearch string = "_search"
29-
)
25+
var elasticsearchOperationMethods = map[string]map[string]struct{}{
26+
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-search
27+
"search": {http.MethodPost: {}, http.MethodGet: {}},
28+
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-msearch
29+
"msearch": {http.MethodPost: {}, http.MethodGet: {}},
30+
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-bulk
31+
"bulk": {http.MethodPost: {}, http.MethodPut: {}},
32+
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-get
33+
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-index
34+
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete
35+
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-exists
36+
"doc": {http.MethodGet: {}, http.MethodPost: {}, http.MethodPut: {}, http.MethodHead: {}, http.MethodDelete: {}},
37+
}
3038

3139
func ElasticsearchSpan(baseSpan *request.Span, req *http.Request, resp *http.Response) (request.Span, bool) {
3240
if !isElasticsearchResponse(resp) {
3341
return *baseSpan, false
3442
}
35-
if err := isSearchRequest(req); err != nil {
43+
44+
operationName := extractElasticsearchOperationName(req)
45+
if operationName == "" {
46+
return *baseSpan, false
47+
}
48+
49+
if err := isElasticsearchSupportedRequest(operationName, req.Method); err != nil {
3650
slog.Debug(err.Error())
3751
return *baseSpan, false
3852
}
@@ -42,19 +56,14 @@ func ElasticsearchSpan(baseSpan *request.Span, req *http.Request, resp *http.Res
4256
slog.Debug("parse Elasticsearch request", "error", err)
4357
return *baseSpan, false
4458
}
45-
46-
if resp != nil {
47-
if v := resp.Header.Get("X-Found-Handling-Instance"); v != "" {
48-
op.NodeName = v
49-
}
50-
} else {
51-
op.NodeName = req.URL.Host
59+
if v := resp.Header.Get("X-Found-Handling-Instance"); v != "" {
60+
op.NodeName = v
5261
}
5362

5463
baseSpan.SubType = request.HTTPSubtypeElasticsearch
5564
baseSpan.Elasticsearch = &request.Elasticsearch{
5665
NodeName: op.NodeName,
57-
DBOperationName: op.DBOperationName,
66+
DBOperationName: operationName,
5867
DBCollectionName: op.DBCollectionName,
5968
DBQueryText: op.DBQueryText,
6069
}
@@ -67,42 +76,23 @@ func parseElasticsearchRequest(req *http.Request) (elasticsearchOperation, error
6776
if err != nil {
6877
return op, fmt.Errorf("failed to read Elasticsearch request body %w", err)
6978
}
70-
7179
req.Body = io.NopCloser(bytes.NewBuffer(reqB))
72-
if len(reqB) == 0 {
73-
op.DBQueryText = ""
74-
} else {
75-
dbQueryText, err := extractDBQueryText(reqB)
76-
if err != nil {
77-
return op, err
78-
}
79-
op.DBQueryText = dbQueryText
80-
}
81-
op.DBOperationName = extractOperationName(req)
82-
op.DBCollectionName = extractDBCollectionName(req)
80+
op.DBQueryText = string(reqB)
81+
op.DBCollectionName = extractElasticsearchDBCollectionName(req)
8382
return op, nil
8483
}
8584

86-
func extractDBQueryText(body []byte) (string, error) {
87-
var buf bytes.Buffer
88-
89-
if err := json.Compact(&buf, body); err != nil {
90-
return "", fmt.Errorf("invalid Elasticsearch JSON body: %w", err)
85+
func isElasticsearchSupportedRequest(operationName, methodName string) error {
86+
methods, exists := elasticsearchOperationMethods[operationName]
87+
if !exists {
88+
return errors.New("parse Elasticsearch request: unsupported endpoint")
9189
}
9290

93-
return buf.String(), nil
94-
}
95-
96-
func isSearchRequest(req *http.Request) error {
97-
// let's focus only on _search operation that has only GET and POST http methods
98-
if !strings.Contains(req.URL.Path, pathSearch) {
99-
return errors.New("parse Elasticsearch search request: unsupported endpoint")
100-
}
101-
102-
if req.Method != http.MethodGet && req.Method != http.MethodPost {
103-
return errors.New("parse Elasticsearch search request: unsupported method")
91+
_, supported := methods[methodName]
92+
if supported {
93+
return nil
10494
}
105-
return nil
95+
return fmt.Errorf("parse Elasticsearch %s request: unsupported method %s", operationName, methodName)
10696
}
10797

10898
// isElasticsearchResponse checks if X-Elastic-Product HTTP header is present.
@@ -114,26 +104,43 @@ func isElasticsearchResponse(resp *http.Response) bool {
114104
return headerValue == expectedValue
115105
}
116106

117-
// extractOperationName is a generic function used to extract the operation name
107+
// extractElasticsearchOperationName extracts the operation name,
118108
// i.e. the endpoint identifier provided in the request path.
119-
func extractOperationName(req *http.Request) string {
109+
// Depending on the operation, its name is found in either
110+
// the last or the second-to-last segment of the URL path.
111+
func extractElasticsearchOperationName(req *http.Request) string {
120112
path := strings.Trim(req.URL.Path, "/")
121113
if path == "" {
122114
return ""
123115
}
116+
124117
parts := strings.Split(path, "/")
125118
if len(parts) == 0 {
126119
return ""
127120
}
128-
name := parts[len(parts)-1]
129-
return strings.TrimPrefix(name, "_")
121+
122+
lastPart := parts[len(parts)-1]
123+
possibleOperationName := strings.TrimPrefix(lastPart, "_")
124+
125+
if _, found := elasticsearchOperationMethods[possibleOperationName]; found {
126+
return possibleOperationName
127+
}
128+
129+
if len(parts) >= 2 {
130+
secondLastPart := parts[len(parts)-2]
131+
possibleOperationName = strings.TrimPrefix(secondLastPart, "_")
132+
if _, found := elasticsearchOperationMethods[possibleOperationName]; found {
133+
return possibleOperationName
134+
}
135+
}
136+
return ""
130137
}
131138

132-
// extractDBCollectionName takes into account this rule from semconv
139+
// extractElasticsearchDBCollectionName takes into account this rule from semconv
133140
// The query may target multiple indices or data streams,
134141
// in which case it SHOULD be a comma separated list of those.
135142
// If the query doesn’t target a specific index, this field MUST NOT be set.
136-
func extractDBCollectionName(req *http.Request) string {
143+
func extractElasticsearchDBCollectionName(req *http.Request) string {
137144
path := strings.Trim(req.URL.Path, "/")
138145
if path == "" {
139146
return ""

0 commit comments

Comments
 (0)