Skip to content

Commit d668875

Browse files
committed
Better prevention and handling of redis waitgroup key expiry issues
1 parent 8396708 commit d668875

File tree

2 files changed

+22
-7
lines changed

2 files changed

+22
-7
lines changed

datastore/redis.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -819,13 +819,18 @@ func (r *RedisCache) BeginProcessingSlot(ctx context.Context, slot uint64) (err
819819
}
820820

821821
keyProcessingSlot := r.keyProcessingSlot(slot)
822-
err = r.client.Incr(ctx, keyProcessingSlot).Err()
822+
823+
pipe := r.client.TxPipeline()
824+
pipe.Incr(ctx, keyProcessingSlot)
825+
pipe.Expire(ctx, keyProcessingSlot, expiryLock)
826+
_, err = pipe.Exec(ctx)
827+
823828
if err != nil {
824829
return err
825830
}
831+
826832
r.currentSlot = slot
827-
err = r.client.Expire(ctx, keyProcessingSlot, expiryLock).Err()
828-
return err
833+
return nil
829834
}
830835

831836
// EndProcessingSlot signals that a builder process is done handling blocks for the current slot
@@ -836,9 +841,18 @@ func (r *RedisCache) EndProcessingSlot(ctx context.Context) (err error) {
836841
}
837842

838843
keyProcessingSlot := r.keyProcessingSlot(r.currentSlot)
839-
err = r.client.Decr(ctx, keyProcessingSlot).Err()
844+
845+
pipe := r.client.TxPipeline()
846+
pipe.Decr(ctx, keyProcessingSlot)
847+
pipe.Expire(ctx, keyProcessingSlot, expiryLock)
848+
_, err = pipe.Exec(ctx)
849+
850+
if err != nil {
851+
return err
852+
}
853+
840854
r.currentSlot = 0
841-
return err
855+
return nil
842856
}
843857

844858
// WaitForSlotComplete waits for a slot to be completed by all builder processes

services/api/service.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -844,7 +844,7 @@ func (api *RelayAPI) prepareBuildersForSlot(headSlot, prevHeadSlot uint64) {
844844
// Now we release our lock and wait for all other builder processes to wrap up
845845
err := api.redis.EndProcessingSlot(context.Background())
846846
if err != nil {
847-
api.log.WithError(err).Error("failed to update redis optimistic processing slot")
847+
api.log.WithError(err).Error("failed to unlock redis optimistic processing slot")
848848
}
849849
err = api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot+1)
850850
if err != nil {
@@ -860,7 +860,8 @@ func (api *RelayAPI) prepareBuildersForSlot(headSlot, prevHeadSlot uint64) {
860860
api.optimisticSlot.Store(headSlot + 1)
861861
err = api.redis.BeginProcessingSlot(context.Background(), headSlot+1)
862862
if err != nil {
863-
api.log.WithError(err).Error("failed to update redis optimistic processing slot")
863+
api.log.WithError(err).Error("failed to lock redis optimistic processing slot")
864+
api.optimisticSlot.Store(0)
864865
}
865866

866867
builders, err := api.db.GetBlockBuilders()

0 commit comments

Comments
 (0)