-
Notifications
You must be signed in to change notification settings - Fork 1.8k
test(backend): Add integration tests for artifact reading and large artifacts handling #12445
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,300 @@ | ||
| // Copyright 2025 The Kubeflow Authors | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // https://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package integration | ||
|
|
||
| import ( | ||
| "encoding/base64" | ||
| "encoding/json" | ||
| "fmt" | ||
| "io" | ||
| "net/http" | ||
| "regexp" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/golang/glog" | ||
| params "github.com/kubeflow/pipelines/backend/api/v1beta1/go_http_client/experiment_client/experiment_service" | ||
| "github.com/kubeflow/pipelines/backend/api/v1beta1/go_http_client/experiment_model" | ||
| pipelineParams "github.com/kubeflow/pipelines/backend/api/v1beta1/go_http_client/pipeline_client/pipeline_service" | ||
| uploadParams "github.com/kubeflow/pipelines/backend/api/v1beta1/go_http_client/pipeline_upload_client/pipeline_upload_service" | ||
| "github.com/kubeflow/pipelines/backend/api/v1beta1/go_http_client/pipeline_upload_model" | ||
| runParams "github.com/kubeflow/pipelines/backend/api/v1beta1/go_http_client/run_client/run_service" | ||
| "github.com/kubeflow/pipelines/backend/api/v1beta1/go_http_client/run_model" | ||
| "github.com/kubeflow/pipelines/backend/src/common/client/api_server/v1" | ||
| "github.com/kubeflow/pipelines/backend/test" | ||
| "github.com/kubeflow/pipelines/backend/test/config" | ||
| "github.com/stretchr/testify/require" | ||
| "github.com/stretchr/testify/suite" | ||
| ) | ||
|
|
||
// ArtifactAPITest is an integration test suite for the v1beta1 ReadArtifact
// endpoint: it uploads a pipeline that produces an artifact, runs it, and
// downloads the artifact back through the REST API.
type ArtifactAPITest struct {
	suite.Suite
	namespace            string // cluster namespace the KFP backend runs in
	resourceNamespace    string // namespace for created resources (same value as namespace here)
	experimentClient     *api_server.ExperimentClient
	pipelineClient       *api_server.PipelineClient
	pipelineUploadClient *api_server.PipelineUploadClient
	runClient            *api_server.RunClient
}
|
|
||
| func (s *ArtifactAPITest) SetupTest() { | ||
| if !*runIntegrationTests { | ||
| s.T().SkipNow() | ||
| return | ||
| } | ||
|
|
||
| if !*isDevMode { | ||
| err := test.WaitForReady(*initializeTimeout) | ||
| if err != nil { | ||
| glog.Exitf("Failed to initialize test. Error: %s", err.Error()) | ||
| } | ||
| } | ||
| s.namespace = *config.Namespace | ||
| s.resourceNamespace = *config.Namespace | ||
| clientConfig := test.GetClientConfig(*config.Namespace) | ||
| var err error | ||
|
|
||
| s.experimentClient, err = api_server.NewExperimentClient(clientConfig, false) | ||
| if err != nil { | ||
| glog.Exitf("Failed to get experiment client. Error: %v", err) | ||
| } | ||
| s.pipelineClient, err = api_server.NewPipelineClient(clientConfig, false) | ||
| if err != nil { | ||
| glog.Exitf("Failed to get pipeline client. Error: %v", err) | ||
| } | ||
| s.runClient, err = api_server.NewRunClient(clientConfig, false) | ||
| if err != nil { | ||
| glog.Exitf("Failed to get run client. Error: %v", err) | ||
| } | ||
| s.pipelineUploadClient, err = api_server.NewPipelineUploadClient(clientConfig, false) | ||
| if err != nil { | ||
| glog.Exitf("Failed to get pipeline upload client. Error: %v", err) | ||
| } | ||
| } | ||
|
|
||
| func (s *ArtifactAPITest) TearDownSuite() { | ||
| if *runIntegrationTests { | ||
| if !*isDevMode { | ||
| s.cleanUp() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (s *ArtifactAPITest) TearDownTest() { | ||
| if *runIntegrationTests { | ||
| s.cleanUp() | ||
| } | ||
| } | ||
|
|
||
// cleanUp deletes every run, experiment, and pipeline in the test namespace.
// Shared by TearDownTest and TearDownSuite.
func (s *ArtifactAPITest) cleanUp() {
	test.DeleteAllRuns(s.runClient, s.namespace, s.T())
	test.DeleteAllExperiments(s.experimentClient, s.namespace, s.T())
	test.DeleteAllPipelines(s.pipelineClient, s.T())
}
|
|
||
// TestV1PipelineArtifactRead exercises the v1beta1 ReadArtifact endpoint end
// to end: run a pipeline that produces an artifact, wait for completion, and
// download and validate the artifact through the REST API.
func (s *ArtifactAPITest) TestV1PipelineArtifactRead() {
	runID, experimentID, pipelineID := s.runPipeline()

	// Clean up in dependency order (run, then experiment, then pipeline)
	// regardless of the test outcome.
	defer func() {
		s.deleteRun(runID)
		s.deleteExperiment(experimentID)
		s.deleteAllPipelineVersions(pipelineID)
	}()

	s.waitForRunCompletion(runID)

	nodeID := s.extractWorkflowNodeID(runID)
	// Artifact key is "<task-name>-<output-name>", matching the task name and
	// file_outputs key in large_artifact_pipeline.yaml.
	artifactName := "generate-large-artifact-large_file"

	s.testReadArtifactEndpoint(runID, nodeID, artifactName)
}
|
|
||
| func (s *ArtifactAPITest) deleteAllPipelineVersions(pipelineID string) { | ||
| test.DeleteAllPipelineVersions(s.pipelineClient, s.T(), pipelineID) | ||
| if err := s.pipelineClient.Delete(&pipelineParams.PipelineServiceDeletePipelineV1Params{ | ||
| ID: pipelineID, | ||
| }); err != nil { | ||
| s.T().Logf("Failed to clean up test pipeline %s: %v", pipelineID, err) | ||
| } | ||
| } | ||
|
|
||
| func (s *ArtifactAPITest) deleteExperiment(experimentID string) { | ||
| if err := s.experimentClient.Delete(¶ms.ExperimentServiceDeleteExperimentV1Params{ | ||
| ID: experimentID, | ||
| }); err != nil { | ||
| s.T().Logf("Failed to clean up test experiment %s: %v", experimentID, err) | ||
| } | ||
| } | ||
|
|
||
| func (s *ArtifactAPITest) deleteRun(runID string) { | ||
| if err := s.runClient.Delete(&runParams.RunServiceDeleteRunV1Params{ | ||
| ID: runID, | ||
| }); err != nil { | ||
| s.T().Logf("Failed to clean up test run %s: %v", runID, err) | ||
| } | ||
| } | ||
|
|
||
| func (s *ArtifactAPITest) createExperiment() *experiment_model.APIExperiment { | ||
| experimentName := fmt.Sprintf("artifact-test-experiment-%d", time.Now().Unix()) | ||
| experiment := test.GetExperiment(experimentName, "Test for artifact reading", s.namespace) | ||
| experiment, err := s.experimentClient.Create(¶ms.ExperimentServiceCreateExperimentV1Params{Experiment: experiment}) | ||
| require.NoError(s.T(), err) | ||
| return experiment | ||
| } | ||
|
|
||
| func (s *ArtifactAPITest) uploadPipeline() *pipeline_upload_model.APIPipeline { | ||
| pipelineParams := uploadParams.NewUploadPipelineParams() | ||
| pipelineName := fmt.Sprintf("large-artifact-test-%d", time.Now().Unix()) | ||
| pipelineParams.SetName(&pipelineName) | ||
| pipeline, err := s.pipelineUploadClient.UploadFile("../resources/large_artifact_pipeline.yaml", pipelineParams) | ||
| require.NoError(s.T(), err, "Failed to upload pipeline") | ||
| return pipeline | ||
| } | ||
|
|
||
// runPipeline creates an experiment, uploads the large-artifact pipeline,
// and starts a run of that pipeline owned by the experiment. It returns the
// IDs needed for later lookup and cleanup.
func (s *ArtifactAPITest) runPipeline() (runID, experimentID, pipelineID string) {
	experiment := s.createExperiment()
	pipeline := s.uploadPipeline()

	runRequest := &runParams.RunServiceCreateRunV1Params{Run: &run_model.APIRun{
		Name: fmt.Sprintf("test-artifact-run-%d", time.Now().Unix()),
		PipelineSpec: &run_model.APIPipelineSpec{
			PipelineID: pipeline.ID,
		},
		// Attach the run to the experiment as its owner so it is listed (and
		// cleaned up) under that experiment.
		ResourceReferences: []*run_model.APIResourceReference{
			{
				Key: &run_model.APIResourceKey{
					Type: run_model.APIResourceTypeEXPERIMENT.Pointer(),
					ID:   experiment.ID,
				},
				Relationship: run_model.APIRelationshipOWNER.Pointer(),
			},
		},
	}}
	run, _, err := s.runClient.Create(runRequest)
	require.NoError(s.T(), err)
	return run.Run.ID, experiment.ID, pipeline.ID
}
|
|
||
| func (s *ArtifactAPITest) waitForRunCompletion(runID string) { | ||
| t := s.T() | ||
| maxWaitTime := 2 * time.Minute | ||
| startTime := time.Now() | ||
|
|
||
| for time.Since(startTime) < maxWaitTime { | ||
| runDetail, _, err := s.runClient.Get(&runParams.RunServiceGetRunV1Params{RunID: runID}) | ||
| require.NoError(t, err) | ||
|
|
||
| if runDetail.Run.Status == "Succeeded" || runDetail.Run.Status == "Completed" || | ||
| runDetail.Run.Status == "Failed" || runDetail.Run.Status == "Error" { | ||
| require.Contains(t, []string{"Succeeded", "Completed"}, runDetail.Run.Status, | ||
| "Run should have succeeded") | ||
| t.Log("Run completed") | ||
| return | ||
| } | ||
|
|
||
| time.Sleep(2 * time.Second) | ||
| } | ||
|
|
||
| require.Fail(t, "Run did not complete within %v", maxWaitTime) | ||
| } | ||
|
|
||
| func (s *ArtifactAPITest) extractWorkflowNodeID(runID string) string { | ||
| t := s.T() | ||
|
|
||
| resp, err := http.Get(fmt.Sprintf("http://localhost:8888/apis/v1beta1/runs/%s", runID)) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you re-use the api server host info (as well as the scheme http/https) from |
||
| require.NoError(t, err) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. handle error (same with other resp.Body.Close() calls) |
||
| body, err := io.ReadAll(resp.Body) | ||
| resp.Body.Close() | ||
| require.NoError(t, err) | ||
|
|
||
| runDetailsStr := string(body) | ||
| re := regexp.MustCompile(`large-artifact-memory-test-v1-1gb-[a-z0-9]+-[0-9]+`) | ||
| matches := re.FindAllString(runDetailsStr, -1) | ||
|
|
||
| require.NotEmpty(t, matches, "Could not find workflow name in run details") | ||
|
|
||
| nodeID := matches[0] | ||
| t.Logf("Found workflow name: %s", nodeID) | ||
|
|
||
| return nodeID | ||
| } | ||
|
|
||
| func (s *ArtifactAPITest) testReadArtifactEndpoint(runID, nodeID, artifactName string) { | ||
| t := s.T() | ||
|
|
||
| baseURL := "http://localhost:8888" | ||
| artifactURL := fmt.Sprintf("%s/apis/v1beta1/runs/%s/nodes/%s/artifacts/%s:read", | ||
| baseURL, runID, nodeID, artifactName) | ||
|
|
||
| t.Logf("Testing ReadArtifact endpoint: %s", artifactURL) | ||
|
|
||
| resp, err := http.Get(artifactURL) | ||
| require.NoError(t, err) | ||
| defer resp.Body.Close() | ||
|
|
||
| require.Equal(t, http.StatusOK, resp.StatusCode, | ||
| "ReadArtifact endpoint should return 200 OK, got %d", resp.StatusCode) | ||
|
|
||
| s.validateResponseHeaders(resp) | ||
|
|
||
| body, err := io.ReadAll(resp.Body) | ||
| require.NoError(t, err) | ||
| require.NotEmpty(t, body) | ||
|
|
||
| // The response is JSON with base64-encoded gzip data | ||
| var jsonResponse struct { | ||
| Data string `json:"data"` | ||
| } | ||
| err = json.Unmarshal(body, &jsonResponse) | ||
| require.NoError(t, err, "Failed to parse JSON response") | ||
| require.NotEmpty(t, jsonResponse.Data, "JSON response should contain 'data' field") | ||
|
|
||
| decodedData, err := base64.StdEncoding.DecodeString(jsonResponse.Data) | ||
| require.NoError(t, err, "Failed to decode base64 data") | ||
| require.NotEmpty(t, decodedData) | ||
|
|
||
| s.requireGzipCompressed(decodedData) | ||
|
|
||
| t.Logf("Successfully downloaded gzip compressed artifact (decoded size: %d bytes)", len(decodedData)) | ||
| t.Log("ReadArtifact endpoint validated successfully") | ||
| } | ||
|
|
||
| func (s *ArtifactAPITest) validateResponseHeaders(resp *http.Response) { | ||
| t := s.T() | ||
|
|
||
| contentType := resp.Header.Get("Content-Type") | ||
| require.Equal(t, "application/json", contentType, | ||
| "Content-Type should be application/json, got %s", contentType) | ||
|
|
||
| contentEncoding := resp.Header.Get("Content-Encoding") | ||
| require.Empty(t, contentEncoding, | ||
| "Content-Encoding should not be set for JSON response (gzip is base64-encoded in JSON)") | ||
|
|
||
| t.Log("Response headers:") | ||
| for key, values := range resp.Header { | ||
| for _, value := range values { | ||
| t.Logf(" %s: %s", key, value) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (s *ArtifactAPITest) requireGzipCompressed(body []byte) { | ||
| require.True(s.T(), len(body) > 2 && body[0] == 0x1f && body[1] == 0x8b, | ||
| "Artifact should be gzip compressed") | ||
| } | ||
|
|
||
// TestArtifactAPI is the test entry point that runs the suite via testify.
func TestArtifactAPI(t *testing.T) {
	suite.Run(t, new(ArtifactAPITest))
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| #!/usr/bin/env python3 | ||
|
|
||
| import kfp | ||
| from kfp import dsl | ||
|
|
||
|
|
||
@dsl.pipeline(name="large-artifact-memory-test-v1-1gb")
def large_artifact_pipeline_v1():
    """V1 pipeline that generates a large artifact for testing streaming endpoint.

    NOTE: the "1gb" in the pipeline name is kept for backward compatibility
    (the Go integration test locates the workflow by matching this name with
    a regex), but the task below actually writes a 10MB file. The display
    name is corrected to reflect the real size.
    """

    # Task name "generate-large-artifact" and output key "large_file" are
    # load-bearing: the Go test reads artifact "generate-large-artifact-large_file".
    generate_task = dsl.ContainerOp(
        name='generate-large-artifact',
        image='python:3.9-slim',
        command=['python3', '-c'],
        arguments=['''
import os

size_mb = 10 # 10MB
chunk_size = 1024 * 1024 # 1MB chunks

# Create a file that will be saved as an artifact
artifact_path = "/tmp/large_artifact.bin"

print(f"Generating {size_mb}MB test file at {artifact_path}")

# Ensure the directory exists
os.makedirs(os.path.dirname(artifact_path), exist_ok=True)

with open(artifact_path, 'wb') as f:
    for i in range(size_mb):
        data = f"CHUNK_{i:04d}_" + "X" * (chunk_size - 20) + f"_END_{i:04d}\\n"
        f.write(data[:chunk_size].encode('utf-8'))

file_size = os.path.getsize(artifact_path)
print(f"Generated file size: {file_size / (1024 * 1024):.2f}MB")
print(f"Artifact saved at: {artifact_path}")
'''],
        file_outputs={'large_file': '/tmp/large_artifact.bin'}
    )

    # Display name fixed: the generated artifact is 10MB, not 1GB.
    generate_task.set_display_name("Generate Large Artifact V1 - 10MB")
|
|
||
|
|
||
if __name__ == "__main__":
    # Compile the pipeline in v1 mode
    # Output lands next to this script; the Go test uploads it from
    # ../resources/large_artifact_pipeline.yaml.
    kfp.compiler.Compiler().compile(
        pipeline_func=large_artifact_pipeline_v1,
        package_path="large_artifact_pipeline.yaml"
    )
    print("Pipeline compiled to large_artifact_pipeline.yaml")
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I should have been more clear, what I was suggesting we test is the endpoint allows us to continue testing that we can download artifacts - and not that we can download large files. My concern around download large files is that it can choke the CI and cause flaky behavior.
My preference is to just write a simple pipeline that will output an artifact (using @component instead of
ContainerOp). The artifact shouldn't be large, we just need to confirm we can continue to download the artifact via this endpoint, and then verify the data is what we expect. This helps us ensure your PR doesn't break the current api behavior.Verifying large downloads in master doesn't help us verify the issue once it's fixed, since to do that you would need to emulate the DOS for api server, and this is more of a perf test for which we don't have CI infrastructure for. So I think we should just rely on the fact blob handles this gracefully since that's in its spec, and don't test large file downloads for this endpoint.
Also, you can make this part of the original PR to keep things simple.