Skip to content

Commit 2609b4b

Browse files
authored
Continue when failing the concurrency (#1810)
1 parent 7e6c443 commit 2609b4b

File tree

8 files changed

+401
-72
lines changed

8 files changed

+401
-72
lines changed

pkg/reconciler/finalizer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, pr *tektonv1.PipelineRun)
4545
repo.Spec.Merge(r.globalRepo.Spec)
4646
}
4747
logger = logger.With("namespace", repo.Namespace)
48-
next := r.qm.RemoveFromQueue(repo, pr)
48+
next := r.qm.RemoveAndTakeItemFromQueue(repo, pr)
4949
if next != "" {
5050
key := strings.Split(next, "/")
5151
pr, err := r.run.Clients.Tekton.TektonV1().PipelineRuns(key[0]).Get(ctx, key[1], metav1.GetOptions{})
@@ -54,7 +54,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, pr *tektonv1.PipelineRun)
5454
}
5555
if err := r.
5656
updatePipelineRunToInProgress(ctx, logger, repo, pr); err != nil {
57-
logger.Error("failed to update status: ", err)
57+
logger.Errorf("failed to update status: %w", err)
5858
return err
5959
}
6060
return nil

pkg/reconciler/queue_pipelineruns.go

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"strings"
77

88
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/keys"
9-
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/v1alpha1"
9+
pacAPIv1alpha1 "github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/v1alpha1"
1010
"github.com/openshift-pipelines/pipelines-as-code/pkg/kubeinteraction"
1111
"github.com/openshift-pipelines/pipelines-as-code/pkg/sync"
1212
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
@@ -22,52 +22,84 @@ func (r *Reconciler) queuePipelineRun(ctx context.Context, logger *zap.SugaredLo
2222
return nil
2323
}
2424

25-
repoName := pr.GetAnnotations()[keys.Repository]
25+
// check if annotation exist
26+
repoName, exist := pr.GetAnnotations()[keys.Repository]
27+
if !exist {
28+
return fmt.Errorf("no %s annotation found", keys.Repository)
29+
}
30+
if repoName == "" {
31+
return fmt.Errorf("annotation %s is empty", keys.Repository)
32+
}
2633
repo, err := r.repoLister.Repositories(pr.Namespace).Get(repoName)
2734
if err != nil {
2835
// if repository is not found, then skip processing the pipelineRun and return nil
2936
if errors.IsNotFound(err) {
30-
r.qm.RemoveRepository(&v1alpha1.Repository{ObjectMeta: metav1.ObjectMeta{
31-
Name: repoName,
32-
Namespace: pr.Namespace,
33-
}})
37+
r.qm.RemoveRepository(&pacAPIv1alpha1.Repository{
38+
ObjectMeta: metav1.ObjectMeta{
39+
Name: repoName,
40+
Namespace: pr.Namespace,
41+
},
42+
})
3443
return nil
3544
}
36-
return fmt.Errorf("updateError: %w", err)
45+
return fmt.Errorf("error getting PipelineRun: %w", err)
3746
}
3847

3948
// merge local repo with global repo here in order to derive settings from global
4049
// for further concurrency and other operations.
4150
if r.globalRepo, err = r.repoLister.Repositories(r.run.Info.Kube.Namespace).Get(r.run.Info.Controller.GlobalRepository); err == nil && r.globalRepo != nil {
51+
logger.Info("Merging global repository settings with local repository settings")
4252
repo.Spec.Merge(r.globalRepo.Spec)
4353
}
4454

4555
// if concurrency was set and later removed or changed to zero
4656
// then remove pipelineRun from Queue and update pending state to running
4757
if repo.Spec.ConcurrencyLimit != nil && *repo.Spec.ConcurrencyLimit == 0 {
48-
_ = r.qm.RemoveFromQueue(repo, pr)
58+
_ = r.qm.RemoveAndTakeItemFromQueue(repo, pr)
4959
if err := r.updatePipelineRunToInProgress(ctx, logger, repo, pr); err != nil {
5060
return fmt.Errorf("failed to update PipelineRun to in_progress: %w", err)
5161
}
5262
return nil
5363
}
5464

55-
orderedList := sync.FilterPipelineRunByState(ctx, r.run.Clients.Tekton, strings.Split(order, ","), tektonv1.PipelineRunSpecStatusPending, kubeinteraction.StateQueued)
56-
acquired, err := r.qm.AddListToRunningQueue(repo, orderedList)
57-
if err != nil {
58-
return fmt.Errorf("failed to add to queue: %s: %w", pr.GetName(), err)
59-
}
65+
var processed bool
66+
var itered int
67+
maxIterations := 5
6068

61-
for _, prKeys := range acquired {
62-
nsName := strings.Split(prKeys, "/")
63-
pr, err = r.run.Clients.Tekton.TektonV1().PipelineRuns(nsName[0]).Get(ctx, nsName[1], metav1.GetOptions{})
69+
orderedList := sync.FilterPipelineRunByState(ctx, r.run.Clients.Tekton, strings.Split(order, ","), tektonv1.PipelineRunSpecStatusPending, kubeinteraction.StateQueued)
70+
for {
71+
acquired, err := r.qm.AddListToRunningQueue(repo, orderedList)
6472
if err != nil {
65-
logger.Info("failed to get pr with namespace and name: ", nsName[0], nsName[1])
66-
return err
73+
return fmt.Errorf("failed to add to queue: %s: %w", pr.GetName(), err)
6774
}
68-
if err := r.updatePipelineRunToInProgress(ctx, logger, repo, pr); err != nil {
69-
return fmt.Errorf("failed to update pipelineRun to in_progress: %w", err)
75+
if len(acquired) == 0 {
76+
logger.Infof("no new PipelineRun acquired for repo %s", repo.GetName())
77+
break
78+
}
79+
80+
for _, prKeys := range acquired {
81+
nsName := strings.Split(prKeys, "/")
82+
repoKey := sync.RepoKey(repo)
83+
pr, err = r.run.Clients.Tekton.TektonV1().PipelineRuns(nsName[0]).Get(ctx, nsName[1], metav1.GetOptions{})
84+
if err != nil {
85+
logger.Info("failed to get pr with namespace and name: ", nsName[0], nsName[1])
86+
_ = r.qm.RemoveFromQueue(repoKey, prKeys)
87+
} else {
88+
if err := r.updatePipelineRunToInProgress(ctx, logger, repo, pr); err != nil {
89+
logger.Errorf("failed to update pipelineRun to in_progress: %w", err)
90+
_ = r.qm.RemoveFromQueue(repoKey, prKeys)
91+
} else {
92+
processed = true
93+
}
94+
}
95+
}
96+
if processed {
97+
break
98+
}
99+
if itered >= maxIterations {
100+
return fmt.Errorf("max iterations reached of %d times trying to get a pipelinerun started for %s", maxIterations, repo.GetName())
70101
}
102+
itered++
71103
}
72104
return nil
73105
}
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package reconciler
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/keys"
8+
pacv1alpha1 "github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/v1alpha1"
9+
"github.com/openshift-pipelines/pipelines-as-code/pkg/params"
10+
"github.com/openshift-pipelines/pipelines-as-code/pkg/params/clients"
11+
"github.com/openshift-pipelines/pipelines-as-code/pkg/params/info"
12+
testclient "github.com/openshift-pipelines/pipelines-as-code/pkg/test/clients"
13+
testconcurrency "github.com/openshift-pipelines/pipelines-as-code/pkg/test/concurrency"
14+
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
15+
"go.uber.org/zap"
16+
zapobserver "go.uber.org/zap/zaptest/observer"
17+
"gotest.tools/v3/assert"
18+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19+
rtesting "knative.dev/pkg/reconciler/testing"
20+
)
21+
22+
func TestQueuePipelineRun(t *testing.T) {
23+
tests := []struct {
24+
name string
25+
wantErrString string
26+
wantLog string
27+
pipelineRun *tektonv1.PipelineRun
28+
testRepo *pacv1alpha1.Repository
29+
globalRepo *pacv1alpha1.Repository
30+
runningQueue []string
31+
}{
32+
{
33+
name: "no existing order annotation",
34+
pipelineRun: &tektonv1.PipelineRun{
35+
ObjectMeta: metav1.ObjectMeta{
36+
Annotations: map[string]string{},
37+
},
38+
},
39+
},
40+
{
41+
name: "no repo name annotation",
42+
pipelineRun: &tektonv1.PipelineRun{
43+
ObjectMeta: metav1.ObjectMeta{
44+
Annotations: map[string]string{
45+
keys.ExecutionOrder: "repo/foo1",
46+
},
47+
},
48+
},
49+
wantErrString: fmt.Sprintf("no %s annotation found", keys.Repository),
50+
},
51+
{
52+
name: "empty repo name annotation",
53+
pipelineRun: &tektonv1.PipelineRun{
54+
ObjectMeta: metav1.ObjectMeta{
55+
Annotations: map[string]string{
56+
keys.ExecutionOrder: "repo/foo1",
57+
keys.Repository: "",
58+
},
59+
},
60+
},
61+
wantErrString: fmt.Sprintf("annotation %s is empty", keys.Repository),
62+
},
63+
{
64+
name: "no repo found",
65+
pipelineRun: &tektonv1.PipelineRun{
66+
ObjectMeta: metav1.ObjectMeta{
67+
Annotations: map[string]string{
68+
keys.ExecutionOrder: "repo/foo1",
69+
keys.Repository: "foo",
70+
},
71+
},
72+
},
73+
},
74+
{
75+
name: "merging global repository settings",
76+
globalRepo: &pacv1alpha1.Repository{
77+
ObjectMeta: metav1.ObjectMeta{
78+
Name: "global",
79+
Namespace: "global",
80+
},
81+
Spec: pacv1alpha1.RepositorySpec{
82+
Settings: &pacv1alpha1.Settings{
83+
PipelineRunProvenance: "somewhere",
84+
},
85+
},
86+
},
87+
runningQueue: []string{},
88+
pipelineRun: &tektonv1.PipelineRun{
89+
ObjectMeta: metav1.ObjectMeta{
90+
Name: "test",
91+
Namespace: "test",
92+
Annotations: map[string]string{
93+
keys.ExecutionOrder: "repo/foo1",
94+
keys.Repository: "test",
95+
},
96+
},
97+
},
98+
testRepo: &pacv1alpha1.Repository{
99+
ObjectMeta: metav1.ObjectMeta{
100+
Name: "test",
101+
Namespace: "test",
102+
},
103+
Spec: pacv1alpha1.RepositorySpec{
104+
URL: randomURL,
105+
},
106+
},
107+
wantLog: "Merging global repository settings with local repository settings",
108+
},
109+
{
110+
name: "no new PR acquired",
111+
runningQueue: []string{},
112+
pipelineRun: &tektonv1.PipelineRun{
113+
ObjectMeta: metav1.ObjectMeta{
114+
Name: "test",
115+
Namespace: "test",
116+
Annotations: map[string]string{
117+
keys.ExecutionOrder: "repo/foo1",
118+
keys.Repository: "test",
119+
},
120+
},
121+
},
122+
testRepo: &pacv1alpha1.Repository{
123+
ObjectMeta: metav1.ObjectMeta{
124+
Name: "test",
125+
Namespace: "test",
126+
},
127+
Spec: pacv1alpha1.RepositorySpec{
128+
URL: randomURL,
129+
},
130+
},
131+
wantLog: "no new PipelineRun acquired for repo test",
132+
},
133+
{
134+
name: "failed to get PR from the Q after many iterations",
135+
runningQueue: []string{"test/test2"},
136+
pipelineRun: &tektonv1.PipelineRun{
137+
ObjectMeta: metav1.ObjectMeta{
138+
Name: "test",
139+
Namespace: "test",
140+
Annotations: map[string]string{
141+
keys.ExecutionOrder: "repo/foo1",
142+
keys.Repository: "test",
143+
},
144+
},
145+
},
146+
testRepo: &pacv1alpha1.Repository{
147+
ObjectMeta: metav1.ObjectMeta{
148+
Name: "test",
149+
Namespace: "test",
150+
},
151+
Spec: pacv1alpha1.RepositorySpec{
152+
URL: randomURL,
153+
},
154+
},
155+
wantLog: "failed to get PR",
156+
wantErrString: "max iterations reached of",
157+
},
158+
}
159+
for _, tt := range tests {
160+
t.Run(tt.name, func(t *testing.T) {
161+
observer, logcatch := zapobserver.New(zap.InfoLevel)
162+
fakelogger := zap.New(observer).Sugar()
163+
ctx, _ := rtesting.SetupFakeContext(t)
164+
repos := []*pacv1alpha1.Repository{}
165+
if tt.testRepo != nil {
166+
repos = append(repos, tt.testRepo)
167+
}
168+
if tt.globalRepo != nil {
169+
repos = append(repos, tt.globalRepo)
170+
}
171+
testData := testclient.Data{
172+
Repositories: repos,
173+
}
174+
stdata, informers := testclient.SeedTestData(t, ctx, testData)
175+
r := &Reconciler{
176+
qm: testconcurrency.TestQMI{
177+
RunningQueue: tt.runningQueue,
178+
},
179+
repoLister: informers.Repository.Lister(),
180+
run: &params.Run{
181+
Info: info.Info{
182+
Kube: &info.KubeOpts{
183+
Namespace: "global",
184+
},
185+
Controller: &info.ControllerInfo{},
186+
},
187+
Clients: clients.Clients{
188+
PipelineAsCode: stdata.PipelineAsCode,
189+
Tekton: stdata.Pipeline,
190+
Kube: stdata.Kube,
191+
Log: fakelogger,
192+
},
193+
},
194+
}
195+
if tt.globalRepo != nil {
196+
r.run.Info.Controller.GlobalRepository = tt.globalRepo.GetName()
197+
}
198+
err := r.queuePipelineRun(ctx, fakelogger, tt.pipelineRun)
199+
if tt.wantErrString != "" {
200+
assert.ErrorContains(t, err, tt.wantErrString)
201+
return
202+
}
203+
assert.NilError(t, err)
204+
205+
if tt.wantLog != "" {
206+
assert.Assert(t, logcatch.FilterMessage(tt.wantLog).Len() != 0, "We didn't get the expected log message", logcatch.All())
207+
}
208+
})
209+
}
210+
}

pkg/reconciler/reconciler.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type Reconciler struct {
3838
repoLister pacapi.RepositoryLister
3939
pipelineRunLister tektonv1lister.PipelineRunLister
4040
kinteract kubeinteraction.Interface
41-
qm *sync.QueueManager
41+
qm sync.QueueManagerInterface
4242
metrics *metrics.Recorder
4343
eventEmitter *events.EventEmitter
4444
globalRepo *v1alpha1.Repository
@@ -198,17 +198,24 @@ func (r *Reconciler) reportFinalStatus(ctx context.Context, logger *zap.SugaredL
198198
}
199199

200200
// remove pipelineRun from Queue and start the next one
201-
next := r.qm.RemoveFromQueue(repo, pr)
202-
if next != "" {
201+
for {
202+
next := r.qm.RemoveAndTakeItemFromQueue(repo, pr)
203+
if next == "" {
204+
break
205+
}
203206
key := strings.Split(next, "/")
204207
pr, err := r.run.Clients.Tekton.TektonV1().PipelineRuns(key[0]).Get(ctx, key[1], metav1.GetOptions{})
205208
if err != nil {
206-
return repo, fmt.Errorf("cannot get pipeline for next in queue: %w", err)
209+
logger.Errorf("cannot get pipeline for next in queue: %w", err)
210+
continue
207211
}
208212

209213
if err := r.updatePipelineRunToInProgress(ctx, logger, repo, pr); err != nil {
210-
return repo, fmt.Errorf("failed to update status: %w", err)
214+
logger.Errorf("failed to update status: %w", err)
215+
_ = r.qm.RemoveFromQueue(sync.RepoKey(repo), sync.PrKey(pr))
216+
continue
211217
}
218+
break
212219
}
213220

214221
if err := r.cleanupPipelineRuns(ctx, logger, pacInfo, repo, pr); err != nil {

0 commit comments

Comments
 (0)