@@ -6,6 +6,7 @@ package msg
66
77import (
88 "fmt"
9+ "log/slog"
910 "strings"
1011 "sync"
1112 "sync/atomic"
@@ -14,20 +15,22 @@ import (
1415
1516// if a Send operation takes more than this time, we panic informing about a deadlock
1617// in the user-provide pipeline
17- const sendTimeout = 20 * time .Second
18+ const defaultSendTimeout = 20 * time .Second
1819
1920const unnamed = "(unnamed)"
2021
2122type queueConfig struct {
2223 channelBufferLen int
2324 closingAttempts int
2425 name string
26+ sendTimeout time.Duration
2527}
2628
2729var defaultQueueConfig = queueConfig {
2830 channelBufferLen : 1 ,
2931 closingAttempts : 1 ,
3032 name : unnamed ,
33+ sendTimeout : defaultSendTimeout ,
3134}
3235
3336// QueueOpts allow configuring some operation of a queue
@@ -47,6 +50,17 @@ func Name(name string) QueueOpts {
4750 }
4851}
4952
53+ // SendTimeout sets the timeout for Send operations. This is useful for detecting
54+ // deadlocks derived from a wrong Pipeline construction. It panics if after
55+ // a send operation, the channel is blocked for more than this timeout.
56+ // Some nodes might require too long to initialize. For example the Kubernetes Decorator
57+ // at start, has to download a whole snapshot
58+ func SendTimeout (to time.Duration ) QueueOpts {
59+ return func (c * queueConfig ) {
60+ c .sendTimeout = to
61+ }
62+ }
63+
5064// ClosingAttempts sets the number of invocations to MarkCloseable before the channel is
5165// effectively closed.
5266// This is useful when multiple nodes are sending messages to the same queue, and we want
@@ -88,7 +102,7 @@ func NewQueue[T any](opts ...QueueOpts) *Queue[T] {
88102 for _ , opt := range opts {
89103 opt (& cfg )
90104 }
91- return & Queue [T ]{cfg : & cfg , remainingClosers : cfg .closingAttempts , sendTimeout : time .NewTimer (sendTimeout )}
105+ return & Queue [T ]{cfg : & cfg , remainingClosers : cfg .closingAttempts , sendTimeout : time .NewTimer (cfg . sendTimeout )}
92106}
93107
94108// Send a message to all subscribers of this queue.
@@ -114,11 +128,36 @@ func (q *Queue[T]) chainedSend(o T, bypassPath []string) {
114128 if len (q .dsts ) == 0 {
115129 return
116130 }
117- q .sendTimeout .Reset (sendTimeout )
131+
132+ // instead of directly panicking in sendTimeout, we first warn at 0.75*sendTimeout,
133+ // to get logged about other blocked senders before panicking
134+ q .sendTimeout .Reset (3 * q .cfg .sendTimeout / 4 )
135+ var blocked []dst [T ]
118136 for _ , d := range q .dsts {
119137 select {
120138 case d .ch <- o :
121- // good!
139+ // good!
140+ case <- q .sendTimeout .C :
141+ slog .With (
142+ "timeout" , q .cfg .sendTimeout ,
143+ "queueLen" , len (d .ch ), "queueCap" , cap (d .ch ),
144+ "sendPath" , strings .Join (bypassPath , "->" ),
145+ "dstName" , d .name ).
146+ Warn ("subscriber channel is taking too long to respond" )
147+ // reset timeout to a small amount to detect any other possible blocked subscriber
148+ q .sendTimeout .Reset (time .Second )
149+ blocked = append (blocked , d )
150+ }
151+ }
152+ if len (blocked ) == 0 {
153+ return
154+ }
155+ // if we confirm that the blocker candidates are actually blocked, we panic
156+ q .sendTimeout .Reset (q .cfg .sendTimeout / 4 )
157+ for _ , d := range blocked {
158+ select {
159+ case d .ch <- o :
160+ // good!
122161 case <- q .sendTimeout .C :
123162 panic (fmt .Sprintf ("sending through queue path %s. Subscriber channel %s is blocked" ,
124163 strings .Join (bypassPath , "->" ), d .name ))
0 commit comments