Skip to content

Commit 045ac64

Browse files
authored
fix: revert non-source updates to AppProjects and Repositories (#596)
Signed-off-by: Chetan Banavikalmutt <[email protected]>
1 parent f957238 commit 045ac64

File tree

21 files changed

+1093
-325
lines changed

21 files changed

+1093
-325
lines changed

agent/agent.go

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
kubeappproject "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/appproject"
2929
kubenamespace "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/namespace"
3030
kuberepository "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/repository"
31+
"github.com/argoproj-labs/argocd-agent/internal/cache"
3132
"github.com/argoproj-labs/argocd-agent/internal/config"
3233
"github.com/argoproj-labs/argocd-agent/internal/event"
3334
"github.com/argoproj-labs/argocd-agent/internal/informer"
@@ -49,7 +50,6 @@ import (
4950
"k8s.io/apimachinery/pkg/runtime"
5051
"k8s.io/apimachinery/pkg/watch"
5152

52-
appCache "github.com/argoproj-labs/argocd-agent/internal/cache"
5353
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
5454
cacheutil "github.com/argoproj/argo-cd/v3/util/cache"
5555
appstatecache "github.com/argoproj/argo-cd/v3/util/cache/appstate"
@@ -104,6 +104,13 @@ type Agent struct {
104104

105105
cacheRefreshInterval time.Duration
106106
clusterCache *appstatecache.Cache
107+
108+
// sourceCache is a cache of resources from the source. We use it to revert any changes made to the local resources.
109+
sourceCache *cache.SourceCache
110+
111+
// deletions tracks valid deletions from the source.
112+
// This is used to differentiate between valid and invalid deletions
113+
deletions *manager.DeletionTracker
107114
}
108115

109116
const defaultQueueName = "default"
@@ -129,7 +136,9 @@ type AgentOption func(*Agent) error
129136
// options.
130137
func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace string, opts ...AgentOption) (*Agent, error) {
131138
a := &Agent{
132-
version: version.New("argocd-agent"),
139+
version: version.New("argocd-agent"),
140+
deletions: manager.NewDeletionTracker(),
141+
sourceCache: cache.NewSourceCache(),
133142
}
134143
a.infStopCh = make(chan struct{})
135144
a.namespace = namespace
@@ -198,6 +207,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
198207
appManagerOpts := []application.ApplicationManagerOption{
199208
application.WithRole(manager.ManagerRoleAgent),
200209
application.WithMode(managerMode),
210+
application.WithDeletionTracker(a.deletions),
201211
}
202212

203213
if a.options.metricsPort > 0 {
@@ -322,20 +332,12 @@ func (a *Agent) Start(ctx context.Context) error {
322332
a.context = infCtx
323333
a.cancelFn = cancelFn
324334

325-
// For managed-agent we need to maintain a cache to keep applications in sync with last known state of
326-
// principal in case agent is disconnected with principal or application in managed-cluster is modified.
335+
// For managed-agent we need to maintain a cache to keep resources in sync with last known state of
336+
// principal in case agent is disconnected with principal or resources in managed-cluster are modified.
327337
if a.mode == types.AgentModeManaged {
328-
log().Infof("Recreating application spec cache from existing resources on cluster")
329-
appList, err := a.appManager.List(ctx, backend.ApplicationSelector{Namespaces: []string{a.namespace}})
330-
if err != nil {
331-
log().Errorf("Error while fetching list of applications: %v", err)
332-
}
333-
334-
for _, app := range appList {
335-
sourceUID, exists := app.Annotations[manager.SourceUIDAnnotation]
336-
if exists {
337-
appCache.SetApplicationSpec(ty.UID(sourceUID), app.Spec, log())
338-
}
338+
if err := a.populateSourceCache(ctx); err != nil {
339+
log().WithError(err).Error("failed to populate the source cache")
340+
return err
339341
}
340342
}
341343

@@ -485,3 +487,47 @@ func (a *Agent) healthzHandler(w http.ResponseWriter, r *http.Request) {
485487
w.WriteHeader(http.StatusServiceUnavailable)
486488
}
487489
}
490+
491+
func (a *Agent) populateSourceCache(ctx context.Context) error {
492+
log().Infof("Recreating application spec cache from existing resources on cluster")
493+
appList, err := a.appManager.List(ctx, backend.ApplicationSelector{Namespaces: []string{a.namespace}})
494+
if err != nil {
495+
return err
496+
}
497+
498+
for _, app := range appList {
499+
sourceUID, exists := app.Annotations[manager.SourceUIDAnnotation]
500+
if exists {
501+
a.sourceCache.Application.Set(ty.UID(sourceUID), app.Spec)
502+
}
503+
}
504+
505+
log().Infof("Recreating appProject spec cache from existing resources on cluster")
506+
appProjectList, err := a.projectManager.List(ctx, backend.AppProjectSelector{Namespace: a.namespace})
507+
if err != nil {
508+
return err
509+
}
510+
511+
for _, appProject := range appProjectList {
512+
sourceUID, exists := appProject.Annotations[manager.SourceUIDAnnotation]
513+
if exists {
514+
a.sourceCache.AppProject.Set(ty.UID(sourceUID), appProject.Spec)
515+
}
516+
}
517+
518+
log().Infof("Recreating repository spec cache from existing resources on cluster")
519+
repoList, err := a.repoManager.List(ctx, backend.RepositorySelector{Namespace: a.namespace})
520+
if err != nil {
521+
return err
522+
}
523+
524+
for _, repo := range repoList {
525+
sourceUID, exists := repo.Annotations[manager.SourceUIDAnnotation]
526+
if exists {
527+
a.sourceCache.Repository.Set(ty.UID(sourceUID), repo.Data)
528+
}
529+
}
530+
531+
log().Infof("Source cache populated successfully")
532+
return nil
533+
}

agent/inbound.go

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"time"
2020

2121
"github.com/argoproj-labs/argocd-agent/internal/backend"
22-
appCache "github.com/argoproj-labs/argocd-agent/internal/cache"
2322
"github.com/argoproj-labs/argocd-agent/internal/checkpoint"
2423
"github.com/argoproj-labs/argocd-agent/internal/event"
2524
"github.com/argoproj-labs/argocd-agent/internal/manager"
@@ -30,6 +29,7 @@ import (
3029
"github.com/sirupsen/logrus"
3130
corev1 "k8s.io/api/core/v1"
3231
apierrors "k8s.io/apimachinery/pkg/api/errors"
32+
ktypes "k8s.io/apimachinery/pkg/types"
3333
"k8s.io/client-go/dynamic"
3434
)
3535

@@ -470,7 +470,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
470470

471471
if a.mode == types.AgentModeManaged {
472472
// Store app spec in cache
473-
appCache.SetApplicationSpec(incoming.UID, incoming.Spec, logCtx)
473+
a.sourceCache.Application.Set(incoming.UID, incoming.Spec)
474474
}
475475

476476
created, err := a.appManager.Create(a.context, incoming)
@@ -513,7 +513,7 @@ func (a *Agent) updateApplication(incoming *v1alpha1.Application) (*v1alpha1.App
513513

514514
// Update app spec in cache
515515
logCtx.Tracef("Calling update spec for this event")
516-
appCache.SetApplicationSpec(incoming.UID, incoming.Spec, logCtx)
516+
a.sourceCache.Application.Set(incoming.UID, incoming.Spec)
517517

518518
napp, err = a.appManager.UpdateManagedApp(a.context, incoming)
519519
case types.AgentModeAutonomous:
@@ -544,21 +544,30 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error {
544544

545545
logCtx.Infof("Deleting application")
546546

547+
// Fetch the source UID of the existing app to mark it as expected deletion.
548+
app, err := a.appManager.Get(a.context, app.Name, app.Namespace)
549+
if err != nil {
550+
return err
551+
}
552+
553+
sourceUID := app.Annotations[manager.SourceUIDAnnotation]
554+
a.deletions.MarkExpected(ktypes.UID(sourceUID))
555+
547556
deletionPropagation := backend.DeletePropagationBackground
548-
err := a.appManager.Delete(a.context, a.namespace, app, &deletionPropagation)
557+
err = a.appManager.Delete(a.context, a.namespace, app, &deletionPropagation)
549558
if err != nil {
550559
if apierrors.IsNotFound(err) {
551560
logCtx.Debug("application is not found, perhaps it is already deleted")
552561
if a.mode == types.AgentModeManaged {
553-
appCache.DeleteApplicationSpec(app.UID, logCtx)
562+
a.sourceCache.Application.Delete(app.UID)
554563
}
555564
return nil
556565
}
557566
return err
558567
}
559568

560569
if a.mode == types.AgentModeManaged {
561-
appCache.DeleteApplicationSpec(app.UID, logCtx)
570+
a.sourceCache.Application.Delete(app.UID)
562571
}
563572

564573
err = a.appManager.Unmanage(app.QualifiedName())
@@ -595,6 +604,8 @@ func (a *Agent) createAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr
595604

596605
logCtx.Infof("Creating a new AppProject on behalf of an incoming event")
597606

607+
a.sourceCache.AppProject.Set(incoming.UID, incoming.Spec)
608+
598609
// Get rid of some fields that we do not want to have on the AppProject
599610
// as we start fresh.
600611
if incoming.Annotations != nil {
@@ -632,6 +643,8 @@ func (a *Agent) updateAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr
632643

633644
logCtx.Infof("Updating appProject")
634645

646+
a.sourceCache.AppProject.Set(incoming.UID, incoming.Spec)
647+
635648
logCtx.Tracef("Calling update spec for this event")
636649
return a.projectManager.UpdateAppProject(a.context, incoming)
637650

@@ -656,15 +669,28 @@ func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error {
656669

657670
logCtx.Infof("Deleting appProject")
658671

672+
// Fetch the source UID of the existing appProject to mark it as expected deletion.
673+
project, err := a.projectManager.Get(a.context, project.Name, project.Namespace)
674+
if err != nil {
675+
return err
676+
}
677+
678+
sourceUID := project.Annotations[manager.SourceUIDAnnotation]
679+
a.deletions.MarkExpected(ktypes.UID(sourceUID))
680+
659681
deletionPropagation := backend.DeletePropagationBackground
660-
err := a.projectManager.Delete(a.context, project, &deletionPropagation)
682+
err = a.projectManager.Delete(a.context, project, &deletionPropagation)
661683
if err != nil {
662684
if apierrors.IsNotFound(err) {
663685
logCtx.Debug("appProject not found, perhaps it is already deleted")
686+
a.sourceCache.AppProject.Delete(project.UID)
664687
return nil
665688
}
666689
return err
667690
}
691+
692+
a.sourceCache.AppProject.Delete(project.UID)
693+
668694
err = a.projectManager.Unmanage(project.Name)
669695
if err != nil {
670696
log().Warnf("Could not unmanage appProject %s: %v", project.Name, err)
@@ -702,6 +728,8 @@ func (a *Agent) createRepository(incoming *corev1.Secret) (*corev1.Secret, error
702728
incoming.Annotations = make(map[string]string)
703729
}
704730

731+
a.sourceCache.Repository.Set(incoming.UID, incoming.Data)
732+
705733
// Get rid of some fields that we do not want to have on the repository as we start fresh.
706734
delete(incoming.Annotations, "kubectl.kubernetes.io/last-applied-configuration")
707735

@@ -738,6 +766,8 @@ func (a *Agent) updateRepository(incoming *corev1.Secret) (*corev1.Secret, error
738766

739767
logCtx.Infof("Updating repository")
740768

769+
a.sourceCache.Repository.Set(incoming.UID, incoming.Data)
770+
741771
return a.repoManager.UpdateManagedRepository(a.context, incoming)
742772
}
743773

@@ -756,16 +786,28 @@ func (a *Agent) deleteRepository(repo *corev1.Secret) error {
756786

757787
logCtx.Infof("Deleting repository")
758788

789+
// Fetch the source UID of the existing repository to mark it as expected deletion.
790+
repo, err := a.repoManager.Get(a.context, repo.Name, repo.Namespace)
791+
if err != nil {
792+
return err
793+
}
794+
795+
sourceUID := repo.Annotations[manager.SourceUIDAnnotation]
796+
a.deletions.MarkExpected(ktypes.UID(sourceUID))
797+
759798
deletionPropagation := backend.DeletePropagationBackground
760-
err := a.repoManager.Delete(a.context, repo.Name, repo.Namespace, &deletionPropagation)
799+
err = a.repoManager.Delete(a.context, repo.Name, repo.Namespace, &deletionPropagation)
761800
if err != nil {
762801
if apierrors.IsNotFound(err) {
763802
logCtx.Debug("repository is not found, perhaps it is already deleted")
803+
a.sourceCache.Repository.Delete(repo.UID)
764804
return nil
765805
}
766806
return err
767807
}
768808

809+
a.sourceCache.Repository.Delete(repo.UID)
810+
769811
err = a.repoManager.Unmanage(repo.Name)
770812
if err != nil {
771813
log().Warnf("Could not unmanage repository %s: %v", repo.Name, err)

0 commit comments

Comments
 (0)