Skip to content

Commit 1a3338d

Browse files
committed
Add support for Opensearch client span
Signed-off-by: Giuseppe Ognibene <[email protected]>
1 parent 8b80320 commit 1a3338d

File tree

10 files changed

+359
-49
lines changed

10 files changed

+359
-49
lines changed

internal/test/integration/components/elasticsearch/main.py

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from fastapi import FastAPI, HTTPException
1+
from fastapi import FastAPI, HTTPException, Request
22
import os
33
import uvicorn
44
import requests
@@ -8,34 +8,44 @@
88

99
app = FastAPI()
1010
HEADERS = {'Content-Type': 'application/json'}
11-
ELASTICSEARCH_HOST = "http://elasticsearchserver:9200"
1211

1312
@app.get("/health")
14-
async def health():
15-
HEALTH_URL = ELASTICSEARCH_HOST + "/_cluster/health"
16-
13+
async def health(request: Request):
14+
host_url = request.query_params.get("host_url")
15+
if host_url is None:
16+
raise HTTPException(
17+
status_code=400,
18+
detail="The 'host_url' query parameter is required."
19+
)
20+
SERVER_URL = host_url + "/_cluster/health"
1721
try:
18-
response = requests.get(HEALTH_URL, timeout=5)
22+
response = requests.get(SERVER_URL, timeout=5)
1923
response.raise_for_status()
2024
status = response.json().get("status", "red")
2125

2226
if status in ("red","yellow"):
2327
raise HTTPException(
2428
status_code=503,
25-
detail={"status": "red","message": "Elasticsearch cluster unhealthy"})
26-
return {"status": status, "message": "Elasticsearch cluster healthy"}
29+
detail={"status": "red","message": "Cluster unhealthy"})
30+
return {"status": status, "message": "Cluster healthy"}
2731

2832
except requests.RequestException as e:
2933
raise HTTPException(
3034
status_code=503,
31-
detail={"status": "error","message": f"Cannot reach Elasticsearch cluster: {str(e)}"})
35+
detail={"status": "error","message": f"Cannot reach Cluster: {str(e)}"})
3236

3337
@app.get("/doc")
34-
async def doc():
35-
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/test_index/_doc/1"
38+
async def doc(request: Request):
39+
host_url = request.query_params.get("host_url")
40+
if host_url is None:
41+
raise HTTPException(
42+
status_code=400,
43+
detail="The 'host_url' query parameter is required."
44+
)
45+
SERVER_URL = host_url + "/test_index/_doc/1"
3646

3747
try:
38-
response = requests.get(ELASTICSEARCH_URL, headers=HEADERS)
48+
response = requests.get(SERVER_URL, headers=HEADERS)
3949

4050
except Exception as e:
4151
print(json.dumps({"error": str(e)}))
@@ -44,8 +54,15 @@ async def doc():
4454

4555

4656
@app.get("/search")
47-
async def search():
48-
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/test_index/_search"
57+
async def search(request: Request):
58+
host_url = request.query_params.get("host_url")
59+
if host_url is None:
60+
raise HTTPException(
61+
status_code=400,
62+
detail="The 'host_url' query parameter is required."
63+
)
64+
SERVER_URL = host_url + "/test_index/_search"
65+
print(SERVER_URL)
4966
query_body = {
5067
"query": {
5168
"match": {
@@ -54,16 +71,22 @@ async def search():
5471
}
5572
}
5673
try:
57-
response = requests.post(ELASTICSEARCH_URL, json=query_body, headers=HEADERS)
74+
response = requests.post(SERVER_URL, json=query_body, headers=HEADERS)
5875

5976
except Exception as e:
6077
print(json.dumps({"error": str(e)}))
6178
sys.exit(1)
6279
return {"status": "OK"}
6380

6481
@app.get("/msearch")
65-
async def msearch():
66-
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/_msearch"
82+
async def msearch(request: Request):
83+
host_url = request.query_params.get("host_url")
84+
if host_url is None:
85+
raise HTTPException(
86+
status_code=400,
87+
detail="The 'host_url' query parameter is required."
88+
)
89+
SERVER_URL = host_url + "/_msearch"
6790
searches = [
6891
{},
6992
{
@@ -83,7 +106,7 @@ async def msearch():
83106
}
84107
]
85108
try:
86-
response = requests.post(ELASTICSEARCH_URL, json=searches, headers=HEADERS)
109+
response = requests.post(SERVER_URL, json=searches, headers=HEADERS)
87110

88111
except Exception as e:
89112
print(json.dumps({"error": str(e)}))
@@ -92,8 +115,14 @@ async def msearch():
92115

93116

94117
@app.get("/bulk")
95-
async def bulk():
96-
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/_bulk"
118+
async def bulk(request: Request):
119+
host_url = request.query_params.get("host_url")
120+
if host_url is None:
121+
raise HTTPException(
122+
status_code=400,
123+
detail="The 'host_url' query parameter is required."
124+
)
125+
SERVER_URL = host_url + "/_bulk"
97126
actions=[
98127
{
99128
"index": {
@@ -132,7 +161,7 @@ async def bulk():
132161
}
133162
]
134163
try:
135-
response = requests.post(ELASTICSEARCH_URL, json=actions, headers=HEADERS)
164+
response = requests.post(SERVER_URL, json=actions, headers=HEADERS)
136165

137166
except Exception as e:
138167
print(json.dumps({"error": str(e)}))
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
services:
2+
opensearchserver:
3+
image: opensearchproject/opensearch:3.3.1@sha256:773b05b026c01e7d520f6b857f486b7312d9290e14f2aebe4630eb80787e9eb3
4+
container_name: opensearch-server
5+
ports:
6+
- "9200:9200"
7+
environment:
8+
- discovery.type=single-node
9+
- DISABLE_SECURITY_PLUGIN=true
10+
- DISABLE_INSTALL_DEMO_CONFIG=true
11+
# we can reuse the elasticseearch server test
12+
# no need to duplicate code
13+
testserver:
14+
build:
15+
context: ../../test/integration/components/elasticsearch/
16+
dockerfile: Dockerfile
17+
image: hatest-testserver-python-opensearch
18+
ports:
19+
- "${TEST_SERVICE_PORTS}"
20+
depends_on:
21+
otelcol:
22+
condition: service_started
23+
opensearchserver:
24+
condition: service_started
25+
26+
obi:
27+
build:
28+
context: ../../..
29+
dockerfile: ./internal/test/integration/components/ebpf-instrument/Dockerfile
30+
volumes:
31+
- ./configs/:/configs
32+
- ./system/sys/kernel/security:/sys/kernel/security
33+
- ../../../testoutput:/coverage
34+
- ../../../testoutput/run-python-opensearch:/var/run/beyla
35+
image: hatest-obi
36+
container_name: hatest-obi
37+
privileged: true # in some environments (not GH Pull Requests) you can set it to false and then cap_add: [ SYS_ADMIN ]
38+
network_mode: "service:testserver"
39+
pid: "service:testserver"
40+
environment:
41+
OTEL_EBPF_CONFIG_PATH: "/configs/obi-config.yml"
42+
GOCOVERDIR: "/coverage"
43+
OTEL_EBPF_TRACE_PRINTER: "json_indent"
44+
OTEL_EBPF_OPEN_PORT: "${OTEL_EBPF_OPEN_PORT}"
45+
OTEL_EBPF_DISCOVERY_POLL_INTERVAL: 500ms
46+
OTEL_EBPF_EXECUTABLE_PATH: "${OTEL_EBPF_EXECUTABLE_PATH}"
47+
OTEL_EBPF_SERVICE_NAMESPACE: "integration-test"
48+
OTEL_EBPF_METRICS_INTERVAL: "10ms"
49+
OTEL_EBPF_BPF_BATCH_TIMEOUT: "10ms"
50+
OTEL_EBPF_OTLP_TRACES_BATCH_TIMEOUT: "1ms"
51+
OTEL_EBPF_LOG_LEVEL: "DEBUG"
52+
OTEL_EBPF_LOG_CONFIG: "yaml"
53+
OTEL_EBPF_BPF_DEBUG: "TRUE"
54+
OTEL_EBPF_HOSTNAME: "beyla"
55+
OTEL_EBPF_BPF_HTTP_REQUEST_TIMEOUT: "5s"
56+
OTEL_EBPF_PROCESSES_INTERVAL: "100ms"
57+
OTEL_EBPF_METRICS_FEATURES: "application"
58+
OTEL_EBPF_BPF_BUFFER_SIZE_HTTP: 1024
59+
OTEL_EBPF_HTTP_ELASTICSEARCH_ENABLED: true
60+
OTEL_EBPF_PROTOCOL_DEBUG_PRINT: true
61+
depends_on:
62+
testserver:
63+
condition: service_started
64+
65+
# OpenTelemetry Collector
66+
otelcol:
67+
image: otel/opentelemetry-collector-contrib:0.104.0@sha256:e07e325e303e86f4a87a617491e921b579a92da6d404007394757ac910bf9587
68+
container_name: otel-col
69+
deploy:
70+
resources:
71+
limits:
72+
memory: 125M
73+
restart: unless-stopped
74+
command: ["--config=/etc/otelcol-config/otelcol-config.yml"]
75+
volumes:
76+
- ./configs/:/etc/otelcol-config
77+
ports:
78+
- "4317" # OTLP over gRPC receiver
79+
- "4318:4318" # OTLP over HTTP receiver
80+
- "9464" # Prometheus exporter
81+
- "8888" # metrics endpoint
82+
depends_on:
83+
prometheus:
84+
condition: service_started
85+
86+
# Prometheus
87+
prometheus:
88+
image: quay.io/prometheus/prometheus:v2.55.1@sha256:2659f4c2ebb718e7695cb9b25ffa7d6be64db013daba13e05c875451cf51b0d3
89+
container_name: prometheus
90+
command:
91+
- --config.file=/etc/prometheus/prometheus-config.yml
92+
- --web.enable-lifecycle
93+
- --web.route-prefix=/
94+
volumes:
95+
- ./configs/:/etc/prometheus
96+
ports:
97+
- "9090:9090"
98+
99+
jaeger:
100+
image: jaegertracing/all-in-one:1.60@sha256:4fd2d70fa347d6a47e79fcb06b1c177e6079f92cba88b083153d56263082135e
101+
ports:
102+
- "16686:16686" # Query frontend
103+
- "4317" # OTEL GRPC traces collector
104+
- "4318" # OTEL HTTP traces collector
105+
environment:
106+
- COLLECTOR_OTLP_ENABLED=true
107+
- LOG_LEVEL=debug
108+
# curl http://localhost:16686/api/services
109+
# curl http://localhost:16686/api/traces?service=testserver

internal/test/integration/suites_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,16 @@ func TestSuite_PythonElasticsearch(t *testing.T) {
522522
require.NoError(t, compose.Close())
523523
}
524524

525+
func TestSuite_PythonOpensearch(t *testing.T) {
526+
compose, err := docker.ComposeSuite("docker-compose-opensearch.yml", path.Join(pathOutput, "test-suite-opensearch.log"))
527+
require.NoError(t, err)
528+
529+
compose.Env = append(compose.Env, `OTEL_EBPF_OPEN_PORT=8080`, `OTEL_EBPF_EXECUTABLE_PATH=`, `TEST_SERVICE_PORTS=8381:8080`)
530+
require.NoError(t, compose.Up())
531+
t.Run("Python opensearch", testPythonOpensearch)
532+
require.NoError(t, compose.Close())
533+
}
534+
525535
func TestSuite_PythonAWSS3(t *testing.T) {
526536
compose, err := docker.ComposeSuite("docker-compose-python-aws.yml", path.Join(pathOutput, "test-suite-python-aws-s3.log"))
527537
require.NoError(t, err)

internal/test/integration/test_python_elasticsearchclient.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,24 @@ import (
2222
)
2323

2424
func testPythonElasticsearch(t *testing.T) {
25-
url := "http://localhost:8381"
25+
testServerURL := "http://localhost:8381"
26+
elasticsearchURL := "http://elasticsearchserver:9200"
2627
comm := "python3.12"
2728
index := "test_index"
28-
29-
waitForTestComponentsRoute(t, url, "/health")
30-
testElasticsearchSearch(t, comm, url, index)
29+
queryParam := "?host_url=" + elasticsearchURL
30+
waitForTestComponentsNoMetrics(t, testServerURL+"/health"+queryParam)
31+
testElasticsearchSearch(t, comm, testServerURL, queryParam, index)
3132
// populate the server is optional, the elasticsearch request will fail
3233
// but we will have the span
33-
testElasticsearchMsearch(t, comm, url)
34-
testElasticsearchBulk(t, comm, url)
35-
testElasticsearchDoc(t, comm, url, index)
34+
testElasticsearchMsearch(t, comm, testServerURL, queryParam)
35+
testElasticsearchBulk(t, comm, testServerURL, queryParam)
36+
testElasticsearchDoc(t, comm, testServerURL, queryParam, index)
3637
}
3738

38-
func testElasticsearchSearch(t *testing.T, comm, url, index string) {
39+
func testElasticsearchSearch(t *testing.T, comm, testServerURL, queryParam, index string) {
3940
queryText := "{\"query\": {\"match\": {\"name\": \"OBI\"}}}"
4041
urlPath := "/search"
41-
ti.DoHTTPGet(t, url+urlPath, 200)
42+
ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200)
4243
assertElasticsearchOperation(t, comm, "search", queryText, index)
4344
}
4445

@@ -93,23 +94,23 @@ func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index strin
9394
}, test.Interval(100*time.Millisecond))
9495
}
9596

96-
func testElasticsearchMsearch(t *testing.T, comm, url string) {
97+
func testElasticsearchMsearch(t *testing.T, comm, testServerURL, queryParam string) {
9798
queryText := "[{}, {\"query\": {\"match\": {\"message\": \"this is a test\"}}}, {\"index\": \"my-index-000002\"}, {\"query\": {\"match_all\": {}}}]"
9899
urlPath := "/msearch"
99-
ti.DoHTTPGet(t, url+urlPath, 200)
100+
ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200)
100101
assertElasticsearchOperation(t, comm, "msearch", queryText, "")
101102
}
102103

103-
func testElasticsearchBulk(t *testing.T, comm, url string) {
104+
func testElasticsearchBulk(t *testing.T, comm, testServerURL, queryParam string) {
104105
queryText := "[{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}, {\"field1\": \"value1\"}, {\"delete\": {\"_index\": \"test\", \"_id\": \"2\"}}, {\"create\": {\"_index\": \"test\", \"_id\": \"3\"}}, {\"field1\": \"value3\"}, {\"update\": {\"_id\": \"1\", \"_index\": \"test\"}}, {\"doc\": {\"field2\": \"value2\"}}]"
105106
urlPath := "/bulk"
106-
ti.DoHTTPGet(t, url+urlPath, 200)
107+
ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200)
107108
assertElasticsearchOperation(t, comm, "bulk", queryText, "")
108109
}
109110

110-
func testElasticsearchDoc(t *testing.T, comm, url, index string) {
111+
func testElasticsearchDoc(t *testing.T, comm, testServerURL, queryParam, index string) {
111112
queryText := ""
112113
urlPath := "/doc"
113-
ti.DoHTTPGet(t, url+urlPath, 200)
114+
ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200)
114115
assertElasticsearchOperation(t, comm, "doc", queryText, index)
115116
}

0 commit comments

Comments
 (0)