@@ -10,44 +10,50 @@ import (
1010 "github.com/go-logr/logr"
1111 redisv1beta1 "github.com/jaehanbyun/redis-operator/api/v1beta1"
1212 corev1 "k8s.io/api/core/v1"
13+ "k8s.io/apimachinery/pkg/api/errors"
1314 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1415 "k8s.io/client-go/kubernetes"
1516 "sigs.k8s.io/controller-runtime/pkg/client"
1617)
1718
18- const (
19- FailureThreshold = 5
20- )
21-
2219type ClusterNodeInfo struct {
2320 NodeID string
2421 PodName string
25- Flags string
22+ Role string
23+ Up bool
2624 MasterNodeID string
2725}
2826
2927// GetClusterNodesInfo returns information about all nodes in a Cluster by executing "cluster nodes" command via redis-cli
3028func GetClusterNodesInfo (ctx context.Context , k8scl kubernetes.Interface , redisCluster * redisv1beta1.RedisCluster , logger logr.Logger ) ([]ClusterNodeInfo , error ) {
31- var masterPodName string
32- for _ , master := range redisCluster .Status .MasterMap {
33- isRunning , err := IsPodRunning (ctx , k8scl , redisCluster .Namespace , master .PodName , "redis" , logger )
29+ podList , err := k8scl .CoreV1 ().Pods (redisCluster .Namespace ).List (ctx , metav1.ListOptions {
30+ LabelSelector : fmt .Sprintf ("clusterName=%s" , redisCluster .Name ),
31+ })
32+ if err != nil {
33+ logger .Error (err , "Failed to list Pods" )
34+ return nil , err
35+ }
36+
37+ var redisPodName string
38+ for _ , pod := range podList .Items {
39+ isRunning , err := IsPodRunning (ctx , k8scl , redisCluster .Namespace , pod .Name , "redis" , logger )
3440 if err != nil {
35- logger .Error (err , "Error checking if Pod is running" , "PodName" , master . PodName )
41+ logger .Error (err , "Error checking if Pod is running" , "PodName" , pod . Name )
3642 continue
3743 }
3844 if isRunning {
39- masterPodName = master . PodName
45+ redisPodName = pod . Name
4046 break
4147 }
4248 }
43- if masterPodName == "" {
44- logger .Info ("No master nodes in the cluster" )
49+ if redisPodName == "" {
50+ logger .Info ("No running Pods found in the cluster" )
4551 return []ClusterNodeInfo {}, nil
4652 }
4753
48- port := ExtractPortFromPodName (masterPodName )
54+ port := ExtractPortFromPodName (redisPodName )
4955 cmd := []string {"redis-cli" , "-p" , fmt .Sprintf ("%d" , port ), "cluster" , "nodes" }
50- output , err := RunRedisCLI (k8scl , redisCluster .Namespace , masterPodName , cmd )
56+ output , err := RunRedisCLI (k8scl , redisCluster .Namespace , redisPodName , cmd )
5157 if err != nil {
5258 logger .Error (err , "Error executing Redis CLI command" , "Command" , strings .Join (cmd , " " ))
5359 return nil , err
@@ -63,22 +69,27 @@ func GetClusterNodesInfo(ctx context.Context, k8scl kubernetes.Interface, redisC
6369 }
6470 parts := strings .Split (line , " " )
6571 nodeID := parts [0 ]
66- flags := parts [2 ]
6772 masterNodeID := ""
73+ role := ""
74+ up := true
6875 if len (parts ) > 3 && parts [3 ] != "-" {
6976 masterNodeID = parts [3 ]
7077 }
71-
72- podName , err := GetPodNameByNodeID ( k8scl , redisCluster . Namespace , nodeID , logger )
73- if err != nil {
74- logger . Error ( err , "Failed to find Pod by NodeID" , "NodeID" , nodeID )
78+ if strings . Contains ( line , "master" ) {
79+ role = "master"
80+ } else {
81+ role = "slave"
7582 }
76-
83+ if strings .Contains (line , "fail" ) || strings .Contains (line , "disconnected" ) {
84+ up = false
85+ }
86+ podName , _ := GetPodNameByNodeID (k8scl , redisCluster .Namespace , nodeID , logger )
7787 nodesInfo = append (nodesInfo , ClusterNodeInfo {
7888 NodeID : nodeID ,
7989 PodName : podName ,
80- Flags : flags ,
90+ Role : role ,
8191 MasterNodeID : masterNodeID ,
92+ Up : up ,
8293 })
8394 }
8495
@@ -137,18 +148,7 @@ func UpdateClusterStatus(ctx context.Context, cl client.Client, k8scl kubernetes
137148 return err
138149 }
139150
140- if redisCluster .Status .NextAvailablePort == 0 {
141- redisCluster .Status .NextAvailablePort = redisCluster .Spec .BasePort
142- }
143- if redisCluster .Status .MasterMap == nil {
144- redisCluster .Status .MasterMap = make (map [string ]redisv1beta1.RedisNodeStatus )
145- }
146- if redisCluster .Status .ReplicaMap == nil {
147- redisCluster .Status .ReplicaMap = make (map [string ]redisv1beta1.RedisNodeStatus )
148- }
149- if redisCluster .Status .FailedNodes == nil {
150- redisCluster .Status .FailedNodes = make (map [string ]redisv1beta1.RedisFailedNodeStatus )
151- }
151+ initializeStatusMaps (redisCluster )
152152
153153 podList , err := k8scl .CoreV1 ().Pods (redisCluster .Namespace ).List (ctx , metav1.ListOptions {
154154 LabelSelector : fmt .Sprintf ("clusterName=%s" , redisCluster .Name )})
@@ -165,73 +165,38 @@ func UpdateClusterStatus(ctx context.Context, cl client.Client, k8scl kubernetes
165165
166166 currentMasters := make (map [string ]redisv1beta1.RedisNodeStatus )
167167 currentReplicas := make (map [string ]redisv1beta1.RedisNodeStatus )
168+ currentMasterFailedMap := make (map [string ]redisv1beta1.RedisNodeStatus )
169+ currentReplicaFailedMap := make (map [string ]redisv1beta1.RedisNodeStatus )
168170
169171 if len (nodesInfo ) == 0 {
170172 logger .Info ("No cluster node information found. Assuming initial state" )
171173 } else {
172174 for _ , node := range nodesInfo {
173- flagsList := strings .Split (node .Flags , "," )
174-
175- pod := existingPods [node .PodName ]
176-
177- if pod .Status .Phase != corev1 .PodRunning || ! isContainerReady (pod , "redis" ) {
178- incrementFailureCount (redisCluster , node .NodeID , node .PodName , 2 )
179- continue
180- }
181-
182- if containsFlag (flagsList , "fail" ) || containsFlag (flagsList , "disconnected" ) {
183- incrementFailureCount (redisCluster , node .NodeID , node .PodName , 1 )
184- } else {
185- resetFailureCount (redisCluster , node .NodeID )
186- }
187-
188- failureCount := redisCluster .Status .FailedNodes [node .NodeID ].FailureCount
189- if failureCount < 5 {
190- if containsFlag (flagsList , "master" ) {
191- currentMasters [node .NodeID ] = redisv1beta1.RedisNodeStatus {
192- PodName : node .PodName ,
193- NodeID : node .NodeID ,
175+ if ! node .Up {
176+ if node .Role == "master" {
177+ currentMasterFailedMap [node .NodeID ] = redisv1beta1.RedisNodeStatus {
178+ NodeID : node .NodeID ,
194179 }
195- } else if containsFlag (flagsList , "slave" ) {
196- currentReplicas [node .NodeID ] = redisv1beta1.RedisNodeStatus {
197- PodName : node .PodName ,
180+ } else {
181+ currentReplicaFailedMap [node .NodeID ] = redisv1beta1.RedisNodeStatus {
198182 NodeID : node .NodeID ,
199183 MasterNodeID : node .MasterNodeID ,
200184 }
201185 }
186+ } else {
187+ updateNodeStatus (currentMasters , currentReplicas , node )
202188 }
203189 }
204190 }
205191
206192 redisCluster .Status .MasterMap = currentMasters
207193 redisCluster .Status .ReplicaMap = currentReplicas
194+ redisCluster .Status .FailedMasterMap = currentMasterFailedMap
195+ redisCluster .Status .FailedReplicaMap = currentReplicaFailedMap
208196
209- logger .Info ("Current MasterMap" , "MasterMap" , redisCluster .Status .MasterMap )
210- logger .Info ("Current ReplicaMap" , "ReplicaMap" , redisCluster .Status .ReplicaMap )
211- logger .Info ("Current FailedNodes" , "FailedNodes" , redisCluster .Status .FailedNodes )
212-
213- failedNodes := make (map [string ]redisv1beta1.RedisFailedNodeStatus )
214- for _ , node := range redisCluster .Status .FailedNodes {
215- if node .FailureCount >= FailureThreshold {
216- logger .Info ("Handling permanently failed node" , "NodeID" , node .NodeID )
217- failedNodes [node .NodeID ] = node
218- }
219- }
220-
221- if len (failedNodes ) > FailureThreshold {
222- err = handleFailedNode (ctx , cl , k8scl , redisCluster , logger , failedNodes )
223- if err != nil {
224- logger .Error (err , "Failed to handle failed node" )
225- return err
226- }
227- }
228-
229- if err := cl .Status ().Update (ctx , redisCluster ); err != nil {
230- logger .Error (err , "Error updating RedisCluster status" )
231- return err
232- }
197+ logClusterStatus (logger , redisCluster )
233198
234- return nil
199+ return updateClusterStatusWithRetry ( ctx , cl , redisCluster )
235200}
236201
237202// WaitForClusterStabilization checks if all Redis cluster nodes agree on the configuration
@@ -320,3 +285,73 @@ func RemoveReplicasOfMaster(ctx context.Context, cl client.Client, k8scl kuberne
320285
321286 return nil
322287}
288+
289+ func initializeStatusMaps (redisCluster * redisv1beta1.RedisCluster ) {
290+ if redisCluster .Status .NextAvailablePort == 0 {
291+ redisCluster .Status .NextAvailablePort = redisCluster .Spec .BasePort
292+ }
293+ if redisCluster .Status .MasterMap == nil {
294+ redisCluster .Status .MasterMap = make (map [string ]redisv1beta1.RedisNodeStatus )
295+ }
296+ if redisCluster .Status .ReplicaMap == nil {
297+ redisCluster .Status .ReplicaMap = make (map [string ]redisv1beta1.RedisNodeStatus )
298+ }
299+ if redisCluster .Status .FailedMasterMap == nil {
300+ redisCluster .Status .FailedMasterMap = make (map [string ]redisv1beta1.RedisNodeStatus )
301+ }
302+ if redisCluster .Status .FailedMasterMap == nil {
303+ redisCluster .Status .FailedMasterMap = make (map [string ]redisv1beta1.RedisNodeStatus )
304+ }
305+ }
306+
307+ func updateNodeStatus (currentMasters , currentReplicas map [string ]redisv1beta1.RedisNodeStatus , node ClusterNodeInfo ) {
308+ if node .Role == "master" {
309+ currentMasters [node .NodeID ] = redisv1beta1.RedisNodeStatus {
310+ PodName : node .PodName ,
311+ NodeID : node .NodeID ,
312+ }
313+ } else if node .Role == "slave" {
314+ currentReplicas [node .NodeID ] = redisv1beta1.RedisNodeStatus {
315+ PodName : node .PodName ,
316+ NodeID : node .NodeID ,
317+ MasterNodeID : node .MasterNodeID ,
318+ }
319+ }
320+ }
321+
322+ func GetReplicasOfMaster (redisCluster * redisv1beta1.RedisCluster , masterNodeID string ) []redisv1beta1.RedisNodeStatus {
323+ replicas := []redisv1beta1.RedisNodeStatus {}
324+ for _ , replica := range redisCluster .Status .ReplicaMap {
325+ if replica .MasterNodeID == masterNodeID {
326+ replicas = append (replicas , replica )
327+ }
328+ }
329+ return replicas
330+ }
331+
332+ func updateClusterStatusWithRetry (ctx context.Context , cl client.Client , redisCluster * redisv1beta1.RedisCluster ) error {
333+ currentVersion := redisCluster .ResourceVersion
334+
335+ if err := cl .Status ().Update (ctx , redisCluster ); err != nil {
336+ if errors .IsConflict (err ) {
337+ updatedRedisCluster := & redisv1beta1.RedisCluster {}
338+ if err := cl .Get (ctx , client .ObjectKeyFromObject (redisCluster ), updatedRedisCluster ); err != nil {
339+ return err
340+ }
341+ if updatedRedisCluster .ResourceVersion != currentVersion {
342+ updatedRedisCluster .Status = redisCluster .Status
343+ return cl .Status ().Update (ctx , updatedRedisCluster )
344+ }
345+ }
346+ return err
347+ }
348+
349+ return nil
350+ }
351+
352+ func logClusterStatus (logger logr.Logger , redisCluster * redisv1beta1.RedisCluster ) {
353+ logger .Info ("Current MasterMap" , "MasterMap" , redisCluster .Status .MasterMap )
354+ logger .Info ("Current ReplicaMap" , "ReplicaMap" , redisCluster .Status .ReplicaMap )
355+ logger .Info ("Current FailedNodes" , "FailedMasterNodes" , redisCluster .Status .FailedMasterMap )
356+ logger .Info ("Current FailedNodes" , "FailedReplicaNodes" , redisCluster .Status .FailedReplicaMap )
357+ }
0 commit comments