@@ -11,8 +11,10 @@ import (
1111 "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
1212 "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
1313 . "github.com/onsi/gomega"
14+ "github.com/prometheus/client_golang/prometheus"
1415
1516 "github.com/Azure/go-shuttle/v2"
17+ "github.com/Azure/go-shuttle/v2/metrics/processor"
1618)
1719
1820type fakeSBLockRenewer struct {
@@ -150,24 +152,40 @@ func Test_RenewPeriodically_Error(t *testing.T) {
150152 isRenewerCanceled bool
151153 cancelCtxOnStop *bool
152154 gotMessageCtx context.Context
153- verify func(g Gomega, tc *testCase)
155+ verify func(g Gomega, tc *testCase, metrics *processor.Informer )
154156 }
155157 testCases := []testCase{
156158 {
157159 name: "continue periodic renewal on unknown error",
158160 renewer: &fakeSBLockRenewer{Err: fmt.Errorf("unknown error")},
159- verify: func(g Gomega, tc *testCase) {
161+ verify: func(g Gomega, tc *testCase, metrics *processor.Informer ) {
160162 g.Eventually(
161163 func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) },
162164 130*time.Millisecond,
163165 20*time.Millisecond).Should(Succeed())
164166 },
165167 },
168+ {
169+ name: "stop periodic renewal on context canceled",
170+ isRenewerCanceled: false,
171+ renewer: &fakeSBLockRenewer{Err: context.Canceled},
172+ verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
173+ g.Consistently(
174+ func(g Gomega) {
175+ g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1)),
176+ "should not attempt to renew")
177+ g.Expect(metrics.GetMessageLockRenewedFailureCount()).To(Equal(float64(0)),
178+ "should not record failure metric")
179+ },
180+ 130*time.Millisecond,
181+ 20*time.Millisecond).Should(Succeed())
182+ },
183+ },
166184 {
167185 name: "stop periodic renewal on context canceled",
168186 isRenewerCanceled: true,
169187 renewer: &fakeSBLockRenewer{Err: context.Canceled},
170- verify: func(g Gomega, tc *testCase) {
188+ verify: func(g Gomega, tc *testCase, metrics *processor.Informer ) {
171189 g.Consistently(
172190 func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(0))) },
173191 130*time.Millisecond,
@@ -177,7 +195,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
177195 {
178196 name: "stop periodic renewal on permanent error (lockLost)",
179197 renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
180- verify: func(g Gomega, tc *testCase) {
198+ verify: func(g Gomega, tc *testCase, metrics *processor.Informer ) {
181199 g.Consistently(
182200 func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) },
183201 130*time.Millisecond,
@@ -187,7 +205,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
187205 {
188206 name: "cancel message context on stop by default",
189207 renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
190- verify: func(g Gomega, tc *testCase) {
208+ verify: func(g Gomega, tc *testCase, metrics *processor.Informer ) {
191209 g.Consistently(
192210 func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) },
193211 130*time.Millisecond,
@@ -199,7 +217,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
199217 name: "does not cancel message context on stop if disabled",
200218 renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
201219 cancelCtxOnStop: to.Ptr(false),
202- verify: func(g Gomega, tc *testCase) {
220+ verify: func(g Gomega, tc *testCase, metrics *processor.Informer ) {
203221 g.Consistently(
204222 func(g Gomega) {
205223 g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1)))
@@ -212,7 +230,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
212230 {
213231 name: "continue periodic renewal on transient error (timeout)",
214232 renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeTimeout}},
215- verify: func(g Gomega, tc *testCase) {
233+ verify: func(g Gomega, tc *testCase, metrics *processor.Informer ) {
216234 g.Eventually(
217235 func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) },
218236 140*time.Millisecond,
@@ -225,7 +243,15 @@ func Test_RenewPeriodically_Error(t *testing.T) {
225243 t.Run(tc.name, func(t *testing.T) {
226244 t.Parallel()
227245 interval := 50 * time.Millisecond
228- lr := shuttle.NewLockRenewalHandler(tc.renewer, &shuttle.LockRenewalOptions{Interval: &interval, CancelMessageContextOnStop: tc.cancelCtxOnStop},
246+ reg := processor.NewRegistry()
247+ reg.Init(prometheus.NewRegistry())
248+ informer := processor.NewInformerFor(reg)
249+ lr := shuttle.NewLockRenewalHandler(tc.renewer,
250+ &shuttle.LockRenewalOptions{
251+ Interval: &interval,
252+ CancelMessageContextOnStop: tc.cancelCtxOnStop,
253+ MetricRecorder: reg,
254+ },
229255 shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
230256 message *azservicebus.ReceivedMessage) {
231257 tc.gotMessageCtx = ctx
@@ -237,13 +263,13 @@ func Test_RenewPeriodically_Error(t *testing.T) {
237263 }
238264 }))
239265 msg := &azservicebus.ReceivedMessage{}
240- ctx, cancel := context.WithTimeout(context.TODO (), 200*time.Millisecond)
266+ ctx, cancel := context.WithTimeout(context.Background (), 200*time.Millisecond)
241267 if tc.isRenewerCanceled {
242268 cancel()
243269 }
244270 defer cancel()
245271 lr.Handle(ctx, &fakeSettler{}, msg)
246- tc.verify(NewWithT(t), &tc)
272+ tc.verify(NewWithT(t), &tc, informer )
247273 })
248274 }
249275}
0 commit comments