Skip to content

Commit ee0f974

Browse files
authored
Merge pull request #1899 from InditexTech/GH-1876-support-scale-to-zero
Support scale to zero rabbitMQ
2 parents 57391f4 + 38aaf68 commit ee0f974

File tree

4 files changed

+348
-4
lines changed

4 files changed

+348
-4
lines changed

controllers/rabbitmqcluster_controller.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,23 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
201201
if err := builder.Update(sts); err != nil {
202202
return ctrl.Result{}, err
203203
}
204-
if r.scaleDown(ctx, rabbitmqCluster, current, sts) {
205-
// return when cluster scale down detected; unsupported operation
206-
return ctrl.Result{}, nil
204+
if ScaleToZero(current, sts) {
205+
err := r.saveReplicasBeforeZero(ctx, rabbitmqCluster, current)
206+
if err != nil {
207+
return ctrl.Result{}, err
208+
}
209+
} else {
210+
if r.scaleDown(ctx, rabbitmqCluster, current, sts) {
211+
// return when cluster scale down detected; unsupported operation
212+
return ctrl.Result{}, nil
213+
}
214+
}
215+
if ScaleFromZero(current, sts) {
216+
if r.scaleFromZeroToBeforeReplicasConfigured(ctx, rabbitmqCluster, sts) {
217+
// return when cluster scale down from zero detected; unsupported operation
218+
return ctrl.Result{}, nil
219+
}
220+
r.removeReplicasBeforeZeroAnnotationIfExists(ctx, rabbitmqCluster)
207221
}
208222
}
209223

@@ -212,8 +226,8 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
212226
r.setReconcileSuccess(ctx, rabbitmqCluster, corev1.ConditionFalse, "FailedReconcilePVC", err.Error())
213227
return ctrl.Result{}, err
214228
}
215-
}
216229

230+
}
217231
var operationResult controllerutil.OperationResult
218232
err = clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
219233
var apiError error
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package controllers
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"strconv"
8+
9+
ctrl "sigs.k8s.io/controller-runtime"
10+
11+
"github.com/rabbitmq/cluster-operator/v2/api/v1beta1"
12+
"github.com/rabbitmq/cluster-operator/v2/internal/status"
13+
appsv1 "k8s.io/api/apps/v1"
14+
corev1 "k8s.io/api/core/v1"
15+
)
16+
17+
const beforeZeroReplicasConfigured = "rabbitmq.com/before-zero-replicas-configured"
18+
19+
// ScaleToZero checks if the desired replicas is zero and the current replicas is not zero.
20+
func ScaleToZero(current, sts *appsv1.StatefulSet) bool {
21+
currentReplicas := *current.Spec.Replicas
22+
desiredReplicas := *sts.Spec.Replicas
23+
return desiredReplicas == 0 && currentReplicas > 0
24+
}
25+
26+
// ScaleFromZero checks if the current replicas is zero and the desired replicas is greater than zero.
27+
func ScaleFromZero(current, sts *appsv1.StatefulSet) bool {
28+
currentReplicas := *current.Spec.Replicas
29+
desiredReplicas := *sts.Spec.Replicas
30+
return currentReplicas == 0 && desiredReplicas > 0
31+
}
32+
33+
// scaleDownFromZero checks if the current replicas is desired replicas would be greatter than replicas configured before zero state.
34+
func (r *RabbitmqClusterReconciler) scaleFromZeroToBeforeReplicasConfigured(ctx context.Context, cluster *v1beta1.RabbitmqCluster, sts *appsv1.StatefulSet) bool {
35+
logger := ctrl.LoggerFrom(ctx)
36+
var err error
37+
var beforeZeroReplicas int64
38+
desiredReplicas := *sts.Spec.Replicas
39+
annotationValue, ok := cluster.Annotations[beforeZeroReplicasConfigured]
40+
if !ok {
41+
return false
42+
}
43+
44+
beforeZeroReplicas, err = strconv.ParseInt(annotationValue, 10, 32)
45+
if err != nil {
46+
msg := "Failed to convert string to integer for before-zero-replicas-configuration annotation"
47+
reason := "TransformErrorOperation"
48+
logger.Error(errors.New(reason), msg)
49+
err = r.recordEventsAndSetCondition(ctx, cluster, status.ReconcileSuccess, corev1.ConditionFalse, corev1.EventTypeWarning, reason, msg)
50+
if err != nil {
51+
logger.V(1).Info(err.Error())
52+
}
53+
return true
54+
}
55+
if desiredReplicas != int32(beforeZeroReplicas) {
56+
msg := fmt.Sprintf("Unsupported operation; when scaling from zero, you can only restore the previous number of replicas (%d)", int32(beforeZeroReplicas))
57+
reason := "UnsupportedOperation"
58+
logger.Error(errors.New(reason), msg)
59+
err = r.recordEventsAndSetCondition(ctx, cluster, status.ReconcileSuccess, corev1.ConditionFalse, corev1.EventTypeWarning, reason, msg)
60+
if err != nil {
61+
logger.V(1).Info(err.Error())
62+
}
63+
return true
64+
}
65+
return false
66+
67+
}
68+
69+
// saveReplicasBeforeZero saves the current replicas count in an annotation before scaling down to zero.
70+
// This is used to prevent scaling down when the cluster change from zero replicas to a number less than the saved replicas count.
71+
func (r *RabbitmqClusterReconciler) saveReplicasBeforeZero(ctx context.Context, cluster *v1beta1.RabbitmqCluster, current *appsv1.StatefulSet) error {
72+
currentReplicas := *current.Spec.Replicas
73+
logger := ctrl.LoggerFrom(ctx)
74+
msg := "Cluster Scale down to 0 replicas"
75+
reason := "ScaleDownToZero"
76+
logger.Info(msg)
77+
r.Recorder.Event(cluster, corev1.EventTypeNormal, reason, msg)
78+
return r.updateAnnotation(ctx, cluster, cluster.Namespace, cluster.Name, beforeZeroReplicasConfigured, fmt.Sprint(currentReplicas))
79+
}
80+
81+
// If the annotation rabbitmq.com/before-zero-replicas-configured exists it will be deleted.
82+
func (r *RabbitmqClusterReconciler) removeReplicasBeforeZeroAnnotationIfExists(ctx context.Context, cluster *v1beta1.RabbitmqCluster) {
83+
if _, ok := cluster.Annotations[beforeZeroReplicasConfigured]; ok {
84+
r.deleteAnnotation(ctx, cluster, beforeZeroReplicasConfigured)
85+
}
86+
}
87+
88+
func (r *RabbitmqClusterReconciler) recordEventsAndSetCondition(ctx context.Context, cluster *v1beta1.RabbitmqCluster, condType status.RabbitmqClusterConditionType, condStatus corev1.ConditionStatus, eventType, reason, msg string) error {
89+
r.Recorder.Event(cluster, eventType, reason, msg)
90+
cluster.Status.SetCondition(condType, condStatus, reason, msg)
91+
return r.Status().Update(ctx, cluster)
92+
}
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
package controllers_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
. "github.com/onsi/ginkgo/v2"
8+
. "github.com/onsi/gomega"
9+
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/v2/api/v1beta1"
10+
"github.com/rabbitmq/cluster-operator/v2/internal/status"
11+
apierrors "k8s.io/apimachinery/pkg/api/errors"
12+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/apimachinery/pkg/types"
14+
"k8s.io/utils/ptr"
15+
runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
16+
)
17+
18+
var _ = Describe("Cluster scale to zero", func() {
19+
var (
20+
cluster *rabbitmqv1beta1.RabbitmqCluster
21+
defaultNamespace = "default"
22+
ctx = context.Background()
23+
)
24+
25+
AfterEach(func() {
26+
Expect(client.Delete(ctx, cluster)).To(Succeed())
27+
waitForClusterDeletion(ctx, cluster, client)
28+
Eventually(func() bool {
29+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
30+
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
31+
return apierrors.IsNotFound(err)
32+
}).Should(BeTrue())
33+
})
34+
35+
It("scale to zero", func() {
36+
By("update statefulSet replicas to zero", func() {
37+
cluster = &rabbitmqv1beta1.RabbitmqCluster{
38+
ObjectMeta: metav1.ObjectMeta{
39+
Name: "rabbitmq-to-zero",
40+
Namespace: defaultNamespace,
41+
},
42+
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
43+
Replicas: ptr.To(int32(2)),
44+
},
45+
}
46+
Expect(client.Create(ctx, cluster)).To(Succeed())
47+
waitForClusterCreation(ctx, cluster, client)
48+
49+
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
50+
r.Spec.Replicas = ptr.To(int32(0))
51+
})).To(Succeed())
52+
53+
Eventually(func() int32 {
54+
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
55+
Expect(err).NotTo(HaveOccurred())
56+
return *sts.Spec.Replicas
57+
}, 10, 1).Should(Equal(int32(0)))
58+
59+
})
60+
61+
By("setting ReconcileSuccess to 'true'", func() {
62+
Eventually(func() string {
63+
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
64+
Expect(client.Get(ctx, runtimeClient.ObjectKey{
65+
Name: cluster.Name,
66+
Namespace: defaultNamespace,
67+
}, rabbit)).To(Succeed())
68+
69+
for i := range rabbit.Status.Conditions {
70+
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
71+
return fmt.Sprintf(
72+
"ReconcileSuccess status: %s, with reason: %s and message: %s",
73+
rabbit.Status.Conditions[i].Status,
74+
rabbit.Status.Conditions[i].Reason,
75+
rabbit.Status.Conditions[i].Message)
76+
}
77+
}
78+
return "ReconcileSuccess status: condition not present"
79+
}, 0).Should(Equal("ReconcileSuccess status: True, " +
80+
"with reason: Success " +
81+
"and message: Finish reconciling"))
82+
})
83+
})
84+
})
85+
86+
var _ = Describe("Cluster scale from zero", func() {
87+
var (
88+
cluster *rabbitmqv1beta1.RabbitmqCluster
89+
defaultNamespace = "default"
90+
ctx = context.Background()
91+
)
92+
93+
AfterEach(func() {
94+
Expect(client.Delete(ctx, cluster)).To(Succeed())
95+
waitForClusterDeletion(ctx, cluster, client)
96+
Eventually(func() bool {
97+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
98+
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
99+
return apierrors.IsNotFound(err)
100+
}).Should(BeTrue())
101+
})
102+
103+
It("scale from zero", func() {
104+
By("update statefulSet replicas from zero", func() {
105+
cluster = &rabbitmqv1beta1.RabbitmqCluster{
106+
ObjectMeta: metav1.ObjectMeta{
107+
Name: "rabbitmq-from-zero",
108+
Namespace: defaultNamespace,
109+
Annotations: map[string]string{
110+
"rabbitmq.com/before-zero-replicas-configured": "2",
111+
},
112+
},
113+
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
114+
Replicas: ptr.To(int32(0)),
115+
},
116+
}
117+
Expect(client.Create(ctx, cluster)).To(Succeed())
118+
waitForClusterCreation(ctx, cluster, client)
119+
120+
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
121+
r.Spec.Replicas = ptr.To(int32(2))
122+
})).To(Succeed())
123+
124+
Eventually(func() int32 {
125+
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
126+
Expect(err).NotTo(HaveOccurred())
127+
return *sts.Spec.Replicas
128+
}, 10, 1).Should(Equal(int32(2)))
129+
130+
})
131+
132+
By("setting ReconcileSuccess to 'true'", func() {
133+
Eventually(func() string {
134+
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
135+
Expect(client.Get(ctx, runtimeClient.ObjectKey{
136+
Name: cluster.Name,
137+
Namespace: defaultNamespace,
138+
}, rabbit)).To(Succeed())
139+
140+
for i := range rabbit.Status.Conditions {
141+
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
142+
return fmt.Sprintf(
143+
"ReconcileSuccess status: %s, with reason: %s and message: %s",
144+
rabbit.Status.Conditions[i].Status,
145+
rabbit.Status.Conditions[i].Reason,
146+
rabbit.Status.Conditions[i].Message)
147+
}
148+
}
149+
return "ReconcileSuccess status: condition not present"
150+
}, 0).Should(Equal("ReconcileSuccess status: True, " +
151+
"with reason: Success " +
152+
"and message: Finish reconciling"))
153+
})
154+
})
155+
})
156+
157+
var _ = Describe("Cluster scale from zero to less replicas configured", Ordered, func() {
158+
var (
159+
cluster *rabbitmqv1beta1.RabbitmqCluster
160+
defaultNamespace = "default"
161+
ctx = context.Background()
162+
)
163+
164+
AfterEach(func() {
165+
Expect(client.Delete(ctx, cluster)).To(Succeed())
166+
waitForClusterDeletion(ctx, cluster, client)
167+
Eventually(func() bool {
168+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
169+
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
170+
return apierrors.IsNotFound(err)
171+
}).Should(BeTrue())
172+
})
173+
174+
It("scale from zero to less replicas", func() {
175+
By("update statefulSet replicas from zero", func() {
176+
cluster = &rabbitmqv1beta1.RabbitmqCluster{
177+
ObjectMeta: metav1.ObjectMeta{
178+
Name: "rabbitmq-from-zero-to-less",
179+
Namespace: defaultNamespace,
180+
Annotations: map[string]string{
181+
"rabbitmq.com/before-zero-replicas-configured": "2",
182+
},
183+
},
184+
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
185+
Replicas: ptr.To(int32(0)),
186+
},
187+
}
188+
Expect(client.Create(ctx, cluster)).To(Succeed())
189+
waitForClusterCreation(ctx, cluster, client)
190+
191+
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
192+
r.Spec.Replicas = ptr.To(int32(1))
193+
})).To(Succeed())
194+
195+
Consistently(func() int32 {
196+
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
197+
Expect(err).NotTo(HaveOccurred())
198+
return *sts.Spec.Replicas
199+
}, 10, 1).Should(Equal(int32(0)))
200+
201+
})
202+
203+
By("setting 'Warning' events", func() {
204+
Expect(aggregateEventMsgs(ctx, cluster, "UnsupportedOperation")).To(
205+
ContainSubstring("Unsupported operation"))
206+
})
207+
208+
By("setting ReconcileSuccess to 'false'", func() {
209+
Eventually(func() string {
210+
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
211+
Expect(client.Get(ctx, runtimeClient.ObjectKey{
212+
Name: cluster.Name,
213+
Namespace: defaultNamespace,
214+
}, rabbit)).To(Succeed())
215+
216+
for i := range rabbit.Status.Conditions {
217+
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
218+
return fmt.Sprintf(
219+
"ReconcileSuccess status: %s, with reason: %s and message: %s",
220+
rabbit.Status.Conditions[i].Status,
221+
rabbit.Status.Conditions[i].Reason,
222+
rabbit.Status.Conditions[i].Message)
223+
}
224+
}
225+
return "ReconcileSuccess status: condition not present"
226+
}, 0).Should(Equal("ReconcileSuccess status: False, " +
227+
"with reason: UnsupportedOperation " +
228+
"and message: Unsupported operation; when scaling from zero, you can only restore the previous number of replicas (2)"))
229+
})
230+
})
231+
})

internal/status/all_replicas_ready.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ func AllReplicasReadyCondition(resources []runtime.Object,
4141
if resource.Spec.Replicas != nil {
4242
desiredReplicas = *resource.Spec.Replicas
4343
}
44+
45+
if desiredReplicas == 0 {
46+
condition.Status = corev1.ConditionFalse
47+
condition.Reason = "ScaledToZero"
48+
goto assignLastTransitionTime
49+
}
50+
4451
if desiredReplicas == resource.Status.ReadyReplicas {
4552
condition.Status = corev1.ConditionTrue
4653
condition.Reason = "AllPodsAreReady"

0 commit comments

Comments
 (0)