Skip to content

Commit dca369b

Browse files
authored
Sandbox get pod from warmpool (#115)
1 parent b5ab85e commit dca369b

12 files changed

+862
-3876
lines changed

controllers/sandbox_controller.go

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
k8serrors "k8s.io/apimachinery/pkg/api/errors"
2727
"k8s.io/apimachinery/pkg/api/meta"
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/labels"
2930
"k8s.io/apimachinery/pkg/runtime"
3031
"k8s.io/apimachinery/pkg/types"
3132
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -42,6 +43,7 @@ import (
4243

4344
const (
4445
sandboxLabel = "agents.x-k8s.io/sandbox-name-hash"
46+
SanboxPodNameAnnotation = "agents.x-k8s.io/pod-name"
4547
sandboxControllerFieldOwner = "sandbox-controller"
4648
)
4749

@@ -290,13 +292,45 @@ func (r *SandboxReconciler) reconcileService(ctx context.Context, sandbox *sandb
290292

291293
func (r *SandboxReconciler) reconcilePod(ctx context.Context, sandbox *sandboxv1alpha1.Sandbox, nameHash string) (*corev1.Pod, error) {
292294
log := log.FromContext(ctx)
295+
296+
// List all pods with the pool label matching the warm pool name hash
297+
// TODO: find a better way to make sure one sandbox has at most one pod
298+
podList := &corev1.PodList{}
299+
labelSelector := labels.SelectorFromSet(labels.Set{
300+
sandboxLabel: nameHash,
301+
})
302+
303+
if err := r.List(ctx, podList, &client.ListOptions{
304+
LabelSelector: labelSelector,
305+
Namespace: sandbox.Namespace,
306+
}); err != nil {
307+
log.Error(err, "Failed to list pods")
308+
}
309+
310+
if len(podList.Items) > 1 {
311+
log.Info("Multiple pods found for sandbox, this should not happen", "Sandbox", sandbox.Name, "PodCount", len(podList.Items))
312+
}
313+
314+
// Determine the pod name to look up
315+
podName := sandbox.Name
316+
var trackedPodName string
317+
var podNameAnnotationExists bool
318+
if trackedPodName, podNameAnnotationExists = sandbox.Annotations[SanboxPodNameAnnotation]; podNameAnnotationExists && trackedPodName != "" {
319+
podName = trackedPodName
320+
log.Info("Using tracked pod name from sandbox annotation", "podName", podName)
321+
}
322+
293323
pod := &corev1.Pod{}
294-
err := r.Get(ctx, types.NamespacedName{Name: sandbox.Name, Namespace: sandbox.Namespace}, pod)
324+
err := r.Get(ctx, types.NamespacedName{Name: podName, Namespace: sandbox.Namespace}, pod)
295325
if err != nil {
296326
if !k8serrors.IsNotFound(err) {
297327
log.Error(err, "Failed to get Pod")
298328
return nil, fmt.Errorf("Pod Get Failed: %w", err)
299329
}
330+
if k8serrors.IsNotFound(err) && podNameAnnotationExists {
331+
log.Error(err, "Pod not found")
332+
return nil, fmt.Errorf("Pod in Annotation Get Failed: %w", err)
333+
}
300334
pod = nil
301335
}
302336

@@ -312,17 +346,46 @@ func (r *SandboxReconciler) reconcilePod(ctx context.Context, sandbox *sandboxv1
312346
log.Info("Pod is already being deleted", "Pod.Namespace", pod.Namespace, "Pod.Name", pod.Name)
313347
}
314348
}
349+
350+
// Remove the pod name annotation from the sandbox if it exists
351+
if _, exists := sandbox.Annotations[SanboxPodNameAnnotation]; exists {
352+
log.Info("Removing pod name annotation from sandbox", "Sandbox.Name", sandbox.Name)
353+
// Create a patch to update only the annotations
354+
patch := client.MergeFrom(sandbox.DeepCopy())
355+
delete(sandbox.Annotations, SanboxPodNameAnnotation)
356+
357+
if err := r.Patch(ctx, sandbox, patch); err != nil {
358+
return nil, fmt.Errorf("failed to remove pod name annotation: %w", err)
359+
}
360+
}
361+
315362
return nil, nil
316363
}
317364

318365
if pod != nil {
319366
log.Info("Found Pod", "Pod.Namespace", pod.Namespace, "Pod.Name", pod.Name)
367+
368+
if pod.Labels == nil {
369+
pod.Labels = make(map[string]string)
370+
}
371+
pod.Labels[sandboxLabel] = nameHash
372+
373+
// Set controller reference if the pod is not controlled by anything.
374+
if controllerRef := metav1.GetControllerOf(pod); controllerRef == nil {
375+
if err := ctrl.SetControllerReference(sandbox, pod, r.Scheme); err != nil {
376+
return nil, fmt.Errorf("SetControllerReference for Pod failed: %w", err)
377+
}
378+
}
379+
380+
if err := r.Update(ctx, pod); err != nil {
381+
return nil, fmt.Errorf("failed to update pod: %w", err)
382+
}
383+
320384
// TODO - Do we enfore (change) spec if a pod exists ?
321385
// r.Patch(ctx, pod, client.Apply, client.ForceOwnership, client.FieldOwner("sandbox-controller"))
322386
return pod, nil
323387
}
324388

325-
// Create a pod object from the sandbox
326389
log.Info("Creating a new Pod", "Pod.Namespace", sandbox.Namespace, "Pod.Name", sandbox.Name)
327390
labels := map[string]string{
328391
sandboxLabel: nameHash,
@@ -365,6 +428,7 @@ func (r *SandboxReconciler) reconcilePod(ctx context.Context, sandbox *sandboxv1
365428
log.Error(err, "Failed to create", "Pod.Namespace", pod.Namespace, "Pod.Name", pod.Name)
366429
return nil, err
367430
}
431+
368432
return pod, nil
369433
}
370434

controllers/sandbox_controller_test.go

Lines changed: 208 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -478,14 +478,15 @@ func TestReconcilePod(t *testing.T) {
478478
},
479479
}
480480
testCases := []struct {
481-
name string
482-
initialObjs []runtime.Object
483-
sandbox *sandboxv1alpha1.Sandbox
484-
wantPod *corev1.Pod
485-
expectErr bool
481+
name string
482+
initialObjs []runtime.Object
483+
sandbox *sandboxv1alpha1.Sandbox
484+
wantPod *corev1.Pod
485+
expectErr bool
486+
wantSandboxAnnotations map[string]string
486487
}{
487488
{
488-
name: "no-op if Pod already exists",
489+
name: "updates label and owner reference if Pod already exists",
489490
initialObjs: []runtime.Object{
490491
&corev1.Pod{
491492
ObjectMeta: metav1.ObjectMeta{
@@ -503,11 +504,15 @@ func TestReconcilePod(t *testing.T) {
503504
},
504505
},
505506
sandbox: sandboxObj,
506-
wantPod: &corev1.Pod{ // Pod is not updated
507+
wantPod: &corev1.Pod{
507508
ObjectMeta: metav1.ObjectMeta{
508509
Name: sandboxName,
509510
Namespace: sandboxNs,
510-
ResourceVersion: "1",
511+
ResourceVersion: "2",
512+
Labels: map[string]string{
513+
"agents.x-k8s.io/sandbox-name-hash": nameHash,
514+
},
515+
OwnerReferences: []metav1.OwnerReference{sandboxControllerRef(sandboxName)},
511516
},
512517
Spec: corev1.PodSpec{
513518
Containers: []corev1.Container{
@@ -578,12 +583,186 @@ func TestReconcilePod(t *testing.T) {
578583
},
579584
wantPod: nil,
580585
},
586+
{
587+
name: "adopts existing pod via annotation - pod gets label and owner reference",
588+
initialObjs: []runtime.Object{
589+
&corev1.Pod{
590+
ObjectMeta: metav1.ObjectMeta{
591+
Name: "adopted-pod-name",
592+
Namespace: sandboxNs,
593+
ResourceVersion: "1",
594+
},
595+
Spec: corev1.PodSpec{
596+
Containers: []corev1.Container{
597+
{
598+
Name: "existing-container",
599+
},
600+
},
601+
},
602+
},
603+
},
604+
sandbox: &sandboxv1alpha1.Sandbox{
605+
ObjectMeta: metav1.ObjectMeta{
606+
Name: sandboxName,
607+
Namespace: sandboxNs,
608+
Annotations: map[string]string{
609+
SanboxPodNameAnnotation: "adopted-pod-name",
610+
},
611+
},
612+
Spec: sandboxv1alpha1.SandboxSpec{
613+
Replicas: ptr.To(int32(1)),
614+
PodTemplate: sandboxv1alpha1.PodTemplate{
615+
Spec: corev1.PodSpec{
616+
Containers: []corev1.Container{
617+
{
618+
Name: "test-container",
619+
},
620+
},
621+
},
622+
},
623+
},
624+
},
625+
wantPod: &corev1.Pod{
626+
ObjectMeta: metav1.ObjectMeta{
627+
Name: "adopted-pod-name",
628+
Namespace: sandboxNs,
629+
ResourceVersion: "2",
630+
Labels: map[string]string{
631+
sandboxLabel: nameHash,
632+
},
633+
OwnerReferences: []metav1.OwnerReference{sandboxControllerRef(sandboxName)},
634+
},
635+
Spec: corev1.PodSpec{
636+
Containers: []corev1.Container{
637+
{
638+
Name: "existing-container",
639+
},
640+
},
641+
},
642+
},
643+
expectErr: false,
644+
},
645+
{
646+
name: "does not change controller if Pod already has a different controller",
647+
initialObjs: []runtime.Object{
648+
&corev1.Pod{
649+
ObjectMeta: metav1.ObjectMeta{
650+
Name: sandboxName,
651+
Namespace: sandboxNs,
652+
ResourceVersion: "1",
653+
// Add a controller reference to a different controller
654+
OwnerReferences: []metav1.OwnerReference{
655+
{
656+
APIVersion: "apps/v1",
657+
Kind: "Deployment",
658+
Name: "some-other-controller",
659+
UID: "some-other-uid",
660+
Controller: ptr.To(true),
661+
BlockOwnerDeletion: ptr.To(true),
662+
},
663+
},
664+
},
665+
Spec: corev1.PodSpec{
666+
Containers: []corev1.Container{
667+
{
668+
Name: "foo",
669+
},
670+
},
671+
},
672+
},
673+
},
674+
sandbox: sandboxObj,
675+
// The pod should still have the original controller reference
676+
wantPod: &corev1.Pod{
677+
ObjectMeta: metav1.ObjectMeta{
678+
Name: sandboxName,
679+
Namespace: sandboxNs,
680+
ResourceVersion: "2",
681+
Labels: map[string]string{
682+
"agents.x-k8s.io/sandbox-name-hash": nameHash,
683+
},
684+
// Should still have the original controller reference
685+
OwnerReferences: []metav1.OwnerReference{
686+
{
687+
APIVersion: "apps/v1",
688+
Kind: "Deployment",
689+
Name: "some-other-controller",
690+
UID: "some-other-uid",
691+
Controller: ptr.To(true),
692+
BlockOwnerDeletion: ptr.To(true),
693+
},
694+
},
695+
},
696+
Spec: corev1.PodSpec{
697+
Containers: []corev1.Container{
698+
{
699+
Name: "foo",
700+
},
701+
},
702+
},
703+
},
704+
},
705+
{
706+
name: "error when annotated pod does not exist",
707+
initialObjs: []runtime.Object{},
708+
sandbox: &sandboxv1alpha1.Sandbox{
709+
ObjectMeta: metav1.ObjectMeta{
710+
Name: sandboxName,
711+
Namespace: sandboxNs,
712+
Annotations: map[string]string{
713+
SanboxPodNameAnnotation: "non-existent-pod",
714+
},
715+
},
716+
Spec: sandboxv1alpha1.SandboxSpec{
717+
Replicas: ptr.To(int32(1)),
718+
PodTemplate: sandboxv1alpha1.PodTemplate{
719+
Spec: corev1.PodSpec{
720+
Containers: []corev1.Container{
721+
{
722+
Name: "test-container",
723+
},
724+
},
725+
},
726+
},
727+
},
728+
},
729+
wantPod: nil,
730+
expectErr: true,
731+
},
732+
{
733+
name: "remove pod name annotation when replicas is 0",
734+
initialObjs: []runtime.Object{
735+
&corev1.Pod{
736+
ObjectMeta: metav1.ObjectMeta{
737+
Name: "annotated-pod-name",
738+
Namespace: sandboxNs,
739+
ResourceVersion: "1",
740+
},
741+
},
742+
},
743+
sandbox: &sandboxv1alpha1.Sandbox{
744+
ObjectMeta: metav1.ObjectMeta{
745+
Name: sandboxName,
746+
Namespace: sandboxNs,
747+
Annotations: map[string]string{
748+
SanboxPodNameAnnotation: "annotated-pod-name",
749+
"other-annotation": "other-value",
750+
},
751+
},
752+
Spec: sandboxv1alpha1.SandboxSpec{
753+
Replicas: ptr.To(int32(0)),
754+
},
755+
},
756+
wantPod: nil,
757+
expectErr: false,
758+
wantSandboxAnnotations: map[string]string{"other-annotation": "other-value"},
759+
},
581760
}
582761

583762
for _, tc := range testCases {
584763
t.Run(tc.name, func(t *testing.T) {
585764
r := SandboxReconciler{
586-
Client: newFakeClient(tc.initialObjs...),
765+
Client: newFakeClient(append(tc.initialObjs, tc.sandbox)...),
587766
Scheme: Scheme,
588767
}
589768

@@ -594,17 +773,35 @@ func TestReconcilePod(t *testing.T) {
594773
require.NoError(t, err)
595774
}
596775
require.Equal(t, tc.wantPod, pod)
776+
597777
// Validate the Pod from the "cluster" (fake client)
598778
if tc.wantPod != nil {
599779
livePod := &corev1.Pod{}
600780
err = r.Get(t.Context(), types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, livePod)
601781
require.NoError(t, err)
602782
require.Equal(t, tc.wantPod, livePod)
603-
} else {
783+
} else if !tc.expectErr {
784+
// When wantPod is nil and no error expected, verify pod doesn't exist
604785
livePod := &corev1.Pod{}
605-
err = r.Get(t.Context(), types.NamespacedName{Name: sandboxName, Namespace: sandboxNs}, livePod)
786+
podName := sandboxName
787+
// Check if there's an annotation with a non-empty value
788+
if annotatedPod, exists := tc.sandbox.Annotations[SanboxPodNameAnnotation]; exists && annotatedPod != "" {
789+
podName = annotatedPod
790+
}
791+
err = r.Get(t.Context(), types.NamespacedName{Name: podName, Namespace: sandboxNs}, livePod)
606792
require.True(t, k8serrors.IsNotFound(err))
607793
}
794+
795+
// Check if sandbox annotations were updated as expected
796+
if tc.wantSandboxAnnotations != nil {
797+
// Fetch the sandbox to see if annotations were updated
798+
liveSandbox := &sandboxv1alpha1.Sandbox{}
799+
err = r.Get(t.Context(), types.NamespacedName{Name: tc.sandbox.Name, Namespace: tc.sandbox.Namespace}, liveSandbox)
800+
require.NoError(t, err)
801+
802+
// Check if the annotations match what we expect
803+
require.Equal(t, tc.wantSandboxAnnotations, liveSandbox.Annotations)
804+
}
608805
})
609806
}
610807
}

0 commit comments

Comments
 (0)