Skip to content

Commit c5e5bdd

Browse files
committed
test(backend): Add integration tests for artifact reading and large artifacts handling
Signed-off-by: Helber Belmiro <[email protected]>
1 parent ce49b0a commit c5e5bdd

File tree

3 files changed

+406
-0
lines changed

3 files changed

+406
-0
lines changed
Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
// Copyright 2025 The Kubeflow Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package integration
16+
17+
import (
18+
"encoding/base64"
19+
"encoding/json"
20+
"fmt"
21+
"io"
22+
"net/http"
23+
"regexp"
24+
"testing"
25+
"time"
26+
27+
"github.com/golang/glog"
28+
params "github.com/kubeflow/pipelines/backend/api/v1beta1/go_http_client/experiment_client/experiment_service"
29+
"github.com/kubeflow/pipelines/backend/api/v1beta1/go_http_client/experiment_model"
30+
pipelineParams "github.com/kubeflow/pipelines/backend/api/v1beta1/go_http_client/pipeline_client/pipeline_service"
31+
uploadParams "github.com/kubeflow/pipelines/backend/api/v1beta1/go_http_client/pipeline_upload_client/pipeline_upload_service"
32+
"github.com/kubeflow/pipelines/backend/api/v1beta1/go_http_client/pipeline_upload_model"
33+
runParams "github.com/kubeflow/pipelines/backend/api/v1beta1/go_http_client/run_client/run_service"
34+
"github.com/kubeflow/pipelines/backend/api/v1beta1/go_http_client/run_model"
35+
"github.com/kubeflow/pipelines/backend/src/common/client/api_server/v1"
36+
"github.com/kubeflow/pipelines/backend/test"
37+
"github.com/kubeflow/pipelines/backend/test/config"
38+
"github.com/stretchr/testify/require"
39+
"github.com/stretchr/testify/suite"
40+
)
41+
42+
// ArtifactAPITest is an integration test suite covering the v1beta1
// ReadArtifact endpoint, including reading large (gzip-compressed) artifacts.
type ArtifactAPITest struct {
	suite.Suite
	namespace            string // Kubernetes namespace the KFP backend runs in
	resourceNamespace    string // namespace used for created resources (set to the same value as namespace in SetupTest)
	experimentClient     *api_server.ExperimentClient
	pipelineClient       *api_server.PipelineClient
	pipelineUploadClient *api_server.PipelineUploadClient
	runClient            *api_server.RunClient
}
51+
52+
func (s *ArtifactAPITest) SetupTest() {
53+
if !*runIntegrationTests {
54+
s.T().SkipNow()
55+
return
56+
}
57+
58+
if !*isDevMode {
59+
err := test.WaitForReady(*initializeTimeout)
60+
if err != nil {
61+
glog.Exitf("Failed to initialize test. Error: %s", err.Error())
62+
}
63+
}
64+
s.namespace = *config.Namespace
65+
s.resourceNamespace = *config.Namespace
66+
clientConfig := test.GetClientConfig(*config.Namespace)
67+
var err error
68+
69+
s.experimentClient, err = api_server.NewExperimentClient(clientConfig, false)
70+
if err != nil {
71+
glog.Exitf("Failed to get experiment client. Error: %v", err)
72+
}
73+
s.pipelineClient, err = api_server.NewPipelineClient(clientConfig, false)
74+
if err != nil {
75+
glog.Exitf("Failed to get pipeline client. Error: %v", err)
76+
}
77+
s.runClient, err = api_server.NewRunClient(clientConfig, false)
78+
if err != nil {
79+
glog.Exitf("Failed to get run client. Error: %v", err)
80+
}
81+
s.pipelineUploadClient, err = api_server.NewPipelineUploadClient(clientConfig, false)
82+
if err != nil {
83+
glog.Exitf("Failed to get pipeline upload client. Error: %v", err)
84+
}
85+
}
86+
87+
func (s *ArtifactAPITest) TearDownSuite() {
88+
if *runIntegrationTests {
89+
if !*isDevMode {
90+
s.cleanUp()
91+
}
92+
}
93+
}
94+
95+
func (s *ArtifactAPITest) TearDownTest() {
96+
if *runIntegrationTests {
97+
s.cleanUp()
98+
}
99+
}
100+
101+
// cleanUp deletes every run, experiment, and pipeline in the test
// namespace. The order is deliberate: runs are removed first, then the
// experiments that own them, then pipelines.
func (s *ArtifactAPITest) cleanUp() {
	test.DeleteAllRuns(s.runClient, s.namespace, s.T())
	test.DeleteAllExperiments(s.experimentClient, s.namespace, s.T())
	test.DeleteAllPipelines(s.pipelineClient, s.T())
}
106+
107+
// TestV1PipelineArtifactRead runs the large-artifact pipeline end to end,
// waits for it to succeed, then validates the v1beta1 ReadArtifact endpoint
// for the artifact the run produced.
func (s *ArtifactAPITest) TestV1PipelineArtifactRead() {
	runID, experimentID, pipelineID := s.runPipeline()

	// Clean up in a single deferred func so resources are removed in this
	// exact order: run, then its owning experiment, then pipeline versions.
	defer func() {
		s.deleteRun(runID)
		s.deleteExperiment(experimentID)
		s.deleteAllPipelineVersions(pipelineID)
	}()

	s.waitForRunCompletion(runID)

	nodeID := s.extractWorkflowNodeID(runID)
	// Artifact name is "<step name>-<output name>": the pipeline's step is
	// "generate-large-artifact" and its file output is "large_file".
	artifactName := "generate-large-artifact-large_file"

	s.testReadArtifactEndpoint(runID, nodeID, artifactName)
}
123+
124+
func (s *ArtifactAPITest) deleteAllPipelineVersions(pipelineID string) {
125+
test.DeleteAllPipelineVersions(s.pipelineClient, s.T(), pipelineID)
126+
if err := s.pipelineClient.Delete(&pipelineParams.PipelineServiceDeletePipelineV1Params{
127+
ID: pipelineID,
128+
}); err != nil {
129+
s.T().Logf("Failed to clean up test pipeline %s: %v", pipelineID, err)
130+
}
131+
}
132+
133+
func (s *ArtifactAPITest) deleteExperiment(experimentID string) {
134+
if err := s.experimentClient.Delete(&params.ExperimentServiceDeleteExperimentV1Params{
135+
ID: experimentID,
136+
}); err != nil {
137+
s.T().Logf("Failed to clean up test experiment %s: %v", experimentID, err)
138+
}
139+
}
140+
141+
func (s *ArtifactAPITest) deleteRun(runID string) {
142+
if err := s.runClient.Delete(&runParams.RunServiceDeleteRunV1Params{
143+
ID: runID,
144+
}); err != nil {
145+
s.T().Logf("Failed to clean up test run %s: %v", runID, err)
146+
}
147+
}
148+
149+
func (s *ArtifactAPITest) createExperiment() *experiment_model.APIExperiment {
150+
experimentName := fmt.Sprintf("artifact-test-experiment-%d", time.Now().Unix())
151+
experiment := test.GetExperiment(experimentName, "Test for artifact reading", s.namespace)
152+
experiment, err := s.experimentClient.Create(&params.ExperimentServiceCreateExperimentV1Params{Experiment: experiment})
153+
require.NoError(s.T(), err)
154+
return experiment
155+
}
156+
157+
func (s *ArtifactAPITest) uploadPipeline() *pipeline_upload_model.APIPipeline {
158+
pipelineParams := uploadParams.NewUploadPipelineParams()
159+
pipelineName := fmt.Sprintf("large-artifact-test-%d", time.Now().Unix())
160+
pipelineParams.SetName(&pipelineName)
161+
pipeline, err := s.pipelineUploadClient.UploadFile("../resources/large_artifact_pipeline.yaml", pipelineParams)
162+
require.NoError(s.T(), err, "Failed to upload pipeline")
163+
return pipeline
164+
}
165+
166+
func (s *ArtifactAPITest) runPipeline() (runID, experimentID, pipelineID string) {
167+
experiment := s.createExperiment()
168+
pipeline := s.uploadPipeline()
169+
170+
runRequest := &runParams.RunServiceCreateRunV1Params{Run: &run_model.APIRun{
171+
Name: fmt.Sprintf("test-artifact-run-%d", time.Now().Unix()),
172+
PipelineSpec: &run_model.APIPipelineSpec{
173+
PipelineID: pipeline.ID,
174+
},
175+
ResourceReferences: []*run_model.APIResourceReference{
176+
{
177+
Key: &run_model.APIResourceKey{
178+
Type: run_model.APIResourceTypeEXPERIMENT.Pointer(),
179+
ID: experiment.ID,
180+
},
181+
Relationship: run_model.APIRelationshipOWNER.Pointer(),
182+
},
183+
},
184+
}}
185+
run, _, err := s.runClient.Create(runRequest)
186+
require.NoError(s.T(), err)
187+
return run.Run.ID, experiment.ID, pipeline.ID
188+
}
189+
190+
func (s *ArtifactAPITest) waitForRunCompletion(runID string) {
191+
t := s.T()
192+
maxWaitTime := 2 * time.Minute
193+
startTime := time.Now()
194+
195+
for time.Since(startTime) < maxWaitTime {
196+
runDetail, _, err := s.runClient.Get(&runParams.RunServiceGetRunV1Params{RunID: runID})
197+
require.NoError(t, err)
198+
199+
if runDetail.Run.Status == "Succeeded" || runDetail.Run.Status == "Completed" ||
200+
runDetail.Run.Status == "Failed" || runDetail.Run.Status == "Error" {
201+
require.Contains(t, []string{"Succeeded", "Completed"}, runDetail.Run.Status,
202+
"Run should have succeeded")
203+
t.Log("Run completed")
204+
return
205+
}
206+
207+
time.Sleep(2 * time.Second)
208+
}
209+
210+
require.Fail(t, "Run did not complete within %v", maxWaitTime)
211+
}
212+
213+
func (s *ArtifactAPITest) extractWorkflowNodeID(runID string) string {
214+
t := s.T()
215+
216+
resp, err := http.Get(fmt.Sprintf("http://localhost:8888/apis/v1beta1/runs/%s", runID))
217+
require.NoError(t, err)
218+
body, err := io.ReadAll(resp.Body)
219+
resp.Body.Close()
220+
require.NoError(t, err)
221+
222+
runDetailsStr := string(body)
223+
re := regexp.MustCompile(`large-artifact-memory-test-v1-1gb-[a-z0-9]+-[0-9]+`)
224+
matches := re.FindAllString(runDetailsStr, -1)
225+
226+
require.NotEmpty(t, matches, "Could not find workflow name in run details")
227+
228+
nodeID := matches[0]
229+
t.Logf("Found workflow name: %s", nodeID)
230+
231+
return nodeID
232+
}
233+
234+
func (s *ArtifactAPITest) testReadArtifactEndpoint(runID, nodeID, artifactName string) {
235+
t := s.T()
236+
237+
baseURL := "http://localhost:8888"
238+
artifactURL := fmt.Sprintf("%s/apis/v1beta1/runs/%s/nodes/%s/artifacts/%s:read",
239+
baseURL, runID, nodeID, artifactName)
240+
241+
t.Logf("Testing ReadArtifact endpoint: %s", artifactURL)
242+
243+
resp, err := http.Get(artifactURL)
244+
require.NoError(t, err)
245+
defer resp.Body.Close()
246+
247+
require.Equal(t, http.StatusOK, resp.StatusCode,
248+
"ReadArtifact endpoint should return 200 OK, got %d", resp.StatusCode)
249+
250+
s.validateResponseHeaders(resp)
251+
252+
body, err := io.ReadAll(resp.Body)
253+
require.NoError(t, err)
254+
require.NotEmpty(t, body)
255+
256+
// The response is JSON with base64-encoded gzip data
257+
var jsonResponse struct {
258+
Data string `json:"data"`
259+
}
260+
err = json.Unmarshal(body, &jsonResponse)
261+
require.NoError(t, err, "Failed to parse JSON response")
262+
require.NotEmpty(t, jsonResponse.Data, "JSON response should contain 'data' field")
263+
264+
decodedData, err := base64.StdEncoding.DecodeString(jsonResponse.Data)
265+
require.NoError(t, err, "Failed to decode base64 data")
266+
require.NotEmpty(t, decodedData)
267+
268+
s.requireGzipCompressed(decodedData)
269+
270+
t.Logf("Successfully downloaded gzip compressed artifact (decoded size: %d bytes)", len(decodedData))
271+
t.Log("ReadArtifact endpoint validated successfully")
272+
}
273+
274+
func (s *ArtifactAPITest) validateResponseHeaders(resp *http.Response) {
275+
t := s.T()
276+
277+
contentType := resp.Header.Get("Content-Type")
278+
require.Equal(t, "application/json", contentType,
279+
"Content-Type should be application/json, got %s", contentType)
280+
281+
contentEncoding := resp.Header.Get("Content-Encoding")
282+
require.Empty(t, contentEncoding,
283+
"Content-Encoding should not be set for JSON response (gzip is base64-encoded in JSON)")
284+
285+
t.Log("Response headers:")
286+
for key, values := range resp.Header {
287+
for _, value := range values {
288+
t.Logf(" %s: %s", key, value)
289+
}
290+
}
291+
}
292+
293+
func (s *ArtifactAPITest) requireGzipCompressed(body []byte) {
294+
require.True(s.T(), len(body) > 2 && body[0] == 0x1f && body[1] == 0x8b,
295+
"Artifact should be gzip compressed")
296+
}
297+
298+
// TestArtifactAPI is the go-test entry point that runs the ArtifactAPITest
// suite.
func TestArtifactAPI(t *testing.T) {
	suite.Run(t, new(ArtifactAPITest))
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#!/usr/bin/env python3
2+
3+
import kfp
4+
from kfp import dsl
5+
6+
7+
@dsl.pipeline(name="large-artifact-memory-test-v1-1gb")
def large_artifact_pipeline_v1():
    """V1 pipeline that generates a large artifact for testing streaming endpoint"""

    # Single step: run an inline Python script in a python:3.9-slim
    # container that writes /tmp/large_artifact.bin, exported as the
    # "large_file" output artifact.
    # NOTE(review): the pipeline name and display name say "1GB" while the
    # script below writes size_mb = 10 (10MB) — confirm which is intended.
    # The pipeline name is also matched by a regex in the Go integration
    # test, so renaming it would break that test.
    generate_task = dsl.ContainerOp(
        name='generate-large-artifact',
        image='python:3.9-slim',
        command=['python3', '-c'],
        arguments=['''
import os

size_mb = 10 # 10MB
chunk_size = 1024 * 1024 # 1MB chunks

# Create a file that will be saved as an artifact
artifact_path = "/tmp/large_artifact.bin"

print(f"Generating {size_mb}MB test file at {artifact_path}")

# Ensure the directory exists
os.makedirs(os.path.dirname(artifact_path), exist_ok=True)

with open(artifact_path, 'wb') as f:
    for i in range(size_mb):
        data = f"CHUNK_{i:04d}_" + "X" * (chunk_size - 20) + f"_END_{i:04d}\\n"
        f.write(data[:chunk_size].encode('utf-8'))

file_size = os.path.getsize(artifact_path)
print(f"Generated file size: {file_size / (1024 * 1024):.2f}MB")
print(f"Artifact saved at: {artifact_path}")
'''],
        file_outputs={'large_file': '/tmp/large_artifact.bin'}
    )

    generate_task.set_display_name("Generate Large Artifact V1 - 1GB")
42+
43+
44+
if __name__ == "__main__":
    # Compile the pipeline in v1 mode to a YAML package in the current
    # directory — presumably the file the integration test later uploads;
    # verify the destination path against the test's expectations.
    kfp.compiler.Compiler().compile(
        pipeline_func=large_artifact_pipeline_v1,
        package_path="large_artifact_pipeline.yaml"
    )
    print("Pipeline compiled to large_artifact_pipeline.yaml")

0 commit comments

Comments
 (0)