@@ -525,9 +525,8 @@ func (c *clusterClient) toReplica(cmd Completed) bool {
525525 return false
526526}
527527
528- func (c * clusterClient ) _pickMulti (multi []Completed ) (retries * connretry ) {
528+ func (c * clusterClient ) _pickMulti (multi []Completed ) (retries * connretry , init bool ) {
529529 last := cmds .InitSlot
530- init := false
531530
532531 for _ , cmd := range multi {
533532 if cmd .Slot () == cmds .InitSlot {
@@ -550,7 +549,7 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
550549 cc = c .pslots [cmd .Slot ()]
551550 }
552551 if cc == nil {
553- return nil
552+ return nil , false
554553 }
555554 count .m [cc ]++
556555 }
@@ -569,13 +568,13 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
569568 cc = c .pslots [cmd .Slot ()]
570569 }
571570 if cc == nil { // check cc == nil again in case of non-deterministic SendToReplicas.
572- return nil
571+ return nil , false
573572 }
574573 re := retries .m [cc ]
575574 re .commands = append (re .commands , cmd )
576575 re .cIndexes = append (re .cIndexes , i )
577576 }
578- return retries
577+ return retries , init
579578 }
580579
581580 inits := 0
@@ -589,25 +588,28 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
589588 } else if init && last != cmd .Slot () {
590589 panic (panicMixCxSlot )
591590 }
592- p := c .pslots [cmd .Slot ()]
593- if p == nil {
594- return nil
591+ cc := c .pslots [cmd .Slot ()]
592+ if cc == nil {
593+ return nil , false
595594 }
596- count .m [p ]++
595+ count .m [cc ]++
597596 }
598597
599598 if last == cmds .InitSlot {
600599 // if all commands have no slots, such as INFO, we pick a non-nil slot.
601- for i , p := range c .pslots {
602- if p != nil {
600+ for i , cc := range c .pslots {
601+ if cc != nil {
603602 last = uint16 (i )
604- count .m [p ] = inits
603+ count .m [cc ] = inits
605604 break
606605 }
607606 }
608607 if last == cmds .InitSlot {
609- return nil
608+ return nil , false
610609 }
610+ } else if init {
611+ cc := c .pslots [last ]
612+ count .m [cc ] += inits
611613 }
612614
613615 retries = connretryp .Get (len (count .m ), len (count .m ))
@@ -627,25 +629,34 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
627629 re .commands = append (re .commands , cmd )
628630 re .cIndexes = append (re .cIndexes , i )
629631 }
630- return retries
632+ return retries , init
631633}
632634
633- func (c * clusterClient ) pickMulti (ctx context.Context , multi []Completed ) (* connretry , error ) {
634- conns := c ._pickMulti (multi )
635+ func (c * clusterClient ) pickMulti (ctx context.Context , multi []Completed ) (* connretry , bool , error ) {
636+ conns , hasInit := c ._pickMulti (multi )
635637 if conns == nil {
636638 if err := c .refresh (ctx ); err != nil {
637- return nil , err
639+ return nil , false , err
638640 }
639- if conns = c ._pickMulti (multi ); conns == nil {
640- return nil , ErrNoSlot
641+ if conns , hasInit = c ._pickMulti (multi ); conns == nil {
642+ return nil , false , ErrNoSlot
641643 }
642644 }
643- return conns , nil
645+ return conns , hasInit , nil
646+ }
647+
648+ func isMulti (cmd Completed ) bool {
649+ return len (cmd .Commands ()) == 1 && cmd .Commands ()[0 ] == "MULTI"
650+ }
651+ func isExec (cmd Completed ) bool {
652+ return len (cmd .Commands ()) == 1 && cmd .Commands ()[0 ] == "EXEC"
644653}
645654
646655func (c * clusterClient ) doresultfn (
647- ctx context.Context , results * redisresults , retries * connretry , mu * sync.Mutex , cc conn , cIndexes []int , commands []Completed , resps []RedisResult , attempts int ,
656+ ctx context.Context , results * redisresults , retries * connretry , mu * sync.Mutex , cc conn , cIndexes []int , commands []Completed , resps []RedisResult , attempts int , hasInit bool ,
648657) (clean bool ) {
658+ mi := - 1
659+ ei := - 1
649660 clean = true
650661 for i , resp := range resps {
651662 clean = clean && resp .NonRedisError () == nil
@@ -664,6 +675,37 @@ func (c *clusterClient) doresultfn(
664675 } else {
665676 nc = c .redirectOrNew (addr , cc , cm .Slot (), mode )
666677 }
678+ if hasInit && ei < i { // find out if there is a transaction block or not.
679+ for mi = i ; mi >= 0 && ! isMulti (commands [mi ]) && ! isExec (commands [mi ]); mi -- {
680+ }
681+ for ei = i ; ei < len (commands ) && ! isMulti (commands [ei ]) && ! isExec (commands [ei ]); ei ++ {
682+ }
683+ if mi >= 0 && ei < len (commands ) && isMulti (commands [mi ]) && isExec (commands [ei ]) && resps [mi ].val .string == ok { // a transaction is found.
684+ mu .Lock ()
685+ retries .Redirects ++
686+ nr := retries .m [nc ]
687+ if nr == nil {
688+ nr = retryp .Get (0 , len (commands ))
689+ retries .m [nc ] = nr
690+ }
691+ for i := mi ; i <= ei ; i ++ {
692+ ii := cIndexes [i ]
693+ cm := commands [i ]
694+ if mode == RedirectAsk {
695+ nr .aIndexes = append (nr .aIndexes , ii )
696+ nr .cAskings = append (nr .cAskings , cm )
697+ } else {
698+ nr .cIndexes = append (nr .cIndexes , ii )
699+ nr .commands = append (nr .commands , cm )
700+ }
701+ }
702+ mu .Unlock ()
703+ continue // the transaction has been added to the retries, go to the next cmd.
704+ }
705+ }
706+ if hasInit && mi < i && i < ei && mi >= 0 && isMulti (commands [mi ]) {
707+ continue // the current cmd is in the processed transaction and has been added to the retries.
708+ }
667709 mu .Lock ()
668710 if mode != RedirectRetry {
669711 retries .Redirects ++
@@ -690,17 +732,17 @@ func (c *clusterClient) doresultfn(
690732}
691733
692734func (c * clusterClient ) doretry (
693- ctx context.Context , cc conn , results * redisresults , retries * connretry , re * retry , mu * sync.Mutex , wg * sync.WaitGroup , attempts int ,
735+ ctx context.Context , cc conn , results * redisresults , retries * connretry , re * retry , mu * sync.Mutex , wg * sync.WaitGroup , attempts int , hasInit bool ,
694736) {
695737 clean := true
696738 if len (re .commands ) != 0 {
697739 resps := cc .DoMulti (ctx , re .commands ... )
698- clean = c .doresultfn (ctx , results , retries , mu , cc , re .cIndexes , re .commands , resps .s , attempts )
740+ clean = c .doresultfn (ctx , results , retries , mu , cc , re .cIndexes , re .commands , resps .s , attempts , hasInit )
699741 resultsp .Put (resps )
700742 }
701743 if len (re .cAskings ) != 0 {
702744 resps := askingMulti (cc , ctx , re .cAskings )
703- clean = c .doresultfn (ctx , results , retries , mu , cc , re .aIndexes , re .cAskings , resps .s , attempts ) && clean
745+ clean = c .doresultfn (ctx , results , retries , mu , cc , re .aIndexes , re .cAskings , resps .s , attempts , hasInit ) && clean
704746 resultsp .Put (resps )
705747 }
706748 if clean {
@@ -714,7 +756,7 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...Completed) []Redis
714756 return nil
715757 }
716758
717- retries , err := c .pickMulti (ctx , multi )
759+ retries , hasInit , err := c .pickMulti (ctx , multi )
718760 if err != nil {
719761 return fillErrs (len (multi ), err )
720762 }
@@ -742,18 +784,17 @@ retry:
742784 }
743785 for cc , re := range retries .m {
744786 delete (retries .m , cc )
745- go c .doretry (ctx , cc , results , retries , re , & mu , & wg , attempts )
787+ go c .doretry (ctx , cc , results , retries , re , & mu , & wg , attempts , hasInit )
746788 }
747789 mu .Unlock ()
748- c .doretry (ctx , cc1 , results , retries , re1 , & mu , & wg , attempts )
790+ c .doretry (ctx , cc1 , results , retries , re1 , & mu , & wg , attempts , hasInit )
749791 wg .Wait ()
750792
751793 if len (retries .m ) != 0 {
752794 if retries .Redirects > 0 {
753795 retries .Redirects = 0
754796 goto retry
755797 }
756-
757798 if retries .RetryDelay >= 0 {
758799 c .retryHandler .WaitForRetry (ctx , retries .RetryDelay )
759800 attempts ++
@@ -817,14 +858,23 @@ func (c *clusterClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dur
817858}
818859
819860func askingMulti (cc conn , ctx context.Context , multi []Completed ) * redisresults {
861+ var inTx bool
820862 commands := make ([]Completed , 0 , len (multi )* 2 )
821863 for _ , cmd := range multi {
822- commands = append (commands , cmds .AskingCmd , cmd )
864+ if inTx {
865+ commands = append (commands , cmd )
866+ inTx = ! isExec (cmd )
867+ } else {
868+ commands = append (commands , cmds .AskingCmd , cmd )
869+ inTx = isMulti (cmd )
870+ }
823871 }
824872 results := resultsp .Get (0 , len (multi ))
825873 resps := cc .DoMulti (ctx , commands ... )
826- for i := 1 ; i < len (resps .s ); i += 2 {
827- results .s = append (results .s , resps .s [i ])
874+ for i , resp := range resps .s {
875+ if commands [i ] != cmds .AskingCmd {
876+ results .s = append (results .s , resp )
877+ }
828878 }
829879 resultsp .Put (resps )
830880 return results
@@ -946,7 +996,6 @@ func (c *clusterClient) resultcachefn(
946996 if ! c .retry {
947997 continue
948998 }
949-
950999 retryDelay = c .retryHandler .RetryDelay (attempts , Completed (cm .Cmd ), resp .Error ())
9511000 } else {
9521001 nc = c .redirectOrNew (addr , cc , cm .Cmd .Slot (), mode )
@@ -1040,7 +1089,6 @@ retry:
10401089 retries .Redirects = 0
10411090 goto retry
10421091 }
1043-
10441092 if retries .RetryDelay >= 0 {
10451093 c .retryHandler .WaitForRetry (ctx , retries .RetryDelay )
10461094 attempts ++
0 commit comments