@@ -165,7 +165,7 @@ impl Actor for ControlPlane {
165165 self . model
166166 . load_from_metastore ( & mut self . metastore , ctx. progress ( ) )
167167 . await
168- . context ( "failed to initialize the model" ) ?;
168+ . context ( "failed to initialize control plane model" ) ?;
169169
170170 let _rebuild_plan_waiter = self . rebuild_plan_debounced ( ctx) ;
171171
@@ -467,7 +467,10 @@ impl DeferableReplyHandler<CreateIndexRequest> for ControlPlane {
467467 reply : impl FnOnce ( Self :: Reply ) + Send + Sync + ' static ,
468468 ctx : & ActorContext < Self > ,
469469 ) -> Result < ( ) , ActorExitStatus > {
470- let response = match self . metastore . create_index ( request) . await {
470+ let response = match ctx
471+ . protect_future ( self . metastore . create_index ( request) )
472+ . await
473+ {
471474 Ok ( response) => response,
472475 Err ( metastore_error) => {
473476 reply ( convert_metastore_error ( metastore_error) ?) ;
@@ -515,7 +518,10 @@ impl Handler<DeleteIndexRequest> for ControlPlane {
515518 let index_uid: IndexUid = request. index_uid ( ) . clone ( ) ;
516519 debug ! ( %index_uid, "deleting index" ) ;
517520
518- if let Err ( metastore_error) = self . metastore . delete_index ( request) . await {
521+ if let Err ( metastore_error) = ctx
522+ . protect_future ( self . metastore . delete_index ( request) )
523+ . await
524+ {
519525 return convert_metastore_error ( metastore_error) ;
520526 } ;
521527 info ! ( %index_uid, "deleted index" ) ;
@@ -562,7 +568,7 @@ impl Handler<AddSourceRequest> for ControlPlane {
562568 let source_id = source_config. source_id . clone ( ) ;
563569 debug ! ( %index_uid, source_id, "adding source" ) ;
564570
565- if let Err ( error) = self . metastore . add_source ( request) . await {
571+ if let Err ( error) = ctx . protect_future ( self . metastore . add_source ( request) ) . await {
566572 return Ok ( Err ( ControlPlaneError :: from ( error) ) ) ;
567573 } ;
568574 self . model
@@ -596,7 +602,10 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
596602 let enable = request. enable ;
597603 debug ! ( %index_uid, source_id, enable, "toggling source" ) ;
598604
599- if let Err ( error) = self . metastore . toggle_source ( request) . await {
605+ if let Err ( error) = ctx
606+ . protect_future ( self . metastore . toggle_source ( request) )
607+ . await
608+ {
600609 return Ok ( Err ( ControlPlaneError :: from ( error) ) ) ;
601610 } ;
602611 info ! ( %index_uid, source_id, enabled=enable, "toggled source" ) ;
@@ -629,7 +638,10 @@ impl Handler<DeleteSourceRequest> for ControlPlane {
629638 source_id : source_id. clone ( ) ,
630639 } ;
631640
632- if let Err ( metastore_error) = self . metastore . delete_source ( request) . await {
641+ if let Err ( metastore_error) = ctx
642+ . protect_future ( self . metastore . delete_source ( request) )
643+ . await
644+ {
633645 // TODO If the metastore fails returns an error but somehow succeed deleting the source,
634646 // the control plane will restart and the shards will be remaining on the ingesters.
635647 //
0 commit comments