Skip to content

Commit 1bf2c0b

Browse files
authored
derivation pipeline worker graceful shutdown (#357)
1 parent d86dc0c commit 1bf2c0b

File tree

1 file changed

+20
-6
lines changed
  • crates/derivation-pipeline/src

1 file changed

+20
-6
lines changed

crates/derivation-pipeline/src/lib.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -173,16 +173,30 @@ where
173173
tokio::select! {
174174
biased;
175175

176-
Some(batch_info) = self.batch_receiver.recv(), if self.futures.len() < DERIVATION_PIPELINE_WORKER_CONCURRENCY => {
177-
let fut = self.derivation_future(batch_info);
178-
self.futures.push_back(fut);
176+
maybe_batch = self.batch_receiver.recv(), if self.futures.len() < DERIVATION_PIPELINE_WORKER_CONCURRENCY => {
177+
match maybe_batch {
178+
Some(batch_info) => {
179+
let fut = self.derivation_future(batch_info);
180+
self.futures.push_back(fut);
181+
}
182+
None => {
183+
tracing::info!(target: "scroll::derivation_pipeline", "Batch channel closed, shutting down derivation pipeline worker");
184+
break;
185+
}
186+
}
187+
179188
}
180189
Some(result) = self.futures.next() => {
181190
match result {
182-
Ok(res) => self.result_sender.send(res).expect("Failed to send batch derivation result"),
191+
Ok(res) => {
192+
if self.result_sender.send(res).is_err() {
193+
tracing::info!(target: "scroll::derivation_pipeline", "Result channel closed, shutting down derivation pipeline worker");
194+
break;
195+
}
196+
}
183197
Err((batch_info, err)) => {
184-
tracing::error!(target: "scroll::derivation_pipeline", ?batch_info, ?err, "Failed to derive payload attributes");
185-
self.futures.push_front(self.derivation_future(batch_info));
198+
tracing::error!(target: "scroll::derivation_pipeline", ?batch_info, ?err, "Failed to derive payload attributes");
199+
self.futures.push_front(self.derivation_future(batch_info));
186200
}
187201
}
188202
}

0 commit comments

Comments
 (0)