Skip to content

Commit 4aee72f

Browse files
committed
Add support for Elasticsearch client span
Signed-off-by: Giuseppe Ognibene <[email protected]>
1 parent f20e96e commit 4aee72f

File tree

8 files changed

+144
-52
lines changed

8 files changed

+144
-52
lines changed

internal/test/integration/components/elasticsearch/main.py

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from fastapi import FastAPI, HTTPException
1+
from fastapi import FastAPI, HTTPException, Request
22
import os
33
import uvicorn
44
import requests
@@ -8,34 +8,44 @@
88

99
app = FastAPI()
1010
HEADERS = {'Content-Type': 'application/json'}
11-
ELASTICSEARCH_HOST = "http://elasticsearchserver:9200"
1211

1312
@app.get("/health")
14-
async def health():
15-
HEALTH_URL = ELASTICSEARCH_HOST + "/_cluster/health"
16-
13+
async def health(request: Request):
14+
host_url = request.query_params.get("host_url")
15+
if host_url is None:
16+
raise HTTPException(
17+
status_code=400,
18+
detail="The 'host_url' query parameter is required."
19+
)
20+
server_url = host_url + "/_cluster/health"
1721
try:
18-
response = requests.get(HEALTH_URL, timeout=5)
22+
response = requests.get(server_url, timeout=5)
1923
response.raise_for_status()
2024
status = response.json().get("status", "red")
2125

2226
if status in ("red","yellow"):
2327
raise HTTPException(
2428
status_code=503,
25-
detail={"status": "red","message": "Elasticsearch cluster unhealthy"})
26-
return {"status": status, "message": "Elasticsearch cluster healthy"}
29+
detail={"status": "red","message": "Cluster unhealthy"})
30+
return {"status": status, "message": "Cluster healthy"}
2731

2832
except requests.RequestException as e:
2933
raise HTTPException(
3034
status_code=503,
31-
detail={"status": "error","message": f"Cannot reach Elasticsearch cluster: {str(e)}"})
35+
detail={"status": "error","message": f"Cannot reach Cluster: {str(e)}"})
3236

3337
@app.get("/doc")
34-
async def doc():
35-
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/test_index/_doc/1"
38+
async def doc(request: Request):
39+
host_url = request.query_params.get("host_url")
40+
if host_url is None:
41+
raise HTTPException(
42+
status_code=400,
43+
detail="The 'host_url' query parameter is required."
44+
)
45+
server_url = host_url + "/test_index/_doc/1"
3646

3747
try:
38-
response = requests.get(ELASTICSEARCH_URL, headers=HEADERS)
48+
response = requests.get(server_url, headers=HEADERS)
3949

4050
except Exception as e:
4151
print(json.dumps({"error": str(e)}))
@@ -44,8 +54,14 @@ async def doc():
4454

4555

4656
@app.get("/search")
47-
async def search():
48-
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/test_index/_search"
57+
async def search(request: Request):
58+
host_url = request.query_params.get("host_url")
59+
if host_url is None:
60+
raise HTTPException(
61+
status_code=400,
62+
detail="The 'host_url' query parameter is required."
63+
)
64+
server_url = host_url + "/test_index/_search"
4965
query_body = {
5066
"query": {
5167
"match": {
@@ -54,16 +70,22 @@ async def search():
5470
}
5571
}
5672
try:
57-
response = requests.post(ELASTICSEARCH_URL, json=query_body, headers=HEADERS)
73+
response = requests.post(server_url, json=query_body, headers=HEADERS)
5874

5975
except Exception as e:
6076
print(json.dumps({"error": str(e)}))
6177
sys.exit(1)
6278
return {"status": "OK"}
6379

6480
@app.get("/msearch")
65-
async def msearch():
66-
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/_msearch"
81+
async def msearch(request: Request):
82+
host_url = request.query_params.get("host_url")
83+
if host_url is None:
84+
raise HTTPException(
85+
status_code=400,
86+
detail="The 'host_url' query parameter is required."
87+
)
88+
server_url = host_url + "/_msearch"
6789
searches = [
6890
{},
6991
{
@@ -83,7 +105,7 @@ async def msearch():
83105
}
84106
]
85107
try:
86-
response = requests.post(ELASTICSEARCH_URL, json=searches, headers=HEADERS)
108+
response = requests.post(server_url, json=searches, headers=HEADERS)
87109

88110
except Exception as e:
89111
print(json.dumps({"error": str(e)}))
@@ -92,8 +114,14 @@ async def msearch():
92114

93115

94116
@app.get("/bulk")
95-
async def bulk():
96-
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/_bulk"
117+
async def bulk(request: Request):
118+
host_url = request.query_params.get("host_url")
119+
if host_url is None:
120+
raise HTTPException(
121+
status_code=400,
122+
detail="The 'host_url' query parameter is required."
123+
)
124+
server_url = host_url + "/_bulk"
97125
actions=[
98126
{
99127
"index": {
@@ -132,7 +160,7 @@ async def bulk():
132160
}
133161
]
134162
try:
135-
response = requests.post(ELASTICSEARCH_URL, json=actions, headers=HEADERS)
163+
response = requests.post(server_url, json=actions, headers=HEADERS)
136164

137165
except Exception as e:
138166
print(json.dumps({"error": str(e)}))

internal/test/integration/docker-compose-elasticsearch.yml

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,33 @@ services:
66
- "9200:9200"
77
environment:
88
- discovery.type=single-node
9-
- xpack.security.enabled=false # Only for local testing
9+
- xpack.security.enabled=false
10+
deploy:
11+
resources:
12+
limits:
13+
cpus: '1.5'
14+
memory: 2G
15+
reservations:
16+
cpus: '0.25'
17+
memory: 1G
18+
19+
opensearchserver:
20+
image: opensearchproject/opensearch:3.3.1@sha256:773b05b026c01e7d520f6b857f486b7312d9290e14f2aebe4630eb80787e9eb3
21+
container_name: opensearch-server
22+
ports:
23+
- "9201:9200"
24+
environment:
25+
- discovery.type=single-node
26+
- DISABLE_SECURITY_PLUGIN=true
27+
- DISABLE_INSTALL_DEMO_CONFIG=true
28+
deploy:
29+
resources:
30+
limits:
31+
cpus: '1.5'
32+
memory: 2G
33+
reservations:
34+
cpus: '0.25'
35+
memory: 1G
1036

1137
testserver:
1238
build:
@@ -20,6 +46,8 @@ services:
2046
condition: service_started
2147
elasticsearchserver:
2248
condition: service_started
49+
opensearchserver:
50+
condition: service_started
2351

2452
obi:
2553
build:

internal/test/integration/suites_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,12 @@ func TestSuite_PythonElasticsearch(t *testing.T) {
518518

519519
compose.Env = append(compose.Env, `OTEL_EBPF_OPEN_PORT=8080`, `OTEL_EBPF_EXECUTABLE_PATH=`, `TEST_SERVICE_PORTS=8381:8080`)
520520
require.NoError(t, compose.Up())
521-
t.Run("Python Elasticsearch", testPythonElasticsearch)
521+
t.Run("Python Elasticsearch", func(t *testing.T) {
522+
testPythonElasticsearch(t, "elasticsearch")
523+
})
524+
t.Run("Python Opensearch", func(t *testing.T) {
525+
testPythonElasticsearch(t, "opensearch")
526+
})
522527
require.NoError(t, compose.Close())
523528
}
524529

internal/test/integration/test_python_elasticsearchclient.go

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,28 +21,38 @@ import (
2121
ti "go.opentelemetry.io/obi/pkg/test/integration"
2222
)
2323

24-
func testPythonElasticsearch(t *testing.T) {
25-
url := "http://localhost:8381"
26-
comm := "python3.12"
27-
index := "test_index"
24+
const (
25+
comm = "python3.12"
26+
testIndex = "test_index"
27+
testServerURL = "http://localhost:8381"
28+
)
2829

29-
waitForTestComponentsRoute(t, url, "/health")
30-
testElasticsearchSearch(t, comm, url, index)
30+
func testPythonElasticsearch(t *testing.T, dbSystemName string) {
31+
var url string
32+
switch dbSystemName {
33+
case "elasticsearch":
34+
url = "http://elasticsearchserver:9200"
35+
case "opensearch":
36+
url = "http://opensearchserver:9200"
37+
}
38+
queryParam := "?host_url=" + url
39+
waitForTestComponentsNoMetrics(t, testServerURL+"/health"+queryParam)
40+
testElasticsearchSearch(t, dbSystemName, queryParam)
3141
// populate the server is optional, the elasticsearch request will fail
3242
// but we will have the span
33-
testElasticsearchMsearch(t, comm, url)
34-
testElasticsearchBulk(t, comm, url)
35-
testElasticsearchDoc(t, comm, url, index)
43+
testElasticsearchMsearch(t, dbSystemName, queryParam)
44+
testElasticsearchBulk(t, dbSystemName, queryParam)
45+
testElasticsearchDoc(t, dbSystemName, queryParam)
3646
}
3747

38-
func testElasticsearchSearch(t *testing.T, comm, url, index string) {
48+
func testElasticsearchSearch(t *testing.T, dbSystemName, queryParam string) {
3949
queryText := "{\"query\": {\"match\": {\"name\": \"OBI\"}}}"
4050
urlPath := "/search"
41-
ti.DoHTTPGet(t, url+urlPath, 200)
42-
assertElasticsearchOperation(t, comm, "search", queryText, index)
51+
ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200)
52+
assertElasticsearchOperation(t, dbSystemName, "search", queryText, testIndex)
4353
}
4454

45-
func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index string) {
55+
func assertElasticsearchOperation(t *testing.T, dbSystemName, op, queryText, index string) {
4656
params := neturl.Values{}
4757
params.Add("service", comm)
4858
var operationName string
@@ -85,31 +95,31 @@ func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index strin
8595

8696
tag, found = jaeger.FindIn(span.Tags, "db.system.name")
8797
assert.True(t, found)
88-
assert.Equal(t, "elasticsearch", tag.Value)
98+
assert.Equal(t, dbSystemName, tag.Value)
8999

90100
tag, found = jaeger.FindIn(span.Tags, "elasticsearch.node.name")
91101
assert.True(t, found)
92102
assert.Empty(t, tag.Value)
93103
}, test.Interval(100*time.Millisecond))
94104
}
95105

96-
func testElasticsearchMsearch(t *testing.T, comm, url string) {
106+
func testElasticsearchMsearch(t *testing.T, dbSystemName, queryParam string) {
97107
queryText := "[{}, {\"query\": {\"match\": {\"message\": \"this is a test\"}}}, {\"index\": \"my-index-000002\"}, {\"query\": {\"match_all\": {}}}]"
98108
urlPath := "/msearch"
99-
ti.DoHTTPGet(t, url+urlPath, 200)
100-
assertElasticsearchOperation(t, comm, "msearch", queryText, "")
109+
ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200)
110+
assertElasticsearchOperation(t, dbSystemName, "msearch", queryText, "")
101111
}
102112

103-
func testElasticsearchBulk(t *testing.T, comm, url string) {
113+
func testElasticsearchBulk(t *testing.T, dbSystemName, queryParam string) {
104114
queryText := "[{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}, {\"field1\": \"value1\"}, {\"delete\": {\"_index\": \"test\", \"_id\": \"2\"}}, {\"create\": {\"_index\": \"test\", \"_id\": \"3\"}}, {\"field1\": \"value3\"}, {\"update\": {\"_id\": \"1\", \"_index\": \"test\"}}, {\"doc\": {\"field2\": \"value2\"}}]"
105115
urlPath := "/bulk"
106-
ti.DoHTTPGet(t, url+urlPath, 200)
107-
assertElasticsearchOperation(t, comm, "bulk", queryText, "")
116+
ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200)
117+
assertElasticsearchOperation(t, dbSystemName, "bulk", queryText, "")
108118
}
109119

110-
func testElasticsearchDoc(t *testing.T, comm, url, index string) {
120+
func testElasticsearchDoc(t *testing.T, dbSystemName, queryParam string) {
111121
queryText := ""
112122
urlPath := "/doc"
113-
ti.DoHTTPGet(t, url+urlPath, 200)
114-
assertElasticsearchOperation(t, comm, "doc", queryText, index)
123+
ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200)
124+
assertElasticsearchOperation(t, dbSystemName, "doc", queryText, testIndex)
115125
}

pkg/appolly/app/request/span.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ type Elasticsearch struct {
181181
NodeName string `json:"nodeName"`
182182
DBOperationName string `json:"dbOperationName"`
183183
DBQueryText string `json:"dbQueryText"`
184+
DBSystemName string `json:"dbSystemName"`
184185
}
185186

186187
type AWS struct {
@@ -304,6 +305,7 @@ func spanAttributes(s *Span) SpanAttributes {
304305
attrs["nodeName"] = s.Elasticsearch.NodeName
305306
attrs["dbOperationName"] = s.Elasticsearch.DBOperationName
306307
attrs["dbQueryText"] = s.Elasticsearch.DBQueryText
308+
attrs["dbSystemName"] = s.Elasticsearch.DBSystemName
307309
}
308310
if s.SubType == HTTPSubtypeAWSS3 && s.AWS != nil {
309311
s3 := s.AWS.S3

pkg/appolly/app/request/span_getters.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,7 @@ func spanOTELGetters(name attr.Name) (attributes.Getter[*Span, attribute.KeyValu
105105
case EventTypeMongoClient:
106106
return DBSystemName(semconv.DBSystemMongoDB.Value.AsString())
107107
case EventTypeHTTPClient:
108-
if span.SubType == HTTPSubtypeElasticsearch {
109-
return DBSystemName(semconv.DBSystemElasticsearch.Value.AsString())
110-
}
108+
return DBSystemName(span.Elasticsearch.DBSystemName)
111109
}
112110
return DBSystemName("unknown")
113111
}

pkg/ebpf/common/http/elasticsearch.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type elasticsearchOperation struct {
2020
NodeName string
2121
DBQueryText string
2222
DBCollectionName string
23+
DBSytemName string
2324
}
2425

2526
var elasticsearchOperationMethods = map[string]map[string]struct{}{
@@ -37,7 +38,8 @@ var elasticsearchOperationMethods = map[string]map[string]struct{}{
3738
}
3839

3940
func ElasticsearchSpan(baseSpan *request.Span, req *http.Request, resp *http.Response) (request.Span, bool) {
40-
if !isElasticsearchResponse(resp) {
41+
dbSystemName := elasticsearchSystemName(resp)
42+
if dbSystemName == "" {
4143
return *baseSpan, false
4244
}
4345

@@ -66,10 +68,20 @@ func ElasticsearchSpan(baseSpan *request.Span, req *http.Request, resp *http.Res
6668
DBOperationName: operationName,
6769
DBCollectionName: op.DBCollectionName,
6870
DBQueryText: op.DBQueryText,
71+
DBSystemName: dbSystemName,
6972
}
7073
return *baseSpan, true
7174
}
7275

76+
func elasticsearchSystemName(resp *http.Response) string {
77+
if isElasticsearchResponse(resp) {
78+
return "elasticsearch"
79+
} else if isOpensearchResponse(resp) {
80+
return "opensearch"
81+
}
82+
return ""
83+
}
84+
7385
func parseElasticsearchRequest(req *http.Request) (elasticsearchOperation, error) {
7486
var op elasticsearchOperation
7587
reqB, err := io.ReadAll(req.Body)
@@ -104,7 +116,16 @@ func isElasticsearchResponse(resp *http.Response) bool {
104116
return headerValue == expectedValue
105117
}
106118

107-
// extractElasticsearchOperationName is a generic function used to extract the operation name
119+
// isOpensearchResponse checks if X-Opensearch-Version HTTP header is present.
120+
// Note: this header should be present from release 3.0.0
121+
// https://github.com/opensearch-project/OpenSearch/blob/dc4efa821904cc2d7ea7ef61c0f577d3fc0d8be9/server/src/main/java/org/opensearch/http/DefaultRestChannel.java#L73
122+
func isOpensearchResponse(resp *http.Response) bool {
123+
headerValue := resp.Header.Get("X-OpenSearch-Version")
124+
expectedValue := "OpenSearch/"
125+
return strings.Contains(headerValue, expectedValue)
126+
}
127+
128+
// extractOperationName is a generic function used to extract the operation name
108129
// that is the endpoint identifier provided in the request
109130
// we can have different operations where the name of the operation is found in
110131
// the last or second to last part of the url

pkg/export/otel/tracesgen/tracesgen.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ func TraceAttributesSelector(span *request.Span, optionalAttrs map[attr.Name]str
356356
attrs = append(attrs, request.DBQueryText(span.Elasticsearch.DBQueryText))
357357
}
358358
attrs = append(attrs, request.DBOperationName(span.Elasticsearch.DBOperationName))
359-
attrs = append(attrs, request.DBSystemName(semconv.DBSystemElasticsearch.Value.AsString()))
359+
attrs = append(attrs, request.DBSystemName(span.Elasticsearch.DBSystemName))
360360
attrs = append(attrs, request.ErrorType(span.DBError.ErrorCode))
361361
}
362362

0 commit comments

Comments
 (0)