Skip to content

Commit c53ec35

Browse files
committed
Fallback to Argo Workflow archives for log collection
Signed-off-by: mprahl <[email protected]>
1 parent 36c8437 commit c53ec35

File tree

2 files changed

+249
-34
lines changed

2 files changed

+249
-34
lines changed

.github/actions/deploy/action.yml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,47 @@ runs:
117117
fi
118118
echo "ARGS=$ARGS" >> "$GITHUB_OUTPUT"
119119
120+
- name: Download Argo CLI
121+
id: download-argo-cli
122+
shell: bash
123+
continue-on-error: true
124+
env:
125+
ARGO_VERSION_INPUT: ${{ inputs.argo_version }}
126+
run: |
127+
set -euo pipefail
128+
129+
if command -v argo >/dev/null 2>&1; then
130+
echo "Argo CLI already available on PATH."
131+
exit 0
132+
fi
133+
134+
ARGO_VERSION=""
135+
if [[ -n "${ARGO_VERSION_INPUT}" ]]; then
136+
ARGO_VERSION="${ARGO_VERSION_INPUT}"
137+
elif [[ -f "third_party/argo/VERSION" ]]; then
138+
ARGO_VERSION="$(head -n 1 third_party/argo/VERSION | tr -d '[:space:]')"
139+
fi
140+
141+
if [[ -z "${ARGO_VERSION}" ]]; then
142+
echo "Unable to determine Argo Workflows version." >&2
143+
exit 1
144+
fi
145+
146+
if [[ "${ARGO_VERSION}" != v* ]]; then
147+
ARGO_VERSION="v${ARGO_VERSION}"
148+
fi
149+
150+
DOWNLOAD_URL="https://github.com/argoproj/argo-workflows/releases/download/${ARGO_VERSION}/argo-linux-amd64.gz"
151+
TMP_DIR="$(mktemp -d)"
152+
echo "Attempting to download Argo CLI ${ARGO_VERSION} from ${DOWNLOAD_URL}"
153+
curl -sSfL "${DOWNLOAD_URL}" -o "${TMP_DIR}/argo.gz"
154+
gunzip "${TMP_DIR}/argo.gz"
155+
chmod +x "${TMP_DIR}/argo-linux-amd64"
156+
mv "${TMP_DIR}/argo-linux-amd64" "${TMP_DIR}/argo"
157+
echo "${TMP_DIR}" >> "$GITHUB_PATH"
158+
echo "Downloaded Argo CLI to ${TMP_DIR}/argo"
159+
ls -l "${TMP_DIR}/argo"
160+
120161
- name: Deploy KFP
121162
id: deploy-kfp
122163
if: ${{ steps.configure.outcome == 'success' }}

backend/test/end2end/utils/e2e_utils.go

Lines changed: 208 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,39 @@
22
package utils
33

44
import (
5+
"context"
56
"fmt"
67
"maps"
8+
"os/exec"
79
"sort"
10+
"strings"
11+
"sync"
812
"time"
913

1014
runparams "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_client/run_service"
1115
"github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_model"
1216
apiserver "github.com/kubeflow/pipelines/backend/src/common/client/api_server/v2"
17+
"github.com/kubeflow/pipelines/backend/src/common/util"
1318
"github.com/kubeflow/pipelines/backend/test/config"
1419
"github.com/kubeflow/pipelines/backend/test/logger"
1520
"github.com/kubeflow/pipelines/backend/test/testutil"
1621
apitests "github.com/kubeflow/pipelines/backend/test/v2/api"
1722

1823
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
24+
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
1925
"github.com/onsi/ginkgo/v2"
2026
"github.com/onsi/gomega"
2127
v1 "k8s.io/api/core/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2229
"k8s.io/client-go/kubernetes"
2330
)
2431

32+
var (
33+
workflowClient versioned.Interface
34+
workflowClientOnce sync.Once
35+
workflowClientErr error
36+
)
37+
2538
// CreatePipelineRun - Create a pipeline run
2639
func CreatePipelineRun(runClient *apiserver.RunClient, testContext *apitests.TestContext, pipelineID *string, pipelineVersionID *string, experimentID *string, inputParams map[string]interface{}) *run_model.V2beta1Run {
2740
runName := fmt.Sprintf("E2e Test Run-%v", testContext.TestStartTimeUTC)
@@ -90,63 +103,224 @@ func ValidateComponentStatuses(runClient *apiserver.RunClient, k8Client *kuberne
90103
gomega.Expect(len(actualTaskDetails)).To(gomega.BeNumerically(">=", len(expectedTaskDetails)), "Number of created DAG tasks should be >= number of expected tasks")
91104
}
92105
}
93-
94106
}
95107

96108
// CapturePodLogsForUnsuccessfulTasks - Capture pod logs of a failed component
97109
func CapturePodLogsForUnsuccessfulTasks(k8Client *kubernetes.Clientset, testContext *apitests.TestContext, taskDetails []*run_model.V2beta1PipelineTaskDetail) {
98110
failedTasks := make(map[string]string)
111+
archivedLogCache := make(map[string]string)
112+
namespace := testutil.GetNamespace()
99113
sort.Slice(taskDetails, func(i, j int) bool {
100114
return time.Time(taskDetails[i].EndTime).After(time.Time(taskDetails[j].EndTime)) // Sort Tasks by End Time in descending order
101115
})
102116
for _, task := range taskDetails {
103-
if task.State != nil {
104-
switch *task.State {
105-
case run_model.V2beta1RuntimeStateSUCCEEDED:
106-
{
107-
logger.Log("SUCCEEDED - Task %s for run %s has finished successfully", task.DisplayName, task.RunID)
108-
}
109-
case run_model.V2beta1RuntimeStateRUNNING:
110-
{
111-
logger.Log("RUNNING - Task %s for Run %s is running", task.DisplayName, task.RunID)
117+
if task.State == nil {
118+
continue
119+
}
112120

121+
switch *task.State {
122+
case run_model.V2beta1RuntimeStateSUCCEEDED:
123+
logger.Log("SUCCEEDED - Task %s for run %s has finished successfully", task.DisplayName, task.RunID)
124+
case run_model.V2beta1RuntimeStateRUNNING:
125+
logger.Log("RUNNING - Task %s for Run %s is running", task.DisplayName, task.RunID)
126+
case run_model.V2beta1RuntimeStateSKIPPED:
127+
logger.Log("SKIPPED - Task %s for Run %s skipped", task.DisplayName, task.RunID)
128+
case run_model.V2beta1RuntimeStateCANCELED:
129+
logger.Log("CANCELED - Task %s for Run %s canceled", task.DisplayName, task.RunID)
130+
case run_model.V2beta1RuntimeStateFAILED:
131+
logger.Log("%s - Task %s for Run %s did not complete successfully", *task.State, task.DisplayName, task.RunID)
132+
133+
podNames := map[string]struct{}{}
134+
if task.PodName != "" {
135+
podNames[task.PodName] = struct{}{}
136+
}
137+
for _, childTask := range task.ChildTasks {
138+
if childTask.PodName != "" {
139+
podNames[childTask.PodName] = struct{}{}
113140
}
114-
case run_model.V2beta1RuntimeStateSKIPPED:
115-
{
116-
logger.Log("SKIPPED - Task %s for Run %s skipped", task.DisplayName, task.RunID)
117-
}
118-
case run_model.V2beta1RuntimeStateCANCELED:
119-
{
120-
logger.Log("CANCELED - Task %s for Run %s canceled", task.DisplayName, task.RunID)
121-
}
122-
case run_model.V2beta1RuntimeStateFAILED:
123-
{
124-
logger.Log("%s - Task %s for Run %s did not complete successfully", *task.State, task.DisplayName, task.RunID)
125-
for _, childTask := range task.ChildTasks {
126-
podName := childTask.PodName
127-
if podName != "" {
128-
logger.Log("Capturing pod logs for task %s, with pod name %s", task.DisplayName, podName)
129-
podLog := testutil.ReadPodLogs(k8Client, *config.Namespace, podName, nil, &testContext.TestStartTimeUTC, config.PodLogLimit)
130-
logger.Log("Pod logs captured for task %s in pod %s", task.DisplayName, podName)
131-
logger.Log("Attaching pod logs to the report")
132-
ginkgo.AddReportEntry(fmt.Sprintf("Failing '%s' Component Log", task.DisplayName), podLog)
133-
logger.Log("Attached pod logs to the report")
141+
}
142+
143+
if len(podNames) == 0 {
144+
logger.Log("Task %s for Run %s did not report any pod names", task.DisplayName, task.RunID)
145+
failedTasks[task.DisplayName] = string(*task.State)
146+
continue
147+
}
148+
149+
var combinedLog strings.Builder
150+
for podName := range podNames {
151+
logger.Log("Collecting logs for task %s pod %s", task.DisplayName, podName)
152+
combinedLog.WriteString(fmt.Sprintf("===== Pod: %s =====\n", podName))
153+
154+
podLog := testutil.ReadPodLogs(k8Client, namespace, podName, nil, &testContext.TestStartTimeUTC, config.PodLogLimit)
155+
missingPod := false
156+
if hasMeaningfulLogs(podLog) {
157+
combinedLog.WriteString("----- Live Logs (kubectl) -----\n")
158+
combinedLog.WriteString(podLog)
159+
if !strings.HasSuffix(podLog, "\n") {
160+
combinedLog.WriteString("\n")
161+
}
162+
} else {
163+
if strings.Contains(strings.ToLower(podLog), "not found") {
164+
missingPod = true
165+
}
166+
if strings.TrimSpace(podLog) != "" {
167+
combinedLog.WriteString(podLog)
168+
if !strings.HasSuffix(podLog, "\n") {
169+
combinedLog.WriteString("\n")
134170
}
135171
}
136-
failedTasks[task.DisplayName] = string(*task.State)
172+
if missingPod {
173+
combinedLog.WriteString("Pod logs unavailable; pod not found. Falling back to archived logs.\n")
174+
} else {
175+
combinedLog.WriteString("Live logs unavailable via kubectl logs.\n")
176+
}
137177
}
138-
default:
139-
{
140-
logger.Log("UNKNOWN state - Task %s for Run %s has an UNKNOWN state", task.DisplayName, task.RunID)
178+
179+
if missingPod {
180+
archivedLog, err := getArchivedLogWithCache(archivedLogCache, k8Client, namespace, task.RunID, podName)
181+
if err != nil {
182+
logger.Log("Failed to retrieve archived logs for pod %s: %v", podName, err)
183+
combinedLog.WriteString(fmt.Sprintf("Failed to retrieve archived logs via Argo Workflows: %v\n", err))
184+
} else if strings.TrimSpace(archivedLog) != "" {
185+
combinedLog.WriteString("----- Archived Logs (Argo) -----\n")
186+
combinedLog.WriteString(archivedLog)
187+
if !strings.HasSuffix(archivedLog, "\n") {
188+
combinedLog.WriteString("\n")
189+
}
190+
} else {
191+
combinedLog.WriteString("Archived logs were empty.\n")
192+
}
141193
}
194+
195+
combinedLog.WriteString("\n")
142196
}
197+
198+
entryContent := combinedLog.String()
199+
if strings.TrimSpace(entryContent) == "" {
200+
entryContent = fmt.Sprintf("No logs were available for failed task %s", task.DisplayName)
201+
}
202+
203+
logger.Log("Attaching logs to report for task %s", task.DisplayName)
204+
ginkgo.AddReportEntry(fmt.Sprintf("Failing '%s' Component Log", task.DisplayName), entryContent)
205+
logger.Log("Attached logs to the report for task %s", task.DisplayName)
206+
207+
failedTasks[task.DisplayName] = string(*task.State)
208+
default:
209+
logger.Log("UNKNOWN state - Task %s for Run %s has an UNKNOWN state", task.DisplayName, task.RunID)
143210
}
144211
}
145212
if len(failedTasks) > 0 {
146213
logger.Log("Found failed tasks: %v", maps.Keys(failedTasks))
147214
}
148215
}
149216

217+
func getArchivedLogWithCache(cache map[string]string, k8Client *kubernetes.Clientset, namespace, runID, podName string) (string, error) {
218+
cacheKey := fmt.Sprintf("%s::%s", runID, podName)
219+
if val, ok := cache[cacheKey]; ok {
220+
return val, nil
221+
}
222+
223+
logContent, err := retrieveArchivedLogs(k8Client, namespace, runID, podName)
224+
if err != nil {
225+
return "", err
226+
}
227+
228+
cache[cacheKey] = logContent
229+
return logContent, nil
230+
}
231+
232+
func retrieveArchivedLogs(k8Client *kubernetes.Clientset, namespace, runID, podName string) (string, error) {
233+
workflowName, err := resolveWorkflowNameForRun(k8Client, namespace, runID)
234+
if err != nil {
235+
return "", err
236+
}
237+
238+
cliPath, err := ensureArgoCLI()
239+
if err != nil {
240+
return "", err
241+
}
242+
243+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
244+
defer cancel()
245+
246+
cmd := exec.CommandContext(ctx, cliPath, "logs", workflowName, podName, "-n", namespace, "--no-color")
247+
output, err := cmd.CombinedOutput()
248+
if err != nil {
249+
if ctx.Err() == context.DeadlineExceeded {
250+
return "", fmt.Errorf("argo logs command timed out after 30s\n%s", string(output))
251+
}
252+
return "", fmt.Errorf("argo logs command failed: %w\n%s", err, string(output))
253+
}
254+
255+
return string(output), nil
256+
}
257+
258+
func hasMeaningfulLogs(logText string) bool {
259+
trimmed := strings.TrimSpace(logText)
260+
if trimmed == "" {
261+
return false
262+
}
263+
lower := strings.ToLower(trimmed)
264+
if strings.Contains(lower, "no pod logs available") {
265+
return false
266+
}
267+
if strings.Contains(lower, "could not find pod containing container") {
268+
return false
269+
}
270+
if strings.Contains(lower, "failed to stream pod logs") {
271+
return false
272+
}
273+
return true
274+
}
275+
276+
func ensureArgoCLI() (string, error) {
277+
return exec.LookPath("argo")
278+
}
279+
280+
func resolveWorkflowNameForRun(k8Client *kubernetes.Clientset, namespace, runID string) (string, error) {
281+
if runID == "" {
282+
return "", fmt.Errorf("run ID is empty")
283+
}
284+
285+
wfClient, err := getWorkflowClient()
286+
if err != nil {
287+
return "", err
288+
}
289+
290+
labelSelector := fmt.Sprintf("%s=%s", util.LabelKeyWorkflowRunId, runID)
291+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
292+
defer cancel()
293+
294+
workflows, err := wfClient.ArgoprojV1alpha1().Workflows(namespace).List(ctx, metav1.ListOptions{
295+
LabelSelector: labelSelector,
296+
})
297+
if err != nil {
298+
if ctx.Err() == context.DeadlineExceeded {
299+
return "", fmt.Errorf("listing workflows timed out after 10s for run ID %s", runID)
300+
}
301+
return "", fmt.Errorf("failed to list workflows: %w", err)
302+
}
303+
304+
if len(workflows.Items) == 0 {
305+
return "", fmt.Errorf("no workflow found in namespace %s with run ID %s", namespace, runID)
306+
}
307+
308+
return workflows.Items[0].Name, nil
309+
}
310+
311+
func getWorkflowClient() (versioned.Interface, error) {
312+
workflowClientOnce.Do(func() {
313+
restConfig, err := util.GetKubernetesConfig()
314+
if err != nil {
315+
workflowClientErr = fmt.Errorf("failed to create kubernetes config: %w", err)
316+
return
317+
}
318+
319+
workflowClient, workflowClientErr = versioned.NewForConfig(restConfig)
320+
})
321+
return workflowClient, workflowClientErr
322+
}
323+
150324
type TaskDetails struct {
151325
TaskName string
152326
Task v1alpha1.DAGTask

0 commit comments

Comments
 (0)