Skip to content

Commit 2f0bb21

Browse files
committed
Add support for _msearch, _bulk and _doc operations with minor improvements
Signed-off-by: Giuseppe Ognibene <[email protected]>
1 parent 2dd80df commit 2f0bb21

File tree

5 files changed

+296
-85
lines changed

5 files changed

+296
-85
lines changed

devdocs/features.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@
1111
| Kafka | All | All | produce, fetch | Yes | No | Might fail getting topic name for fetch requests in newer versions of Kafka (where Fetch API version >= 13) |
1212
| JsonRPC | Go | All | - | Yes | No | N/A |
1313
| GraphQL | All but Go | All | All | Yes | No | N/A |
14-
| Elasticsearch | All but Go | 7.14+ | /_search | Yes | No | N/A |
14+
| Elasticsearch | All but Go | 7.14+ | /_search, /_msearch, /_bulk, /_doc | Yes | No | N/A |
1515
| AWS S3 | All but Go | | CreateBucket, DeleteBucket, PutObject, DeleteObject, ListBuckets, ListObjects, GetObject | Yes | No | N/A |
1616
| AWS SQS | All but Go | | All | Yes | No | N/A |

internal/test/integration/components/elasticsearch/main.py

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,8 @@ async def health():
3434
async def doc():
3535
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/test_index/_doc/1"
3636

37-
query_body = {
38-
"name": "OBI",
39-
"description": "very cool"
40-
}
4137
try:
42-
response = requests.post(ELASTICSEARCH_URL, json=query_body, headers=HEADERS)
38+
response = requests.get(ELASTICSEARCH_URL, headers=HEADERS)
4339

4440
except Exception as e:
4541
print(json.dumps({"error": str(e)}))
@@ -65,6 +61,85 @@ async def search():
6561
sys.exit(1)
6662
return {"status": "OK"}
6763

64+
@app.get("/msearch")
65+
async def msearch():
66+
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/_msearch"
67+
searches = [
68+
{},
69+
{
70+
"query": {
71+
"match": {
72+
"message": "this is a test"
73+
}
74+
}
75+
},
76+
{
77+
"index": "my-index-000002"
78+
},
79+
{
80+
"query": {
81+
"match_all": {}
82+
}
83+
}
84+
]
85+
try:
86+
response = requests.post(ELASTICSEARCH_URL, json=searches, headers=HEADERS)
87+
88+
except Exception as e:
89+
print(json.dumps({"error": str(e)}))
90+
sys.exit(1)
91+
return {"status": "OK"}
92+
93+
94+
@app.get("/bulk")
95+
async def bulk():
96+
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/_bulk"
97+
actions=[
98+
{
99+
"index": {
100+
"_index": "test",
101+
"_id": "1"
102+
}
103+
},
104+
{
105+
"field1": "value1"
106+
},
107+
{
108+
"delete": {
109+
"_index": "test",
110+
"_id": "2"
111+
}
112+
},
113+
{
114+
"create": {
115+
"_index": "test",
116+
"_id": "3"
117+
}
118+
},
119+
{
120+
"field1": "value3"
121+
},
122+
{
123+
"update": {
124+
"_id": "1",
125+
"_index": "test"
126+
}
127+
},
128+
{
129+
"doc": {
130+
"field2": "value2"
131+
}
132+
}
133+
]
134+
try:
135+
response = requests.post(ELASTICSEARCH_URL, json=actions, headers=HEADERS)
136+
137+
except Exception as e:
138+
print(json.dumps({"error": str(e)}))
139+
sys.exit(1)
140+
return {"status": "OK"}
141+
142+
68143
if __name__ == "__main__":
69144
print(f"Server running: port={8080} process_id={os.getpid()}")
70145
uvicorn.run(app, host="0.0.0.0", port=8080)

internal/test/integration/test_python_elasticsearchclient.go

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,31 @@ func testPythonElasticsearch(t *testing.T) {
2727
index := "test_index"
2828

2929
waitForTestComponentsRoute(t, url, "/health")
30-
// populate elasticsearch with a custom value
31-
populate(t, url)
3230
testElasticsearchSearch(t, comm, url, index)
33-
}
34-
35-
func populate(t *testing.T, url string) {
36-
urlPath := "/doc"
37-
ti.DoHTTPGet(t, url+urlPath, 200)
31+
// populating the server is optional: the Elasticsearch requests will fail,
32+
// but the spans will still be generated
33+
testElasticsearchMsearch(t, comm, url)
34+
testElasticsearchBulk(t, comm, url)
35+
testElasticsearchDoc(t, comm, url, index)
3836
}
3937

4038
func testElasticsearchSearch(t *testing.T, comm, url, index string) {
41-
queryText := "{\"query\":{\"match\":{\"name\":\"OBI\"}}}"
39+
queryText := "{\"query\": {\"match\": {\"name\": \"OBI\"}}}"
4240
urlPath := "/search"
4341
ti.DoHTTPGet(t, url+urlPath, 200)
44-
4542
assertElasticsearchOperation(t, comm, "search", queryText, index)
4643
}
4744

4845
func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index string) {
4946
params := neturl.Values{}
5047
params.Add("service", comm)
51-
operatioName := op + " " + index
52-
params.Add("operationName", operatioName)
48+
var operationName string
49+
if index != "" {
50+
operationName = op + " " + index
51+
} else {
52+
operationName = op
53+
}
54+
params.Add("operationName", operationName)
5355
fullJaegerURL := fmt.Sprintf("%s?%s", jaegerQueryURL, params.Encode())
5456

5557
test.Eventually(t, testTimeout, func(t require.TestingT) {
@@ -67,11 +69,11 @@ func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index strin
6769
lastTrace := traces[len(traces)-1]
6870
span := lastTrace.Spans[0]
6971

70-
assert.Equal(t, operatioName, span.OperationName)
72+
assert.Contains(t, span.OperationName, operationName)
7173

7274
tag, found := jaeger.FindIn(span.Tags, "db.query.text")
7375
assert.True(t, found)
74-
assert.JSONEq(t, queryText, tag.Value.(string))
76+
assert.Equal(t, queryText, tag.Value.(string))
7577

7678
tag, found = jaeger.FindIn(span.Tags, "db.collection.name")
7779
assert.True(t, found)
@@ -90,3 +92,24 @@ func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index strin
9092
assert.Empty(t, tag.Value)
9193
}, test.Interval(100*time.Millisecond))
9294
}
95+
96+
func testElasticsearchMsearch(t *testing.T, comm, url string) {
97+
queryText := "[{}, {\"query\": {\"match\": {\"message\": \"this is a test\"}}}, {\"index\": \"my-index-000002\"}, {\"query\": {\"match_all\": {}}}]"
98+
urlPath := "/msearch"
99+
ti.DoHTTPGet(t, url+urlPath, 200)
100+
assertElasticsearchOperation(t, comm, "msearch", queryText, "")
101+
}
102+
103+
func testElasticsearchBulk(t *testing.T, comm, url string) {
104+
queryText := "[{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}, {\"field1\": \"value1\"}, {\"delete\": {\"_index\": \"test\", \"_id\": \"2\"}}, {\"create\": {\"_index\": \"test\", \"_id\": \"3\"}}, {\"field1\": \"value3\"}, {\"update\": {\"_id\": \"1\", \"_index\": \"test\"}}, {\"doc\": {\"field2\": \"value2\"}}]"
105+
urlPath := "/bulk"
106+
ti.DoHTTPGet(t, url+urlPath, 200)
107+
assertElasticsearchOperation(t, comm, "bulk", queryText, "")
108+
}
109+
110+
func testElasticsearchDoc(t *testing.T, comm, url, index string) {
111+
queryText := ""
112+
urlPath := "/doc"
113+
ti.DoHTTPGet(t, url+urlPath, 200)
114+
assertElasticsearchOperation(t, comm, "doc", queryText, index)
115+
}

pkg/ebpf/common/http/elasticsearch.go

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package ebpfcommon
55

66
import (
77
"bytes"
8-
"encoding/json"
98
"errors"
109
"fmt"
1110
"io"
@@ -20,19 +19,34 @@ import (
2019
type elasticsearchOperation struct {
2120
NodeName string
2221
DBQueryText string
23-
DBOperationName string
2422
DBCollectionName string
2523
}
2624

27-
const (
28-
pathSearch string = "_search"
29-
)
25+
var elasticsearchOperationMethods = map[string]map[string]struct{}{
26+
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-search
27+
"search": {http.MethodPost: {}, http.MethodGet: {}},
28+
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-msearch
29+
"msearch": {http.MethodPost: {}, http.MethodGet: {}},
30+
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-bulk
31+
"bulk": {http.MethodPost: {}, http.MethodPut: {}},
32+
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-get
33+
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-index
34+
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete
35+
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-exists
36+
"doc": {http.MethodGet: {}, http.MethodPost: {}, http.MethodPut: {}, http.MethodHead: {}, http.MethodDelete: {}},
37+
}
3038

3139
func ElasticsearchSpan(baseSpan *request.Span, req *http.Request, resp *http.Response) (request.Span, bool) {
3240
if !isElasticsearchResponse(resp) {
3341
return *baseSpan, false
3442
}
35-
if err := isSearchRequest(req); err != nil {
43+
44+
operationName := extractElasticsearchOperationName(req)
45+
if operationName == "" {
46+
return *baseSpan, false
47+
}
48+
49+
if err := isElasticsearchSupportedRequest(operationName, req.Method); err != nil {
3650
slog.Debug(err.Error())
3751
return *baseSpan, false
3852
}
@@ -42,19 +56,14 @@ func ElasticsearchSpan(baseSpan *request.Span, req *http.Request, resp *http.Res
4256
slog.Debug("parse Elasticsearch request", "error", err)
4357
return *baseSpan, false
4458
}
45-
46-
if resp != nil {
47-
if v := resp.Header.Get("X-Found-Handling-Instance"); v != "" {
48-
op.NodeName = v
49-
}
50-
} else {
51-
op.NodeName = req.URL.Host
59+
if v := resp.Header.Get("X-Found-Handling-Instance"); v != "" {
60+
op.NodeName = v
5261
}
5362

5463
baseSpan.SubType = request.HTTPSubtypeElasticsearch
5564
baseSpan.Elasticsearch = &request.Elasticsearch{
5665
NodeName: op.NodeName,
57-
DBOperationName: op.DBOperationName,
66+
DBOperationName: operationName,
5867
DBCollectionName: op.DBCollectionName,
5968
DBQueryText: op.DBQueryText,
6069
}
@@ -67,42 +76,23 @@ func parseElasticsearchRequest(req *http.Request) (elasticsearchOperation, error
6776
if err != nil {
6877
return op, fmt.Errorf("failed to read Elasticsearch request body %w", err)
6978
}
70-
7179
req.Body = io.NopCloser(bytes.NewBuffer(reqB))
72-
if len(reqB) == 0 {
73-
op.DBQueryText = ""
74-
} else {
75-
dbQueryText, err := extractDBQueryText(reqB)
76-
if err != nil {
77-
return op, err
78-
}
79-
op.DBQueryText = dbQueryText
80-
}
81-
op.DBOperationName = extractOperationName(req)
82-
op.DBCollectionName = extractDBCollectionName(req)
80+
op.DBQueryText = string(reqB)
81+
op.DBCollectionName = extractElasticsearchDBCollectionName(req)
8382
return op, nil
8483
}
8584

86-
func extractDBQueryText(body []byte) (string, error) {
87-
var buf bytes.Buffer
88-
89-
if err := json.Compact(&buf, body); err != nil {
90-
return "", fmt.Errorf("invalid Elasticsearch JSON body: %w", err)
85+
func isElasticsearchSupportedRequest(operationName, methodName string) error {
86+
methods, exists := elasticsearchOperationMethods[operationName]
87+
if !exists {
88+
return errors.New("parse Elasticsearch request: unsupported endpoint")
9189
}
9290

93-
return buf.String(), nil
94-
}
95-
96-
func isSearchRequest(req *http.Request) error {
97-
// let's focus only on _search operation that has only GET and POST http methods
98-
if !strings.Contains(req.URL.Path, pathSearch) {
99-
return errors.New("parse Elasticsearch search request: unsupported endpoint")
100-
}
101-
102-
if req.Method != http.MethodGet && req.Method != http.MethodPost {
103-
return errors.New("parse Elasticsearch search request: unsupported method")
91+
_, supported := methods[methodName]
92+
if supported {
93+
return nil
10494
}
105-
return nil
95+
return fmt.Errorf("parse Elasticsearch %s request: unsupported method %s", operationName, methodName)
10696
}
10797

10898
// isElasticsearchResponse checks if X-Elastic-Product HTTP header is present.
@@ -114,26 +104,43 @@ func isElasticsearchResponse(resp *http.Response) bool {
114104
return headerValue == expectedValue
115105
}
116106

117-
// extractOperationName is a generic function used to extract the operation name
107+
// extractElasticsearchOperationName is a generic function used to extract the operation name
118108
// that is the endpoint identifier provided in the request
119-
func extractOperationName(req *http.Request) string {
109+
// Depending on the operation, its name is found in either the last or the
110+
// second-to-last segment of the URL path (e.g. /_bulk vs. /test_index/_doc/1).
111+
func extractElasticsearchOperationName(req *http.Request) string {
120112
path := strings.Trim(req.URL.Path, "/")
121113
if path == "" {
122114
return ""
123115
}
116+
124117
parts := strings.Split(path, "/")
125118
if len(parts) == 0 {
126119
return ""
127120
}
128-
name := parts[len(parts)-1]
129-
return strings.TrimPrefix(name, "_")
121+
122+
lastPart := parts[len(parts)-1]
123+
possibleOperationName := strings.TrimPrefix(lastPart, "_")
124+
125+
if _, found := elasticsearchOperationMethods[possibleOperationName]; found {
126+
return possibleOperationName
127+
}
128+
129+
if len(parts) >= 2 {
130+
secondLastPart := parts[len(parts)-2]
131+
possibleOperationName = strings.TrimPrefix(secondLastPart, "_")
132+
if _, found := elasticsearchOperationMethods[possibleOperationName]; found {
133+
return possibleOperationName
134+
}
135+
}
136+
return ""
130137
}
131138

132-
// extractDBCollectionName takes into account this rule from semconv
139+
// extractElasticsearchDBCollectionName takes into account this rule from semconv
133140
// The query may target multiple indices or data streams,
134141
// in which case it SHOULD be a comma separated list of those.
135142
// If the query doesn’t target a specific index, this field MUST NOT be set.
136-
func extractDBCollectionName(req *http.Request) string {
143+
func extractElasticsearchDBCollectionName(req *http.Request) string {
137144
path := strings.Trim(req.URL.Path, "/")
138145
if path == "" {
139146
return ""

0 commit comments

Comments
 (0)