diff --git a/Cargo.lock b/Cargo.lock index 6e3980e8fae6..ce3d2a11f58a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8361,6 +8361,7 @@ dependencies = [ "common-macro", "common-telemetry", "common-test-util", + "derive_builder 0.20.2", "futures", "humantime-serde", "lazy_static", diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index c823bebafec3..88680ed195f0 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -1200,7 +1200,8 @@ impl RegionServerInner { | RegionRequest::Flush(_) | RegionRequest::Compact(_) | RegionRequest::Truncate(_) - | RegionRequest::BuildIndex(_) => RegionChange::None, + | RegionRequest::BuildIndex(_) + | RegionRequest::EnterStaging(_) => RegionChange::None, RegionRequest::Catchup(_) => RegionChange::Catchup, }; diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 9e4632a7d2f4..3e5a1e3c4824 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -23,6 +23,7 @@ mod options; mod put; mod read; mod region_metadata; +mod staging; mod state; mod sync; @@ -211,6 +212,13 @@ impl RegionEngine for MetricEngine { let mut extension_return_value = HashMap::new(); let result = match request { + RegionRequest::EnterStaging(_) => { + if self.inner.is_physical_region(region_id) { + self.handle_enter_staging_request(region_id, request).await + } else { + UnsupportedRegionRequestSnafu { request }.fail() + } + } RegionRequest::Put(put) => self.inner.put_region(region_id, put).await, RegionRequest::Create(create) => { self.inner diff --git a/src/metric-engine/src/engine/staging.rs b/src/metric-engine/src/engine/staging.rs new file mode 100644 index 000000000000..9db500957ca7 --- /dev/null +++ b/src/metric-engine/src/engine/staging.rs @@ -0,0 +1,54 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_base::AffectedRows; +use snafu::ResultExt; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{EnterStagingRequest, RegionRequest}; +use store_api::storage::RegionId; + +use crate::engine::MetricEngine; +use crate::error::{MitoEnterStagingOperationSnafu, Result}; +use crate::utils; + +impl MetricEngine { + /// Handles the enter staging request for the given region. + pub(crate) async fn handle_enter_staging_request( + &self, + region_id: RegionId, + request: RegionRequest, + ) -> Result { + let metadata_region_id = utils::to_metadata_region_id(region_id); + let data_region_id = utils::to_data_region_id(region_id); + + // For metadata region, it doesn't care about the partition expr, so we can just pass an empty string. 
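+        // The data region, by contrast, receives the caller's request
+        // unchanged below, so it is the one that records the real
+        // partition expr.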
+ self.inner + .mito + .handle_request( + metadata_region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_expr: String::new(), + }), + ) + .await + .context(MitoEnterStagingOperationSnafu)?; + + self.inner + .mito + .handle_request(data_region_id, request) + .await + .context(MitoEnterStagingOperationSnafu) + .map(|response| response.affected_rows) + } +} diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 243fd566a3a7..3d00b737c30d 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -156,6 +156,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Mito enter staging operation fails"))] + MitoEnterStagingOperation { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to collect record batch stream"))] CollectRecordBatchStream { source: common_recordbatch::error::Error, @@ -360,6 +367,7 @@ impl ErrorExt for Error { | MitoWriteOperation { source, .. } | MitoFlushOperation { source, .. } | MitoSyncOperation { source, .. } + | MitoEnterStagingOperation { source, .. } | BatchOpenMitoRegion { source, .. } | BatchCatchupMitoRegion { source, .. } => source.status_code(), diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 7926ae198ade..a3686251bb5b 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -55,7 +55,7 @@ lazy_static = "1.4" log-store = { workspace = true } mito-codec.workspace = true moka = { workspace = true, features = ["sync", "future"] } -object-store.workspace = true +object-store = { workspace = true, features = ["testing"] } parquet = { workspace = true, features = ["async"] } paste.workspace = true pin-project.workspace = true diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 3705132e41f0..e63ae7f3f66e 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -501,7 +501,7 @@ impl Compactor for DefaultCompactor { // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later. compaction_region .manifest_ctx - .update_manifest(RegionLeaderState::Writable, action_list) + .update_manifest(RegionLeaderState::Writable, action_list, false) .await?; Ok(edit) diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index 8488c9af9e50..c952f4ba97c6 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -117,7 +117,7 @@ impl CompactionTaskImpl { }; if let Err(e) = compaction_region .manifest_ctx - .update_manifest(current_region_state, action_list) + .update_manifest(current_region_state, action_list, false) .await { warn!( diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index d5948da3c183..2aa26ba204a0 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -901,7 +901,7 @@ async fn test_alter_region_ttl_options_with_format(flat_format: bool) { check_ttl(&engine, &Duration::from_secs(500)); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_write_stall_on_altering() { common_telemetry::init_default_ut_logging(); @@ -952,6 +952,8 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) { .await .unwrap(); }); + // Make sure the loop is handling the alter request. 
+ tokio::time::sleep(Duration::from_millis(100)).await; let column_schemas_cloned = column_schemas.clone(); let engine_cloned = engine.clone(); @@ -962,6 +964,8 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) { }; put_rows(&engine_cloned, region_id, rows).await; }); + // Make sure the loop is handling the put request. + tokio::time::sleep(Duration::from_millis(100)).await; listener.wake_notify(); alter_job.await.unwrap(); diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index ebc20ac28045..277c9a40503e 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -74,6 +74,9 @@ pub trait EventListener: Send + Sync { /// Notifies the listener that region starts to send a region change result to worker. async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {} + /// Notifies the listener that region starts to send a enter staging result to worker. + async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {} + /// Notifies the listener that the index build task is executed successfully. async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {} @@ -307,6 +310,37 @@ impl EventListener for NotifyRegionChangeResultListener { region_id ); self.notify.notified().await; + info!( + "Continue to sending region change result for region {}", + region_id + ); + } +} + +#[derive(Default)] +pub struct NotifyEnterStagingResultListener { + notify: Notify, +} + +impl NotifyEnterStagingResultListener { + /// Continue to sending enter staging result. + pub fn wake_notify(&self) { + self.notify.notify_one(); + } +} + +#[async_trait] +impl EventListener for NotifyEnterStagingResultListener { + async fn on_enter_staging_result_begin(&self, region_id: RegionId) { + info!( + "Wait on notify to start notify enter staging result for region {}", + region_id + ); + self.notify.notified().await; + info!( + "Continue to sending enter staging result for region {}", + region_id + ); } } diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs index 6d802a5d9d67..91816a4f9f92 100644 --- a/src/mito2/src/engine/staging_test.rs +++ b/src/mito2/src/engine/staging_test.rs @@ -14,17 +14,30 @@ //! Integration tests for staging state functionality. 
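+//!
+//! Covered scenarios: entering staging via `RegionRequest::EnterStaging`,
+//! re-entering with a matching or mismatching partition expr, write stalling
+//! while a region is `EnteringStaging`, and failure injection through the
+//! mock object-store layer.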
+use std::assert_matches::assert_matches; use std::fs; +use std::sync::Arc; +use std::time::Duration; use api::v1::Rows; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; use common_recordbatch::RecordBatches; +use object_store::Buffer; +use object_store::layers::mock::{ + Entry, Error as MockError, ErrorKind, List, Lister, Metadata, MockLayerBuilder, + Result as MockResult, Write, Writer, +}; use store_api::region_engine::{RegionEngine, SettableRegionRoleState}; use store_api::region_request::{ - RegionAlterRequest, RegionFlushRequest, RegionRequest, RegionTruncateRequest, + EnterStagingRequest, RegionAlterRequest, RegionFlushRequest, RegionRequest, + RegionTruncateRequest, }; use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; +use crate::engine::listener::NotifyEnterStagingResultListener; +use crate::error::Error; use crate::region::{RegionLeaderState, RegionRoleState}; use crate::request::WorkerRequest; use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema}; @@ -214,6 +227,8 @@ async fn test_staging_state_validation_patterns() { ); } +const PARTITION_EXPR: &str = "partition_expr"; + #[tokio::test] async fn test_staging_manifest_directory() { test_staging_manifest_directory_with_format(false).await; @@ -221,6 +236,7 @@ async fn test_staging_manifest_directory() { } async fn test_staging_manifest_directory_with_format(flat_format: bool) { + common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; let engine = env .create_engine(MitoConfig { @@ -255,10 +271,58 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) { // Now test staging mode manifest creation // Set region to staging mode using the engine API engine - .set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader) + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_expr: PARTITION_EXPR.to_string(), + }), + ) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone(); + assert_eq!(staging_partition_expr.unwrap(), PARTITION_EXPR); + { + let manager = region.manifest_ctx.manifest_manager.read().await; + assert_eq!( + manager + .staging_manifest() + .unwrap() + .metadata + .partition_expr + .as_deref() + .unwrap(), + PARTITION_EXPR + ); + assert!(manager.manifest().metadata.partition_expr.is_none()); + } + + // Should be ok to enter staging mode again with the same partition expr + engine + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_expr: PARTITION_EXPR.to_string(), + }), + ) .await .unwrap(); + // Should throw error if try to enter staging mode again with a different partition expr + let err = engine + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_expr: "".to_string(), + }), + ) + .await + .unwrap_err(); + assert_matches!( + err.into_inner().as_any().downcast_ref::().unwrap(), + Error::StagingPartitionExprMismatch { .. 
} + ); + // Put some data and flush in staging mode let rows_data = Rows { schema: column_schemas.clone(), @@ -312,6 +376,7 @@ async fn test_staging_exit_success_with_manifests() { } async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) { + common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; let engine = env .create_engine(MitoConfig { @@ -330,16 +395,28 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) .await .unwrap(); + // Add some data and flush in staging mode to generate staging manifests + let rows_data = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows_data).await; + // Enter staging mode engine - .set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader) + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_expr: PARTITION_EXPR.to_string(), + }), + ) .await .unwrap(); // Add some data and flush in staging mode to generate staging manifests let rows_data = Rows { schema: column_schemas.clone(), - rows: build_rows(0, 5), + rows: build_rows(3, 8), }; put_rows(&engine, region_id, rows_data).await; @@ -357,7 +434,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) // Add more data and flush again to generate multiple staging manifests let rows_data2 = Rows { schema: column_schemas.clone(), - rows: build_rows(5, 10), + rows: build_rows(8, 10), }; put_rows(&engine, region_id, rows_data2).await; @@ -382,8 +459,11 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) .unwrap(); assert_eq!( staging_files_before.len(), - 2, - "Staging manifest directory should contain two files before exit" + // Two files for flush operation + // One file for entering staging mode + 3, + "Staging manifest directory should contain 3 files before exit, got: {:?}", + staging_files_before ); // Count normal manifest files before exit @@ -394,8 +474,11 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) .unwrap(); let normal_count_before = normal_files_before.len(); assert_eq!( - normal_count_before, 1, - "Normal manifest directory should initially contain one file" + // One file for table creation + // One file for flush operation + normal_count_before, + 2, + "Normal manifest directory should initially contain 2 files" ); // Try read data before exiting staging, SST files should be invisible @@ -403,8 +486,8 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) let scanner = engine.scanner(region_id, request).await.unwrap(); assert_eq!( scanner.num_files(), - 0, - "No SST files should be scanned before exit" + 1, + "1 SST files should be scanned before exit" ); assert_eq!( scanner.num_memtables(), @@ -415,14 +498,20 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) let batches = RecordBatches::try_collect(stream).await.unwrap(); let total_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum(); assert_eq!( - total_rows, 0, - "No data should be readable before exit staging mode" + total_rows, 3, + "3 rows should be readable before exit staging mode" ); // Inspect SSTs from manifest let sst_entries = engine.all_ssts_from_manifest().await; - assert_eq!(sst_entries.len(), 2); - assert!(sst_entries.iter().all(|e| !e.visible)); + assert_eq!( + sst_entries.len(), + 3, + "sst entries should be 3, got: {:?}", + sst_entries + ); + 
assert_eq!(sst_entries.iter().filter(|e| e.visible).count(), 1); + assert_eq!(sst_entries.iter().filter(|e| !e.visible).count(), 2); // Exit staging mode successfully engine @@ -470,7 +559,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) let scanner = engine.scanner(region_id, request).await.unwrap(); assert_eq!( scanner.num_files(), - 2, + 3, "SST files should be scanned after exit" ); @@ -482,6 +571,209 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) // Inspect SSTs from manifest let sst_entries = engine.all_ssts_from_manifest().await; - assert_eq!(sst_entries.len(), 2); + assert_eq!(sst_entries.len(), 3); assert!(sst_entries.iter().all(|e| e.visible)); } + +#[tokio::test(flavor = "multi_thread")] +async fn test_write_stall_on_enter_staging() { + test_write_stall_on_enter_staging_with_format(false).await; + test_write_stall_on_enter_staging_with_format(true).await; +} + +async fn test_write_stall_on_enter_staging_with_format(flat_format: bool) { + let mut env = TestEnv::new().await; + let listener = Arc::new(NotifyEnterStagingResultListener::default()); + let engine = env + .create_engine_with( + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + None, + Some(listener.clone()), + None, + ) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let engine_cloned = engine.clone(); + let alter_job = tokio::spawn(async move { + engine_cloned + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_expr: PARTITION_EXPR.to_string(), + }), + ) + .await + .unwrap(); + }); + // Make sure the loop is handling the alter request. + tokio::time::sleep(Duration::from_millis(100)).await; + + let column_schemas_cloned = column_schemas.clone(); + let engine_cloned = engine.clone(); + let put_job = tokio::spawn(async move { + let rows = Rows { + schema: column_schemas_cloned, + rows: build_rows(0, 3), + }; + put_rows(&engine_cloned, region_id, rows).await; + }); + // Make sure the loop is handling the put request. 
+ tokio::time::sleep(Duration::from_millis(100)).await; + + listener.wake_notify(); + alter_job.await.unwrap(); + put_job.await.unwrap(); + + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + let request = ScanRequest::default(); + let scanner = engine.scanner(region_id, request).await.unwrap(); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_enter_staging_clean_staging_manifest_error() { + common_telemetry::init_default_ut_logging(); + test_enter_staging_clean_staging_manifest_error_with_format(false).await; + test_enter_staging_clean_staging_manifest_error_with_format(true).await; +} + +struct MockLister { + path: String, + inner: Lister, +} + +impl List for MockLister { + async fn next(&mut self) -> MockResult> { + if self.path.contains("staging") { + return Err(MockError::new(ErrorKind::Unexpected, "mock error")); + } + self.inner.next().await + } +} + +struct MockWriter { + path: String, + inner: Writer, +} + +impl Write for MockWriter { + async fn write(&mut self, bs: Buffer) -> MockResult<()> { + self.inner.write(bs).await + } + + async fn close(&mut self) -> MockResult { + if self.path.contains("staging") { + return Err(MockError::new(ErrorKind::Unexpected, "mock error")); + } + self.inner.close().await + } + + async fn abort(&mut self) -> MockResult<()> { + self.inner.abort().await + } +} + +async fn test_enter_staging_error(env: &mut TestEnv, flat_format: bool) { + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + let region_id = RegionId::new(1024, 0); + let request = CreateRequestBuilder::new().build(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let err = engine + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_expr: PARTITION_EXPR.to_string(), + }), + ) + .await + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::StorageUnavailable); + let region = engine.get_region(region_id).unwrap(); + assert!( + region + .manifest_ctx + .manifest_manager + .read() + .await + .staging_manifest() + .is_none() + ); + let state = region.state(); + assert_eq!(state, RegionRoleState::Leader(RegionLeaderState::Writable)); +} + +async fn test_enter_staging_clean_staging_manifest_error_with_format(flat_format: bool) { + let mock_layer = MockLayerBuilder::default() + .lister_factory(Arc::new(|path, _args, lister| { + Box::new(MockLister { + path: path.to_string(), + inner: lister, + }) + })) + .build() + .unwrap(); + let mut env = TestEnv::new().await.with_mock_layer(mock_layer); + test_enter_staging_error(&mut env, flat_format).await; +} + +#[tokio::test] +async fn test_enter_staging_save_staging_manifest_error() { + common_telemetry::init_default_ut_logging(); + test_enter_staging_save_staging_manifest_error_with_format(false).await; + test_enter_staging_save_staging_manifest_error_with_format(true).await; +} + +async fn test_enter_staging_save_staging_manifest_error_with_format(flat_format: bool) { + let mock_layer = MockLayerBuilder::default() + .writer_factory(Arc::new(|path, _args, lister| { + Box::new(MockWriter { + 
path: path.to_string(), + inner: lister, + }) + })) + .build() + .unwrap(); + let mut env = TestEnv::new().await.with_mock_layer(mock_layer); + test_enter_staging_error(&mut env, flat_format).await; +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index b002568def0d..d357c68774a7 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1150,6 +1150,18 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display( + "Staging partition expr mismatch, manifest: {:?}, request: {}", + manifest_expr, + request_expr + ))] + StagingPartitionExprMismatch { + manifest_expr: Option, + request_expr: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -1196,7 +1208,8 @@ impl ErrorExt for Error { | InstallManifestTo { .. } | Unexpected { .. } | SerializeColumnMetadata { .. } - | SerializeManifest { .. } => StatusCode::Unexpected, + | SerializeManifest { .. } + | StagingPartitionExprMismatch { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, ObjectStoreNotFound { .. } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index efd10e96a265..50bbf599417d 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -208,7 +208,7 @@ impl WriteBufferManager for WriteBufferManagerImpl { } /// Reason of a flush task. -#[derive(Debug, IntoStaticStr)] +#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)] pub enum FlushReason { /// Other reasons. Others, @@ -222,6 +222,8 @@ pub enum FlushReason { Periodically, /// Flush memtable during downgrading state. Downgrading, + /// Enter staging mode. + EnterStaging, } impl FlushReason { @@ -253,6 +255,8 @@ pub(crate) struct RegionFlushTask { pub(crate) index_options: IndexOptions, /// Semaphore to control flush concurrency. pub(crate) flush_semaphore: Arc, + /// Whether the region is in staging mode. + pub(crate) is_staging: bool, } impl RegionFlushTask { @@ -316,6 +320,7 @@ impl RegionFlushTask { _timer: timer, edit, memtables_to_remove, + is_staging: self.is_staging, }; WorkerRequest::Background { region_id: self.region_id, @@ -398,7 +403,10 @@ impl RegionFlushTask { flushed_sequence: Some(version_data.committed_sequence), committed_sequence: None, }; - info!("Applying {edit:?} to region {}", self.region_id); + info!( + "Applying {edit:?} to region {}, is_staging: {}", + self.region_id, self.is_staging + ); let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); @@ -417,11 +425,12 @@ impl RegionFlushTask { // add a cleanup job to remove them later. let version = self .manifest_ctx - .update_manifest(expected_state, action_list) + .update_manifest(expected_state, action_list, self.is_staging) .await?; info!( - "Successfully update manifest version to {version}, region: {}, reason: {}", + "Successfully update manifest version to {version}, region: {}, is_staging: {}, reason: {}", self.region_id, + self.is_staging, self.reason.as_str() ); @@ -1292,6 +1301,7 @@ mod tests { .await, index_options: IndexOptions::default(), flush_semaphore: Arc::new(Semaphore::new(2)), + is_staging: false, }; task.push_sender(OptionOutputTx::from(output_tx)); scheduler @@ -1334,6 +1344,7 @@ mod tests { manifest_ctx: manifest_ctx.clone(), index_options: IndexOptions::default(), flush_semaphore: Arc::new(Semaphore::new(2)), + is_staging: false, }) .collect(); // Schedule first task. 
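Note on the flush changes above: `RegionFlushTask` now snapshots `is_staging` when the task is created, and `RegionManifestManager::update` (changed below) takes a plain `is_staging: bool` instead of a `RegionRoleState`. A hedged usage sketch, borrowing the `nop_action()` helper from the checkpoint tests below:

    // Normal update: applied to the main manifest; may trigger a checkpoint.
    let v1 = manager.update(nop_action(), false).await?;
    // Staging update: accumulated into the staging manifest; checkpointing and
    // removed-files bookkeeping are skipped until staging is merged back.
    let v2 = manager.update(nop_action(), true).await?;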
diff --git a/src/mito2/src/manifest/checkpointer.rs b/src/mito2/src/manifest/checkpointer.rs index 3cb0694e71dd..1da03dda2197 100644 --- a/src/mito2/src/manifest/checkpointer.rs +++ b/src/mito2/src/manifest/checkpointer.rs @@ -25,7 +25,6 @@ use crate::manifest::action::{RegionCheckpoint, RegionManifest}; use crate::manifest::manager::RegionManifestOptions; use crate::manifest::storage::ManifestObjectStore; use crate::metrics::MANIFEST_OP_ELAPSED; -use crate::region::{RegionLeaderState, RegionRoleState}; /// [`Checkpointer`] is responsible for doing checkpoint for a region, in an asynchronous way. #[derive(Debug)] @@ -137,20 +136,7 @@ impl Checkpointer { /// Check if it's needed to do checkpoint for the region by the checkpoint distance. /// If needed, and there's no currently running checkpoint task, it will start a new checkpoint /// task running in the background. - pub(crate) fn maybe_do_checkpoint( - &self, - manifest: &RegionManifest, - region_state: RegionRoleState, - ) { - // Skip checkpoint if region is in staging state - if region_state == RegionRoleState::Leader(RegionLeaderState::Staging) { - info!( - "Skipping checkpoint for region {} in staging mode, manifest version: {}", - manifest.metadata.region_id, manifest.manifest_version - ); - return; - } - + pub(crate) fn maybe_do_checkpoint(&self, manifest: &RegionManifest) { if self.manifest_options.checkpoint_distance == 0 { return; } diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 13ea0a399e41..81e69d1539b8 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -151,6 +151,10 @@ pub struct RegionManifestManager { last_version: Arc, checkpointer: Checkpointer, manifest: Arc, + // Staging manifest is used to store the manifest of the staging region before it becomes available. + // It is initially inherited from the previous manifest(i.e., `self.manifest`). + // When the staging manifest becomes available, it will be used to construct the new manifest. + staging_manifest: Option>, stats: ManifestStats, stopped: bool, } @@ -229,6 +233,7 @@ impl RegionManifestManager { last_version: manifest_version, checkpointer, manifest: Arc::new(manifest), + staging_manifest: None, stats: stats.clone(), stopped: false, }) @@ -334,6 +339,8 @@ impl RegionManifestManager { last_version: manifest_version, checkpointer, manifest: Arc::new(manifest), + // TODO(weny): open the staging manifest if exists. + staging_manifest: None, stats: stats.clone(), stopped: false, })) @@ -504,7 +511,7 @@ impl RegionManifestManager { pub async fn update( &mut self, action_list: RegionMetaActionList, - region_state: RegionRoleState, + is_staging: bool, ) -> Result { let _t = MANIFEST_OP_ELAPSED .with_label_values(&["update"]) @@ -518,13 +525,19 @@ impl RegionManifestManager { ); let version = self.increase_version(); - let is_staging = region_state == RegionRoleState::Leader(RegionLeaderState::Staging); self.store .save(version, &action_list.encode()?, is_staging) .await?; + // For a staging region, the manifest is initially inherited from the previous manifest(i.e., `self.manifest`). + // When the staging manifest becomes available, it will be used to construct the new manifest. 
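+        // Concretely: the first staging update seeds the builder from
+        // `self.manifest`, while every subsequent staging update builds on the
+        // accumulated `self.staging_manifest`.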
let mut manifest_builder = - RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone())); + if is_staging && let Some(staging_manifest) = self.staging_manifest.as_ref() { + RegionManifestBuilder::with_checkpoint(Some(staging_manifest.as_ref().clone())) + } else { + RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone())) + }; + for action in action_list.actions { match action { RegionMetaAction::Change(action) => { @@ -544,17 +557,27 @@ impl RegionManifestManager { } } } - let new_manifest = manifest_builder.try_build()?; - new_manifest - .removed_files - .update_file_removed_cnt_to_stats(&self.stats); - let updated_manifest = self - .checkpointer - .update_manifest_removed_files(new_manifest)?; - self.manifest = Arc::new(updated_manifest); - self.checkpointer - .maybe_do_checkpoint(self.manifest.as_ref(), region_state); + if is_staging { + let new_manifest = manifest_builder.try_build()?; + self.staging_manifest = Some(Arc::new(new_manifest)); + + info!( + "Skipping checkpoint for region {} in staging mode, manifest version: {}", + self.manifest.metadata.region_id, self.manifest.manifest_version + ); + } else { + let new_manifest = manifest_builder.try_build()?; + new_manifest + .removed_files + .update_file_removed_cnt_to_stats(&self.stats); + let updated_manifest = self + .checkpointer + .update_manifest_removed_files(new_manifest)?; + self.manifest = Arc::new(updated_manifest); + self.checkpointer + .maybe_do_checkpoint(self.manifest.as_ref()); + } Ok(version) } @@ -575,6 +598,11 @@ impl RegionManifestManager { self.manifest.clone() } + /// Retrieves the current [RegionManifest]. + pub fn staging_manifest(&self) -> Option> { + self.staging_manifest.clone() + } + /// Returns total manifest size. pub fn manifest_usage(&self) -> u64 { self.store.total_manifest_size() @@ -711,6 +739,22 @@ impl RegionManifestManager { Ok(Some(RegionMetaActionList::new(merged_actions))) } + + /// Unsets the staging manifest. + pub(crate) fn unset_staging_manifest(&mut self) { + self.staging_manifest = None; + } + + /// Clear all staging manifests. 
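+    /// Drops the in-memory staging manifest and deletes the staged manifest
+    /// files from the staging directory in the store.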
+ pub(crate) async fn clear_staging_manifest_and_dir(&mut self) -> Result<()> { + self.staging_manifest = None; + self.store.clear_staging_manifests().await?; + info!( + "Cleared all staging manifests for region {}", + self.manifest.metadata.region_id + ); + Ok(()) + } } #[cfg(test)] @@ -837,13 +881,7 @@ mod test { sst_format: FormatType::PrimaryKey, })); - let current_version = manager - .update( - action_list, - RegionRoleState::Leader(RegionLeaderState::Writable), - ) - .await - .unwrap(); + let current_version = manager.update(action_list, false).await.unwrap(); assert_eq!(current_version, 1); manager.validate_manifest(&new_metadata, 1); @@ -906,13 +944,7 @@ mod test { sst_format: FormatType::PrimaryKey, })); - let current_version = manager - .update( - action_list, - RegionRoleState::Leader(RegionLeaderState::Writable), - ) - .await - .unwrap(); + let current_version = manager.update(action_list, false).await.unwrap(); assert_eq!(current_version, 1); manager.validate_manifest(&new_metadata, 1); @@ -933,7 +965,7 @@ mod test { flushed_sequence: None, committed_sequence: None, })]), - RegionRoleState::Leader(RegionLeaderState::Writable), + false, ) .await .unwrap(); diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 6f5ca235e1ef..da063fe24257 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -27,7 +27,6 @@ use crate::manifest::action::{ use crate::manifest::manager::RegionManifestManager; use crate::manifest::storage::CheckpointMetadata; use crate::manifest::tests::utils::basic_region_metadata; -use crate::region::{RegionLeaderState, RegionRoleState}; use crate::sst::file::FileMeta; use crate::test_util::TestEnv; @@ -87,13 +86,7 @@ async fn manager_without_checkpoint() { // apply 10 actions for _ in 0..10 { - manager - .update( - nop_action(), - RegionRoleState::Leader(RegionLeaderState::Writable), - ) - .await - .unwrap(); + manager.update(nop_action(), false).await.unwrap(); } // no checkpoint @@ -138,13 +131,7 @@ async fn manager_with_checkpoint_distance_1() { // apply 10 actions for _ in 0..10 { - manager - .update( - nop_action(), - RegionRoleState::Leader(RegionLeaderState::Writable), - ) - .await - .unwrap(); + manager.update(nop_action(), false).await.unwrap(); while manager.checkpointer().is_doing_checkpoint() { tokio::time::sleep(Duration::from_millis(10)).await; @@ -205,13 +192,7 @@ async fn test_corrupted_data_causing_checksum_error() { // Apply actions for _ in 0..10 { - manager - .update( - nop_action(), - RegionRoleState::Leader(RegionLeaderState::Writable), - ) - .await - .unwrap(); + manager.update(nop_action(), false).await.unwrap(); } // Wait for the checkpoint to finish. 
@@ -302,10 +283,7 @@ async fn generate_checkpoint_with_compression_types( let (_env, mut manager) = build_manager(1, compress_type).await; for action in actions { - manager - .update(action, RegionRoleState::Leader(RegionLeaderState::Writable)) - .await - .unwrap(); + manager.update(action, false).await.unwrap(); while manager.checkpointer().is_doing_checkpoint() { tokio::time::sleep(Duration::from_millis(10)).await; @@ -361,10 +339,7 @@ async fn manifest_install_manifest_to() { let (env, mut manager) = build_manager(0, CompressionType::Uncompressed).await; let (files, actions) = generate_action_lists(10); for action in actions { - manager - .update(action, RegionRoleState::Leader(RegionLeaderState::Writable)) - .await - .unwrap(); + manager.update(action, false).await.unwrap(); } // Nothing to install @@ -402,10 +377,7 @@ async fn manifest_install_manifest_to_with_checkpoint() { let (env, mut manager) = build_manager(3, CompressionType::Uncompressed).await; let (files, actions) = generate_action_lists(10); for action in actions { - manager - .update(action, RegionRoleState::Leader(RegionLeaderState::Writable)) - .await - .unwrap(); + manager.update(action, false).await.unwrap(); while manager.checkpointer().is_doing_checkpoint() { tokio::time::sleep(Duration::from_millis(10)).await; @@ -477,13 +449,7 @@ async fn test_checkpoint_bypass_in_staging_mode() { // Apply actions in staging mode - checkpoint should be bypassed for _ in 0..15 { - manager - .update( - nop_action(), - RegionRoleState::Leader(RegionLeaderState::Staging), - ) - .await - .unwrap(); + manager.update(nop_action(), true).await.unwrap(); } assert!(!manager.checkpointer().is_doing_checkpoint()); @@ -498,13 +464,7 @@ async fn test_checkpoint_bypass_in_staging_mode() { ); // Now switch to normal mode and apply one more action - manager - .update( - nop_action(), - RegionRoleState::Leader(RegionLeaderState::Writable), - ) - .await - .unwrap(); + manager.update(nop_action(), false).await.unwrap(); // Wait for potential checkpoint while manager.checkpointer().is_doing_checkpoint() { diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 1244647bfe11..2a85c4067736 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -22,7 +22,7 @@ pub(crate) mod version; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use common_telemetry::{error, info, warn}; use crossbeam_utils::atomic::AtomicCell; @@ -77,6 +77,8 @@ pub enum RegionLeaderState { Writable, /// The region is in staging mode - writable but no checkpoint/compaction. Staging, + /// The region is entering staging mode. - write requests will be stalled. + EnteringStaging, /// The region is altering. Altering, /// The region is dropping. @@ -138,6 +140,14 @@ pub struct MitoRegion { pub(crate) topic_latest_entry_id: AtomicU64, /// The total bytes written to the region. pub(crate) written_bytes: Arc, + /// The partition expression of the region in staging mode. + /// + /// During the staging mode, the region metadata in [`VersionControlRef`] is not updated, + /// so we need to store the partition expression separately. + /// TODO(weny): + /// 1. Reload the staging partition expr during region open. + /// 2. Rejects requests with mismatching partition expr. 
+ pub(crate) staging_partition_expr: Mutex>, /// manifest stats stats: ManifestStats, } @@ -326,11 +336,19 @@ impl MitoRegion { ) } + /// Sets the entering staging state. + pub(crate) fn set_entering_staging(&self) -> Result<()> { + self.compare_exchange_state( + RegionLeaderState::Writable, + RegionRoleState::Leader(RegionLeaderState::EnteringStaging), + ) + } + /// Exits the staging state back to writable. /// /// You should call this method in the worker loop. /// Transitions from Staging to Writable state. - fn exit_staging(&self) -> Result<()> { + pub fn exit_staging(&self) -> Result<()> { self.compare_exchange_state( RegionLeaderState::Staging, RegionRoleState::Leader(RegionLeaderState::Writable), @@ -457,10 +475,7 @@ impl MitoRegion { sst_format: current_version.options.sst_format.unwrap_or_default(), }); let result = manager - .update( - RegionMetaActionList::with_action(action), - RegionRoleState::Leader(RegionLeaderState::Writable), - ) + .update(RegionMetaActionList::with_action(action), false) .await; match result { @@ -492,6 +507,16 @@ impl MitoRegion { } } + /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Staging)` if the current state is `expect`. + /// Otherwise, logs an error. + pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) { + if let Err(e) = + self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging)) + { + error!(e; "failed to switch region state to staging, expect state is {:?}", expect); + } + } + /// Returns the region statistic. pub(crate) fn region_statistic(&self) -> RegionStatistic { let version = self.version(); @@ -575,10 +600,19 @@ impl MitoRegion { .flat_map(|level| level.files().map(|file| file.file_id().file_id())) .collect::>(); - self.manifest_ctx - .manifest() + let manifest_files = self.manifest_ctx.manifest().await.files.clone(); + let staging_files = self + .manifest_ctx + .staging_manifest() .await - .files + .map(|m| m.files.clone()) + .unwrap_or_default(); + let files = manifest_files + .into_iter() + .chain(staging_files.into_iter()) + .collect::>(); + + files .values() .map(|meta| { let region_id = self.region_id; @@ -654,9 +688,8 @@ impl MitoRegion { }; // Submit merged actions using the manifest manager's update method - // Pass the target state (Writable) so it saves to normal directory, not staging - let target_state = RegionRoleState::Leader(RegionLeaderState::Writable); - let new_version = manager.update(merged_actions.clone(), target_state).await?; + // Pass the `false` so it saves to normal directory, not staging + let new_version = manager.update(merged_actions.clone(), false).await?; info!( "Successfully submitted merged staged manifests for region {}, new version: {}", @@ -731,6 +764,7 @@ impl ManifestContext { &self, expect_state: RegionLeaderState, action_list: RegionMetaActionList, + is_staging: bool, ) -> Result { // Acquires the write lock of the manifest manager. let mut manager = self.manifest_manager.write().await; @@ -806,7 +840,7 @@ impl ManifestContext { } // Now we can update the manifest. - let version = manager.update(action_list, current_state).await.inspect_err( + let version = manager.update(action_list, is_staging).await.inspect_err( |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id), )?; @@ -913,9 +947,17 @@ impl ManifestContext { } } + /// Returns the normal manifest of the region. 
pub(crate) async fn manifest(&self) -> Arc { self.manifest_manager.read().await.manifest() } + + /// Returns the staging manifest of the region. + pub(crate) async fn staging_manifest( + &self, + ) -> Option> { + self.manifest_manager.read().await.staging_manifest() + } } pub(crate) type ManifestContextRef = Arc; @@ -1213,8 +1255,8 @@ impl ManifestStats { #[cfg(test)] mod tests { - use std::sync::Arc; use std::sync::atomic::AtomicU64; + use std::sync::{Arc, Mutex}; use common_datasource::compression::CompressionType; use common_test_util::temp_dir::create_temp_dir; @@ -1404,6 +1446,7 @@ mod tests { topic_latest_entry_id: Default::default(), written_bytes: Arc::new(AtomicU64::new(0)), stats: ManifestStats::default(), + staging_partition_expr: Mutex::new(None), }; // Test initial state diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index f844319b70c7..5538d1bf21de 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -16,8 +16,8 @@ use std::any::TypeId; use std::collections::HashMap; -use std::sync::Arc; use std::sync::atomic::{AtomicI64, AtomicU64}; +use std::sync::{Arc, Mutex}; use std::time::Instant; use common_telemetry::{debug, error, info, warn}; @@ -334,6 +334,7 @@ impl RegionOpener { topic_latest_entry_id: AtomicU64::new(0), written_bytes: Arc::new(AtomicU64::new(0)), stats: self.stats, + staging_partition_expr: Mutex::new(None), })) } @@ -563,6 +564,8 @@ impl RegionOpener { topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id), written_bytes: Arc::new(AtomicU64::new(0)), stats: self.stats.clone(), + // TODO(weny): reload the staging partition expr from the manifest. + staging_partition_expr: Mutex::new(None), }; let region = Arc::new(region); @@ -973,6 +976,7 @@ fn can_load_cache(state: RegionRoleState) -> bool { RegionRoleState::Leader(RegionLeaderState::Writable) | RegionRoleState::Leader(RegionLeaderState::Staging) | RegionRoleState::Leader(RegionLeaderState::Altering) + | RegionRoleState::Leader(RegionLeaderState::EnteringStaging) | RegionRoleState::Leader(RegionLeaderState::Editing) | RegionRoleState::Follower => true, // The region will be closed soon if it is downgrading. 
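Taken together, the region changes above give the staging lifecycle a dedicated intermediate leader state. A compact sketch of the transitions wired up in this diff (other leader states omitted):

    // Writable        --(EnterStaging request)-->         EnteringStaging  (writes stall)
    // EnteringStaging --(staging manifest written)-->     Staging          (stalled writes resume)
    // EnteringStaging --(staging manifest write fails)--> Writable         (stalled writes resume)
    // Staging         --(exit_staging)-->                 Writable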
diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 4bac6b6266e1..0f54204ac69b 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -36,9 +36,10 @@ use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint} use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef}; use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState}; use store_api::region_request::{ - AffectedRows, RegionAlterRequest, RegionBuildIndexRequest, RegionBulkInsertsRequest, - RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, - RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, + AffectedRows, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest, + RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, + RegionCreateRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, + RegionTruncateRequest, }; use store_api::storage::{FileId, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -726,6 +727,11 @@ impl WorkerRequest { sender: sender.into(), request: DdlRequest::Catchup((v, None)), }), + RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest { + region_id, + sender: sender.into(), + request: DdlRequest::EnterStaging(v), + }), RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts { metadata: region_metadata, sender: sender.into(), @@ -823,6 +829,7 @@ pub(crate) enum DdlRequest { BuildIndex(RegionBuildIndexRequest), Truncate(RegionTruncateRequest), Catchup((RegionCatchupRequest, Option)), + EnterStaging(EnterStagingRequest), } /// Sender and Ddl request. @@ -859,6 +866,8 @@ pub(crate) enum BackgroundNotify { RegionChange(RegionChangeResult), /// Region edit result. RegionEdit(RegionEditResult), + /// Enter staging result. + EnterStaging(EnterStagingResult), } /// Notifies a flush job is finished. @@ -876,6 +885,8 @@ pub(crate) struct FlushFinished { pub(crate) edit: RegionEdit, /// Memtables to remove. pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>, + /// Whether the region is in staging mode. + pub(crate) is_staging: bool, } impl FlushFinished { @@ -1000,6 +1011,19 @@ pub(crate) struct RegionChangeResult { pub(crate) new_options: Option, } +/// Notifies the region the result of entering staging. +#[derive(Debug)] +pub(crate) struct EnterStagingResult { + /// Region id. + pub(crate) region_id: RegionId, + /// The new partition expression to apply. + pub(crate) partition_expr: String, + /// Result sender. + pub(crate) sender: OptionOutputTx, + /// Result from the manifest manager. + pub(crate) result: Result<()>, +} + /// Request to edit a region directly. 
#[derive(Debug)] pub(crate) struct RegionEditRequest { diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index c01f64556f65..fec2e4552ce8 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -776,6 +776,7 @@ impl IndexBuildTask { .update_manifest( RegionLeaderState::Writable, RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())), + false, ) .await?; info!( diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 8a78acd7bdea..bcb1db4331a3 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -39,7 +39,7 @@ use common_meta::cache::{new_schema_cache, new_table_schema_cache}; use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::kv_backend::memory::MemoryKvBackend; -use common_telemetry::warn; +use common_telemetry::{debug, warn}; use common_test_util::temp_dir::{TempDir, create_temp_dir}; use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions}; use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array}; @@ -50,6 +50,7 @@ use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::test_util::log_store_util; use moka::future::CacheBuilder; use object_store::ObjectStore; +use object_store::layers::mock::MockLayer; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::services::Fs; use rskafka::client::partition::{Compression, UnknownTopicHandling}; @@ -228,6 +229,7 @@ pub struct TestEnv { file_ref_manager: FileReferenceManagerRef, kv_backend: KvBackendRef, partition_expr_fetcher: PartitionExprFetcherRef, + object_store_mock_layer: Option, } impl TestEnv { @@ -264,6 +266,7 @@ impl TestEnv { file_ref_manager: Arc::new(FileReferenceManager::new(None)), kv_backend, partition_expr_fetcher: noop_partition_expr_fetcher(), + object_store_mock_layer: None, } } @@ -273,6 +276,12 @@ impl TestEnv { self } + /// Sets the original `object_store_mock_layer`. 
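+    /// The layer is applied when the test object store is built, and is only
+    /// available with the `testing` feature of `object-store`.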
+ pub fn with_mock_layer(mut self, mock_layer: MockLayer) -> TestEnv { + self.object_store_mock_layer = Some(mock_layer); + self + } + pub fn get_object_store(&self) -> Option { self.object_store_manager .as_ref() @@ -569,7 +578,16 @@ impl TestEnv { let data_home = self.data_home.path(); let data_path = data_home.join("data").as_path().display().to_string(); let builder = Fs::default().root(&data_path); - let object_store = ObjectStore::new(builder).unwrap().finish(); + + let object_store = if let Some(mock_layer) = self.object_store_mock_layer.as_ref() { + debug!("create object store with mock layer"); + ObjectStore::new(builder) + .unwrap() + .layer(mock_layer.clone()) + .finish() + } else { + ObjectStore::new(builder).unwrap().finish() + }; ObjectStoreManager::new("default", object_store) } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index cf97d2a63b60..d80a83264a0e 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -21,6 +21,7 @@ mod handle_close; mod handle_compaction; mod handle_create; mod handle_drop; +mod handle_enter_staging; mod handle_flush; mod handle_manifest; mod handle_open; @@ -1039,8 +1040,7 @@ impl RegionWorkerLoop { continue; } DdlRequest::Flush(req) => { - self.handle_flush_request(ddl.region_id, req, ddl.sender) - .await; + self.handle_flush_request(ddl.region_id, req, ddl.sender); continue; } DdlRequest::Compact(req) => { @@ -1063,6 +1063,15 @@ impl RegionWorkerLoop { .await; continue; } + DdlRequest::EnterStaging(req) => { + self.handle_enter_staging_request( + ddl.region_id, + req.partition_expr, + ddl.sender, + ) + .await; + continue; + } }; ddl.sender.send(res); @@ -1111,6 +1120,7 @@ impl RegionWorkerLoop { BackgroundNotify::RegionChange(req) => { self.handle_manifest_region_change_result(req).await } + BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await, BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await, } } @@ -1272,6 +1282,13 @@ impl WorkerListener { } } + pub(crate) async fn on_enter_staging_result_begin(&self, _region_id: RegionId) { + #[cfg(any(test, feature = "test"))] + if let Some(listener) = &self.listener { + listener.on_enter_staging_result_begin(_region_id).await; + } + } + pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) { #[cfg(any(test, feature = "test"))] if let Some(listener) = &self.listener { diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 39a1fa665a5d..a8a4a3f46c50 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -113,7 +113,13 @@ impl RegionWorkerLoop { info!("Flush region: {} before alteration", region_id); // Try to submit a flush task. - let task = self.new_flush_task(®ion, FlushReason::Alter, None, self.config.clone()); + let task = self.new_flush_task( + ®ion, + FlushReason::Alter, + None, + self.config.clone(), + region.is_staging(), + ); if let Err(e) = self.flush_scheduler .schedule_flush(region.region_id, ®ion.version_control, task) diff --git a/src/mito2/src/worker/handle_enter_staging.rs b/src/mito2/src/worker/handle_enter_staging.rs new file mode 100644 index 000000000000..6dee72525ebc --- /dev/null +++ b/src/mito2/src/worker/handle_enter_staging.rs @@ -0,0 +1,249 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Instant; + +use common_telemetry::{error, info, warn}; +use store_api::logstore::LogStore; +use store_api::region_request::EnterStagingRequest; +use store_api::storage::RegionId; + +use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu}; +use crate::flush::FlushReason; +use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList}; +use crate::region::{MitoRegionRef, RegionLeaderState}; +use crate::request::{ + BackgroundNotify, DdlRequest, EnterStagingResult, OptionOutputTx, SenderDdlRequest, + WorkerRequest, WorkerRequestWithTime, +}; +use crate::worker::RegionWorkerLoop; + +impl RegionWorkerLoop { + pub(crate) async fn handle_enter_staging_request( + &mut self, + region_id: RegionId, + partition_expr: String, + mut sender: OptionOutputTx, + ) { + let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else { + return; + }; + + // If the region is already in staging mode, verify the partition expr matches. + if region.is_staging() { + let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone(); + // If the partition expr mismatch, return error. + if staging_partition_expr.as_ref() != Some(&partition_expr) { + sender.send(Err(StagingPartitionExprMismatchSnafu { + manifest_expr: staging_partition_expr, + request_expr: partition_expr, + } + .build())); + return; + } + + // If the partition expr matches, return success. + sender.send(Ok(0)); + return; + } + + let version = region.version(); + if !version.memtables.is_empty() { + // If memtable is not empty, we can't enter staging directly and need to flush + // all memtables first. + info!("Flush region: {} before entering staging", region_id); + debug_assert!(!region.is_staging()); + let task = self.new_flush_task( + ®ion, + FlushReason::EnterStaging, + None, + self.config.clone(), + region.is_staging(), + ); + if let Err(e) = + self.flush_scheduler + .schedule_flush(region.region_id, ®ion.version_control, task) + { + // Unable to flush the region, send error to waiter. + sender.send(Err(e)); + return; + } + + // Safety: We have requested flush. + self.flush_scheduler + .add_ddl_request_to_pending(SenderDdlRequest { + region_id, + sender, + request: DdlRequest::EnterStaging(EnterStagingRequest { partition_expr }), + }); + + return; + } + + self.handle_enter_staging(region, partition_expr, sender); + } + + async fn enter_staging(region: &MitoRegionRef, partition_expr: String) -> Result<()> { + let now = Instant::now(); + // First step: clear all staging manifest files. + { + let mut manager = region.manifest_ctx.manifest_manager.write().await; + manager + .clear_staging_manifest_and_dir() + .await + .inspect_err(|e| { + error!( + e; + "Failed to clear staging manifest files for region {}", + region.region_id + ); + })?; + + info!( + "Cleared all staging manifest files for region {}, elapsed: {:?}", + region.region_id, + now.elapsed(), + ); + } + + // Second step: write new staging manifest. 
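+        // The Change action below stamps the new partition expr into the region
+        // metadata, so it is persisted in the staging manifest (the reopen path
+        // is expected to reload it from there; see the TODO in opener.rs).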
+ let mut new_meta = (*region.metadata()).clone(); + new_meta.partition_expr = Some(partition_expr.clone()); + let sst_format = region.version().options.sst_format.unwrap_or_default(); + let change = RegionChange { + metadata: Arc::new(new_meta), + sst_format, + }; + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change)); + region + .manifest_ctx + .update_manifest(RegionLeaderState::EnteringStaging, action_list, true) + .await?; + + Ok(()) + } + + fn handle_enter_staging( + &self, + region: MitoRegionRef, + partition_expr: String, + sender: OptionOutputTx, + ) { + if let Err(e) = region.set_entering_staging() { + sender.send(Err(e)); + return; + } + + let listener = self.listener.clone(); + let request_sender = self.sender.clone(); + common_runtime::spawn_global(async move { + let now = Instant::now(); + let result = Self::enter_staging(®ion, partition_expr.clone()).await; + match result { + Ok(_) => { + info!( + "Created staging manifest for region {}, elapsed: {:?}", + region.region_id, + now.elapsed(), + ); + } + Err(ref e) => { + // Unset the staging manifest + region + .manifest_ctx + .manifest_manager + .write() + .await + .unset_staging_manifest(); + error!( + "Failed to create staging manifest for region {}: {:?}, elapsed: {:?}", + region.region_id, + e, + now.elapsed(), + ); + } + } + + let notify = WorkerRequest::Background { + region_id: region.region_id, + notify: BackgroundNotify::EnterStaging(EnterStagingResult { + region_id: region.region_id, + sender, + result, + partition_expr, + }), + }; + listener + .on_enter_staging_result_begin(region.region_id) + .await; + + if let Err(res) = request_sender + .send(WorkerRequestWithTime::new(notify)) + .await + { + warn!( + "Failed to send enter staging result back to the worker, region_id: {}, res: {:?}", + region.region_id, res + ); + } + }); + } + + /// Handles enter staging result. + pub(crate) async fn handle_enter_staging_result( + &mut self, + enter_staging_result: EnterStagingResult, + ) { + let region = match self.regions.get_region(enter_staging_result.region_id) { + Some(region) => region, + None => { + self.reject_region_stalled_requests(&enter_staging_result.region_id); + enter_staging_result.sender.send( + RegionNotFoundSnafu { + region_id: enter_staging_result.region_id, + } + .fail(), + ); + return; + } + }; + + if enter_staging_result.result.is_ok() { + info!( + "Updating region {} staging partition expr to {}", + region.region_id, enter_staging_result.partition_expr + ); + Self::update_region_staging_partition_expr( + ®ion, + enter_staging_result.partition_expr, + ); + region.switch_state_to_staging(RegionLeaderState::EnteringStaging); + } else { + region.switch_state_to_writable(RegionLeaderState::EnteringStaging); + } + enter_staging_result + .sender + .send(enter_staging_result.result.map(|_| 0)); + // Handles the stalled requests. 
+ self.handle_region_stalled_requests(&enter_staging_result.region_id) + .await; + } + + fn update_region_staging_partition_expr(region: &MitoRegionRef, partition_expr: String) { + let mut staging_partition_expr = region.staging_partition_expr.lock().unwrap(); + debug_assert!(staging_partition_expr.is_none()); + *staging_partition_expr = Some(partition_expr); + } +} diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 04dbb4ae7877..8b9e750ffb2e 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -76,8 +76,13 @@ impl RegionWorkerLoop { if region.last_flush_millis() < min_last_flush_time { // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region. - let task = - self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone()); + let task = self.new_flush_task( + region, + FlushReason::EngineFull, + None, + self.config.clone(), + region.is_staging(), + ); self.flush_scheduler.schedule_flush( region.region_id, ®ion.version_control, @@ -91,8 +96,13 @@ impl RegionWorkerLoop { if let Some(region) = max_mem_region && !self.flush_scheduler.is_flush_requested(region.region_id) { - let task = - self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone()); + let task = self.new_flush_task( + region, + FlushReason::EngineFull, + None, + self.config.clone(), + region.is_staging(), + ); self.flush_scheduler .schedule_flush(region.region_id, ®ion.version_control, task)?; } @@ -107,6 +117,7 @@ impl RegionWorkerLoop { reason: FlushReason, row_group_size: Option, engine_config: Arc, + is_staging: bool, ) -> RegionFlushTask { RegionFlushTask { region_id: region.region_id, @@ -121,13 +132,14 @@ impl RegionWorkerLoop { manifest_ctx: region.manifest_ctx.clone(), index_options: region.version().options.index_options.clone(), flush_semaphore: self.flush_semaphore.clone(), + is_staging, } } } impl RegionWorkerLoop { /// Handles manual flush request. - pub(crate) async fn handle_flush_request( + pub(crate) fn handle_flush_request( &mut self, region_id: RegionId, request: RegionFlushRequest, @@ -147,8 +159,13 @@ impl RegionWorkerLoop { FlushReason::Manual }; - let mut task = - self.new_flush_task(®ion, reason, request.row_group_size, self.config.clone()); + let mut task = self.new_flush_task( + ®ion, + reason, + request.row_group_size, + self.config.clone(), + region.is_staging(), + ); task.push_sender(sender); if let Err(e) = self.flush_scheduler @@ -178,6 +195,7 @@ impl RegionWorkerLoop { FlushReason::Periodically, None, self.config.clone(), + region.is_staging(), ); self.flush_scheduler.schedule_flush( region.region_id, @@ -208,11 +226,8 @@ impl RegionWorkerLoop { } }; - // Check if region is currently in staging mode - let is_staging = region.manifest_ctx.current_state() - == crate::region::RegionRoleState::Leader(crate::region::RegionLeaderState::Staging); - - if is_staging { + if request.is_staging { + // Skip the region metadata update. info!( "Skipping region metadata update for region {} in staging mode", region_id diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index c91c7adc6b61..433c440639e3 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -346,6 +346,7 @@ impl RegionWorkerLoop { let request_sender = self.sender.clone(); let manifest_ctx = region.manifest_ctx.clone(); + let is_staging = region.is_staging(); // Updates manifest in background. 
diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs
index 04dbb4ae7877..8b9e750ffb2e 100644
--- a/src/mito2/src/worker/handle_flush.rs
+++ b/src/mito2/src/worker/handle_flush.rs
@@ -76,8 +76,13 @@ impl RegionWorkerLoop {
 
         if region.last_flush_millis() < min_last_flush_time {
             // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
-            let task =
-                self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
+            let task = self.new_flush_task(
+                region,
+                FlushReason::EngineFull,
+                None,
+                self.config.clone(),
+                region.is_staging(),
+            );
             self.flush_scheduler.schedule_flush(
                 region.region_id,
                 &region.version_control,
@@ -91,8 +96,13 @@ impl RegionWorkerLoop {
         if let Some(region) = max_mem_region
             && !self.flush_scheduler.is_flush_requested(region.region_id)
         {
-            let task =
-                self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
+            let task = self.new_flush_task(
+                region,
+                FlushReason::EngineFull,
+                None,
+                self.config.clone(),
+                region.is_staging(),
+            );
             self.flush_scheduler
                 .schedule_flush(region.region_id, &region.version_control, task)?;
         }
@@ -107,6 +117,7 @@ impl RegionWorkerLoop {
         reason: FlushReason,
         row_group_size: Option<usize>,
         engine_config: Arc<MitoConfig>,
+        is_staging: bool,
     ) -> RegionFlushTask {
         RegionFlushTask {
             region_id: region.region_id,
@@ -121,13 +132,14 @@ impl RegionWorkerLoop {
             manifest_ctx: region.manifest_ctx.clone(),
             index_options: region.version().options.index_options.clone(),
             flush_semaphore: self.flush_semaphore.clone(),
+            is_staging,
         }
     }
 }
 
 impl RegionWorkerLoop {
     /// Handles manual flush request.
-    pub(crate) async fn handle_flush_request(
+    pub(crate) fn handle_flush_request(
         &mut self,
         region_id: RegionId,
         request: RegionFlushRequest,
@@ -147,8 +159,13 @@ impl RegionWorkerLoop {
             FlushReason::Manual
         };
 
-        let mut task =
-            self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
+        let mut task = self.new_flush_task(
+            &region,
+            reason,
+            request.row_group_size,
+            self.config.clone(),
+            region.is_staging(),
+        );
         task.push_sender(sender);
         if let Err(e) = self.flush_scheduler
@@ -178,6 +195,7 @@ impl RegionWorkerLoop {
             FlushReason::Periodically,
             None,
             self.config.clone(),
+            region.is_staging(),
         );
         self.flush_scheduler.schedule_flush(
             region.region_id,
@@ -208,11 +226,8 @@ impl RegionWorkerLoop {
             }
         };
 
-        // Check if region is currently in staging mode
-        let is_staging = region.manifest_ctx.current_state()
-            == crate::region::RegionRoleState::Leader(crate::region::RegionLeaderState::Staging);
-
-        if is_staging {
+        if request.is_staging {
+            // Skip the region metadata update.
             info!(
                 "Skipping region metadata update for region {} in staging mode",
                 region_id
diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs
index c91c7adc6b61..433c440639e3 100644
--- a/src/mito2/src/worker/handle_manifest.rs
+++ b/src/mito2/src/worker/handle_manifest.rs
@@ -346,6 +346,7 @@ impl RegionWorkerLoop {
         let request_sender = self.sender.clone();
         let manifest_ctx = region.manifest_ctx.clone();
+        let is_staging = region.is_staging();
         // Updates manifest in background.
         common_runtime::spawn_global(async move {
@@ -354,7 +355,7 @@
                 RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
 
             let result = manifest_ctx
-                .update_manifest(RegionLeaderState::Truncating, action_list)
+                .update_manifest(RegionLeaderState::Truncating, action_list, is_staging)
                 .await
                 .map(|_| ());
 
@@ -391,6 +392,7 @@
         }
         let listener = self.listener.clone();
         let request_sender = self.sender.clone();
+        let is_staging = region.is_staging();
         // Now the region is in altering state.
         common_runtime::spawn_global(async move {
             let new_meta = change.metadata.clone();
@@ -398,7 +400,7 @@
 
             let result = region
                 .manifest_ctx
-                .update_manifest(RegionLeaderState::Altering, action_list)
+                .update_manifest(RegionLeaderState::Altering, action_list, is_staging)
                 .await
                 .map(|_| ());
             let notify = WorkerRequest::Background {
@@ -463,6 +465,7 @@ async fn edit_region(
     listener: WorkerListener,
 ) -> Result<()> {
     let region_id = region.region_id;
+    let is_staging = region.is_staging();
     if let Some(write_cache) = cache_manager.write_cache() {
         for file_meta in &edit.files_to_add {
             let write_cache = write_cache.clone();
@@ -532,7 +535,7 @@ async fn edit_region(
     let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
     region
         .manifest_ctx
-        .update_manifest(RegionLeaderState::Editing, action_list)
+        .update_manifest(RegionLeaderState::Editing, action_list, is_staging)
         .await
         .map(|_| ())
 }
diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs
index e86aa676305e..32309f5e1333 100644
--- a/src/mito2/src/worker/handle_write.rs
+++ b/src/mito2/src/worker/handle_write.rs
@@ -241,6 +241,12 @@ impl RegionWorkerLoop {
                 // No such region.
                 continue;
             };
+            #[cfg(test)]
+            debug!(
+                "Handling write request for region {}, state: {:?}",
+                region_id,
+                region.state()
+            );
             match region.state() {
                 RegionRoleState::Leader(RegionLeaderState::Writable)
                 | RegionRoleState::Leader(RegionLeaderState::Staging) => {
@@ -263,6 +269,16 @@ impl RegionWorkerLoop {
                     self.stalled_requests.push(sender_req);
                     continue;
                 }
+                RegionRoleState::Leader(RegionLeaderState::EnteringStaging) => {
+                    debug!(
+                        "Region {} is entering staging, add request to pending writes",
+                        region.region_id
+                    );
+                    self.stalling_count.add(1);
+                    WRITE_STALL_TOTAL.inc();
+                    self.stalled_requests.push(sender_req);
+                    continue;
+                }
                 state => {
                     // The region is not writable.
                     sender_req.sender.send(
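The `handle_write.rs` hunk above routes each write by leader state. A self-contained condensation of that decision follows, with stand-in types; the real match has more arms, and at least one pre-existing state also stalls (visible only as context lines here).

```rust
// Stand-in types; illustrative only, not the engine's RegionRoleState.
#[derive(Debug, Clone, Copy)]
enum LeaderState {
    Writable,
    Staging,
    EnteringStaging,
    Other,
}

#[derive(Debug, PartialEq)]
enum WriteAction {
    Proceed, // Writable and Staging regions both accept writes
    Stall,   // EnteringStaging parks the request in `stalled_requests`
    Reject,  // any other state answers with a "region not writable" error
}

fn route_write(state: LeaderState) -> WriteAction {
    match state {
        LeaderState::Writable | LeaderState::Staging => WriteAction::Proceed,
        LeaderState::EnteringStaging => WriteAction::Stall,
        LeaderState::Other => WriteAction::Reject,
    }
}

fn main() {
    assert_eq!(route_write(LeaderState::EnteringStaging), WriteAction::Stall);
    assert_eq!(route_write(LeaderState::Staging), WriteAction::Proceed);
}
```
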
diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml
index 8a1a8775670e..cb02ca149940 100644
--- a/src/object-store/Cargo.toml
+++ b/src/object-store/Cargo.toml
@@ -9,6 +9,7 @@ workspace = true
 
 [features]
 services-memory = ["opendal/services-memory"]
+testing = ["derive_builder"]
 
 [dependencies]
 bytes.workspace = true
@@ -16,6 +17,7 @@ common-base.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
 common-telemetry.workspace = true
+derive_builder = { workspace = true, optional = true }
 futures.workspace = true
 humantime-serde.workspace = true
 lazy_static.workspace = true
diff --git a/src/object-store/src/layers.rs b/src/object-store/src/layers.rs
index 7b111927e209..00b18a70de25 100644
--- a/src/object-store/src/layers.rs
+++ b/src/object-store/src/layers.rs
@@ -13,6 +13,8 @@
 // limitations under the License.
 
 mod lru_cache;
+#[cfg(feature = "testing")]
+pub mod mock;
 
 pub use lru_cache::*;
 pub use opendal::layers::*;
diff --git a/src/object-store/src/layers/mock.rs b/src/object-store/src/layers/mock.rs
new file mode 100644
index 000000000000..0ee0f73b21eb
--- /dev/null
+++ b/src/object-store/src/layers/mock.rs
@@ -0,0 +1,217 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use derive_builder::Builder;
+pub use oio::*;
+pub use opendal::raw::{
+    Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
+    RpWrite, oio,
+};
+pub use opendal::{Buffer, Error, ErrorKind, Metadata, Result};
+
+pub type MockWriterFactory = Arc<dyn Fn(&str, OpWrite, oio::Writer) -> oio::Writer + Send + Sync>;
+pub type MockReaderFactory = Arc<dyn Fn(&str, OpRead, oio::Reader) -> oio::Reader + Send + Sync>;
+pub type MockListerFactory = Arc<dyn Fn(&str, OpList, oio::Lister) -> oio::Lister + Send + Sync>;
+pub type MockDeleterFactory = Arc<dyn Fn(oio::Deleter) -> oio::Deleter + Send + Sync>;
+
+#[derive(Builder)]
+pub struct MockLayer {
+    #[builder(setter(strip_option), default)]
+    writer_factory: Option<MockWriterFactory>,
+    #[builder(setter(strip_option), default)]
+    reader_factory: Option<MockReaderFactory>,
+    #[builder(setter(strip_option), default)]
+    lister_factory: Option<MockListerFactory>,
+    #[builder(setter(strip_option), default)]
+    deleter_factory: Option<MockDeleterFactory>,
+}
+
+impl Clone for MockLayer {
+    fn clone(&self) -> Self {
+        Self {
+            writer_factory: self.writer_factory.clone(),
+            reader_factory: self.reader_factory.clone(),
+            lister_factory: self.lister_factory.clone(),
+            deleter_factory: self.deleter_factory.clone(),
+        }
+    }
+}
+
+impl<A: Access> Layer<A> for MockLayer {
+    type LayeredAccess = MockAccessor<A>;
+
+    fn layer(&self, inner: A) -> Self::LayeredAccess {
+        MockAccessor {
+            inner,
+            writer_factory: self.writer_factory.clone(),
+            reader_factory: self.reader_factory.clone(),
+            lister_factory: self.lister_factory.clone(),
+            deleter_factory: self.deleter_factory.clone(),
+        }
+    }
+}
+
+pub struct MockAccessor<A> {
+    inner: A,
+    writer_factory: Option<MockWriterFactory>,
+    reader_factory: Option<MockReaderFactory>,
+    lister_factory: Option<MockListerFactory>,
+    deleter_factory: Option<MockDeleterFactory>,
+}
+
+impl<A: Debug> Debug for MockAccessor<A> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("MockAccessor")
+            .field("inner", &self.inner)
+            .finish()
+    }
+}
+
+pub struct MockReader {
+    inner: oio::Reader,
+}
+
+impl oio::Read for MockReader {
+    async fn read(&mut self) -> Result<Buffer> {
+        self.inner.read().await
+    }
+}
+
+pub struct MockWriter {
+    inner: oio::Writer,
+}
+
+impl oio::Write for MockWriter {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        self.inner.write(bs).await
+    }
+
+    async fn close(&mut self) -> Result<Metadata> {
+        self.inner.close().await
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.inner.abort().await
+    }
+}
+
+pub struct MockLister {
+    inner: oio::Lister,
+}
+
+impl oio::List for MockLister {
+    async fn next(&mut self) -> Result<Option<oio::Entry>> {
+        self.inner.next().await
+    }
+}
+
+pub struct MockDeleter {
+    inner: oio::Deleter,
+}
+
+impl oio::Delete for MockDeleter {
+    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+        self.inner.delete(path, args)
+    }
+
+    async fn flush(&mut self) -> Result<usize> {
+        self.inner.flush().await
+    }
+}
+
+impl<A: Access> LayeredAccess for MockAccessor<A> {
+    type Inner = A;
+    type Reader = MockReader;
+    type Writer = MockWriter;
+    type Lister = MockLister;
+    type Deleter = MockDeleter;
+
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        if let Some(reader_factory) = self.reader_factory.as_ref() {
+            let (rp_read, reader) = self.inner.read(path, args.clone()).await?;
+            let reader = reader_factory(path, args, Box::new(reader));
+            Ok((rp_read, MockReader { inner: reader }))
+        } else {
+            self.inner.read(path, args).await.map(|(rp_read, reader)| {
+                (
+                    rp_read,
+                    MockReader {
+                        inner: Box::new(reader),
+                    },
+                )
+            })
+        }
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        if let Some(writer_factory) = self.writer_factory.as_ref() {
+            let (rp_write, writer) = self.inner.write(path, args.clone()).await?;
+            let writer = writer_factory(path, args, Box::new(writer));
+            Ok((rp_write, MockWriter { inner: writer }))
+        } else {
+            self.inner
+                .write(path, args)
+                .await
+                .map(|(rp_write, writer)| {
+                    (
+                        rp_write,
+                        MockWriter {
+                            inner: Box::new(writer),
+                        },
+                    )
+                })
+        }
+    }
+
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        if let Some(deleter_factory) = self.deleter_factory.as_ref() {
+            let (rp_delete, deleter) = self.inner.delete().await?;
+            let deleter = deleter_factory(Box::new(deleter));
+            Ok((rp_delete, MockDeleter { inner: deleter }))
+        } else {
+            self.inner.delete().await.map(|(rp_delete, deleter)| {
+                (
+                    rp_delete,
+                    MockDeleter {
+                        inner: Box::new(deleter),
+                    },
+                )
+            })
+        }
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
+        if let Some(lister_factory) = self.lister_factory.as_ref() {
+            let (rp_list, lister) = self.inner.list(path, args.clone()).await?;
+            let lister = lister_factory(path, args, Box::new(lister));
+            Ok((rp_list, MockLister { inner: lister }))
+        } else {
+            self.inner.list(path, args).await.map(|(rp_list, lister)| {
+                (
+                    rp_list,
+                    MockLister {
+                        inner: Box::new(lister),
+                    },
+                )
+            })
+        }
+    }
+}
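A hypothetical use of the new mock layer, assuming the `testing` feature is enabled: inject a writer that fails every write, to exercise error paths such as the manifest-update failure handled earlier in this patch. `FailingWriter` and `failing_write_layer` are invented for this sketch; `MockLayerBuilder` is the builder that `derive_builder` generates for `MockLayer`, and the imports below all come from re-exports in the file above (including opendal's boxed `oio::Writer`).

```rust
use std::sync::Arc;

use object_store::layers::mock::{
    oio, Buffer, Error, ErrorKind, Metadata, MockLayer, MockLayerBuilder, Result,
};

// Invented for this sketch: a writer that rejects every write and close.
struct FailingWriter;

impl oio::Write for FailingWriter {
    async fn write(&mut self, _bs: Buffer) -> Result<()> {
        Err(Error::new(ErrorKind::Unexpected, "injected write failure"))
    }

    async fn close(&mut self) -> Result<Metadata> {
        Err(Error::new(ErrorKind::Unexpected, "injected close failure"))
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }
}

// Builds a layer whose writers always fail; reads, lists and deletes keep
// their pass-through behavior because those factories stay unset.
fn failing_write_layer() -> MockLayer {
    MockLayerBuilder::default()
        .writer_factory(Arc::new(|_path: &str, _args, _inner| {
            // Drop the inner writer and substitute the failing one.
            Box::new(FailingWriter) as oio::Writer
        }))
        .build()
        .unwrap()
}
```

In a test, the layer would then be applied to an operator in the usual opendal fashion, roughly `ObjectStore::new(builder)?.layer(failing_write_layer()).finish()`.
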
diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs
index bbd532c17c7d..b582db0a952c 100644
--- a/src/store-api/src/region_request.rs
+++ b/src/store-api/src/region_request.rs
@@ -151,6 +151,7 @@ pub enum RegionRequest {
     Truncate(RegionTruncateRequest),
     Catchup(RegionCatchupRequest),
     BulkInserts(RegionBulkInsertsRequest),
+    EnterStaging(EnterStagingRequest),
 }
 
 impl RegionRequest {
@@ -1416,6 +1417,17 @@ impl RegionBulkInsertsRequest {
     }
 }
 
+/// Request to stage a region with a new region rule (partition expression).
+///
+/// This request transitions a region into the staging mode.
+/// It first flushes the memtable for the old region rule if it is not empty,
+/// then enters the staging mode with the new region rule.
+#[derive(Debug, Clone)]
+pub struct EnterStagingRequest {
+    /// The partition expression of the staging region.
+    pub partition_expr: String,
+}
+
 impl fmt::Display for RegionRequest {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
@@ -1432,6 +1444,7 @@ impl fmt::Display for RegionRequest {
             RegionRequest::Truncate(_) => write!(f, "Truncate"),
             RegionRequest::Catchup(_) => write!(f, "Catchup"),
             RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
+            RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
         }
     }
 }
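Finally, a hedged sketch of the caller's side of the new request variant. The function and its error handling are placeholders: the exact result type comes from `RegionEngine::handle_request`, and the partition expression is treated as an opaque string produced by the partitioning layer.

```rust
use store_api::region_engine::RegionEngine;
use store_api::region_request::{EnterStagingRequest, RegionRequest};
use store_api::storage::RegionId;

// Sketch only: drive one region into staging mode under a new partition rule.
async fn enter_staging(engine: &dyn RegionEngine, region_id: RegionId, partition_expr: String) {
    let request = RegionRequest::EnterStaging(EnterStagingRequest { partition_expr });
    // Writes arriving while the region is EnteringStaging stall and are
    // replayed once the staging manifest is ready (see the handle_write.rs
    // hunk above); unwrapping here is for brevity only.
    engine
        .handle_request(region_id, request)
        .await
        .expect("failed to enter staging");
}
```
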