Skip to content

Commit ebfe5ec

Browse files
authored
chore: cleanup some workaround for background siit (#23749)
1 parent a8bd2f3 commit ebfe5ec

File tree

4 files changed

+20
-72
lines changed

4 files changed

+20
-72
lines changed

src/frontend/src/handler/cancel_job.rs

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use pgwire::pg_response::StatementType;
15+
use itertools::Itertools;
16+
use pgwire::pg_response::{PgResponse, StatementType};
1617
use risingwave_common::types::Fields;
18+
use risingwave_pb::id::JobId;
1719
use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs};
1820
use risingwave_sqlparser::ast::JobIdents;
1921

@@ -27,46 +29,16 @@ pub(super) async fn handle_cancel(
2729
jobs: JobIdents,
2830
) -> Result<RwPgResponse> {
2931
let session = handler_args.session;
30-
let job_ids = jobs.0;
31-
let mut filtered_job_ids = vec![];
32-
let mut notices = vec![];
33-
{
34-
let catalog_reader = session.env().catalog_reader().read_guard();
35-
for job_id in job_ids {
36-
let database_catalog = catalog_reader.get_database_by_name(&session.database())?;
37-
let sink_catalog = database_catalog
38-
.iter_schemas()
39-
.find_map(|schema| schema.get_sink_by_id(job_id.into()));
40-
if let Some(sink_catalog) = sink_catalog {
41-
if sink_catalog.is_created() {
42-
continue; // Skip already created sinks
43-
} else if sink_catalog.target_table.is_some() {
44-
notices.push(format!(
45-
"Please use `DROP SINK {}` to cancel sink into table job.",
46-
sink_catalog.name
47-
));
48-
continue;
49-
}
50-
}
51-
filtered_job_ids.push(job_id.into());
52-
}
53-
}
32+
let job_ids = jobs.0.into_iter().map(JobId::from).collect_vec();
5433

55-
let mut response_builder = RwPgResponse::builder(StatementType::CANCEL_COMMAND);
56-
for notice in notices {
57-
response_builder = response_builder.notice(notice);
58-
}
59-
60-
let canceled_jobs = if !filtered_job_ids.is_empty() {
34+
let canceled_jobs = if !job_ids.is_empty() {
6135
// Wrap in async block to convert RpcError to RwError
6236
execute_with_long_running_notification(
6337
async {
6438
session
6539
.env()
6640
.meta_client()
67-
.cancel_creating_jobs(PbJobs::Ids(CreatingJobIds {
68-
job_ids: filtered_job_ids,
69-
}))
41+
.cancel_creating_jobs(PbJobs::Ids(CreatingJobIds { job_ids }))
7042
.await
7143
.map_err(Into::into)
7244
},
@@ -80,7 +52,9 @@ pub(super) async fn handle_cancel(
8052
let rows = canceled_jobs
8153
.into_iter()
8254
.map(|id| CancelRow { id: id.to_string() });
83-
Ok(response_builder.rows(rows).into())
55+
Ok(PgResponse::builder(StatementType::CANCEL_COMMAND)
56+
.rows(rows)
57+
.into())
8458
}
8559

8660
#[derive(Fields)]

src/meta/src/controller/catalog/mod.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1166,16 +1166,6 @@ impl CatalogControllerInner {
11661166
}
11671167
}
11681168

1169-
pub(crate) fn notify_cancelled(&mut self, database_id: DatabaseId, job_id: JobId) {
1170-
if let Some(creating_tables) = self.creating_table_finish_notifier.get_mut(&database_id)
1171-
&& let Some(tx_list) = creating_tables.remove(&job_id)
1172-
{
1173-
for tx in tx_list {
1174-
let _ = tx.send(Err("Cancelled".to_owned()));
1175-
}
1176-
}
1177-
}
1178-
11791169
pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
11801170
let table_ids: Vec<TableId> = Table::find()
11811171
.select_only()

src/meta/src/manager/metadata.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -732,9 +732,4 @@ impl MetadataManager {
732732
let mut mgr = self.catalog_controller.get_inner_write_guard().await;
733733
mgr.notify_finish_failed(database_id, err);
734734
}
735-
736-
pub(crate) async fn notify_cancelled(&self, database_id: DatabaseId, job_id: JobId) {
737-
let mut mgr = self.catalog_controller.get_inner_write_guard().await;
738-
mgr.notify_cancelled(database_id, job_id);
739-
}
740735
}

src/meta/src/stream/stream_manager.rs

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -177,11 +177,6 @@ impl CreatingStreamingJobInfo {
177177

178178
Ok((receivers, background_job_ids))
179179
}
180-
181-
async fn check_job_exists(&self, job_id: JobId) -> bool {
182-
let jobs = self.streaming_jobs.lock().await;
183-
jobs.contains_key(&job_id)
184-
}
185180
}
186181

187182
type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;
@@ -594,23 +589,10 @@ impl GlobalStreamManager {
594589
database_id: DatabaseId,
595590
removed_actors: Vec<ActorId>,
596591
streaming_job_ids: Vec<JobId>,
597-
state_table_ids: Vec<risingwave_meta_model::TableId>,
592+
state_table_ids: Vec<TableId>,
598593
fragment_ids: HashSet<FragmentId>,
599594
dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
600595
) {
601-
// TODO(august): This is a workaround for canceling SITT via drop, remove it after refactoring SITT.
602-
for &job_id in &streaming_job_ids {
603-
if self.creating_job_info.check_job_exists(job_id).await {
604-
tracing::info!(
605-
?job_id,
606-
"streaming job is creating, cancel it with drop directly"
607-
);
608-
self.metadata_manager
609-
.notify_cancelled(database_id, job_id)
610-
.await;
611-
}
612-
}
613-
614596
if !removed_actors.is_empty()
615597
|| !streaming_job_ids.is_empty()
616598
|| !state_table_ids.is_empty()
@@ -669,6 +651,13 @@ impl GlobalStreamManager {
669651
// we can directly cancel them by running the barrier command.
670652
let futures = background_job_ids.into_iter().map(|id| async move {
671653
let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
654+
if fragment.is_created() {
655+
tracing::warn!(
656+
"streaming job {} is already created, ignore cancel request",
657+
id
658+
);
659+
return Ok(None);
660+
}
672661
if fragment.is_created() {
673662
Err(MetaError::invalid_parameter(format!(
674663
"streaming job {} is already created",
@@ -694,15 +683,15 @@ impl GlobalStreamManager {
694683
.await?;
695684
}
696685

697-
tracing::info!(?id, "cancelled recovered streaming job");
698-
Ok(id)
686+
tracing::info!(?id, "cancelled background streaming job");
687+
Ok(Some(id))
699688
});
700689
let cancelled_recovered_ids = join_all(futures)
701690
.await
702691
.into_iter()
703692
.collect::<MetaResult<Vec<_>>>()?;
704693

705-
cancelled_ids.extend(cancelled_recovered_ids);
694+
cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
706695
Ok(cancelled_ids)
707696
}
708697

0 commit comments

Comments
 (0)