Skip to content

Commit 01efe89

Browse files
committed
chore: apply suggestions from CR
Signed-off-by: WenyXu <[email protected]>
1 parent 904f005 commit 01efe89

File tree

7 files changed

+70
-33
lines changed

7 files changed

+70
-33
lines changed

src/metric-engine/src/engine.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ mod options;
2323
mod put;
2424
mod read;
2525
mod region_metadata;
26+
mod staging;
2627
mod state;
2728
mod sync;
2829

@@ -210,12 +211,7 @@ impl RegionEngine for MetricEngine {
210211
let result = match request {
211212
RegionRequest::EnterStaging(_) => {
212213
if self.inner.is_physical_region(region_id) {
213-
self.inner
214-
.mito
215-
.handle_request(region_id, request)
216-
.await
217-
.context(error::MitoEnterStagingOperationSnafu)
218-
.map(|response| response.affected_rows)
214+
self.handle_enter_staging_request(region_id, request).await
219215
} else {
220216
UnsupportedRegionRequestSnafu { request }.fail()
221217
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_base::AffectedRows;
16+
use snafu::ResultExt;
17+
use store_api::region_engine::RegionEngine;
18+
use store_api::region_request::{EnterStagingRequest, RegionRequest};
19+
use store_api::storage::RegionId;
20+
21+
use crate::engine::MetricEngine;
22+
use crate::error::{MitoEnterStagingOperationSnafu, Result};
23+
use crate::utils;
24+
25+
impl MetricEngine {
26+
/// Handles the enter staging request for the given region.
27+
pub(crate) async fn handle_enter_staging_request(
28+
&self,
29+
region_id: RegionId,
30+
request: RegionRequest,
31+
) -> Result<AffectedRows> {
32+
let metadata_region_id = utils::to_metadata_region_id(region_id);
33+
let data_region_id = utils::to_data_region_id(region_id);
34+
35+
// For metadata region, it doesn't care about the partition expr, so we can just pass an empty string.
36+
self.inner
37+
.mito
38+
.handle_request(
39+
metadata_region_id,
40+
RegionRequest::EnterStaging(EnterStagingRequest {
41+
partition_expr: String::new(),
42+
}),
43+
)
44+
.await
45+
.context(MitoEnterStagingOperationSnafu)?;
46+
47+
self.inner
48+
.mito
49+
.handle_request(data_region_id, request)
50+
.await
51+
.context(MitoEnterStagingOperationSnafu)
52+
.map(|response| response.affected_rows)
53+
}
54+
}

src/mito2/src/engine/staging_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
471471
// One file for flush operation
472472
normal_count_before,
473473
2,
474-
"Normal manifest directory should initially contain 2 file"
474+
"Normal manifest directory should initially contain 2 files"
475475
);
476476

477477
// Try read data before exiting staging, SST files should be invisible

src/mito2/src/manifest/manager.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,9 @@ pub struct RegionManifestManager {
151151
last_version: Arc<AtomicU64>,
152152
checkpointer: Checkpointer,
153153
manifest: Arc<RegionManifest>,
154+
// Staging manifest is used to store the manifest of the staging region before it becomes available.
155+
// It is initially inherited from the previous manifest(i.e., `self.manifest`).
156+
// When the staging manifest becomes available, it will be used to construct the new manifest.
154157
staging_manifest: Option<Arc<RegionManifest>>,
155158
stats: ManifestStats,
156159
stopped: bool,
@@ -557,7 +560,6 @@ impl RegionManifestManager {
557560

558561
if is_staging {
559562
let new_manifest = manifest_builder.try_build()?;
560-
debug!("Built staging manifest: {:?}", new_manifest);
561563
self.staging_manifest = Some(Arc::new(new_manifest));
562564

563565
info!(

src/mito2/src/region.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -516,8 +516,13 @@ impl MitoRegion {
516516
}
517517

518518
/// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Staging)` if the current state is `expect`.
519-
pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) -> Result<()> {
520-
self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
519+
/// Otherwise, logs an error.
520+
pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
521+
if let Err(e) =
522+
self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
523+
{
524+
error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
525+
}
521526
}
522527

523528
/// Returns the region statistic.
@@ -610,15 +615,10 @@ impl MitoRegion {
610615
.await
611616
.map(|m| m.files.clone())
612617
.unwrap_or_default();
613-
debug!(
614-
"manifest files: {:?}, staging files: {:?}",
615-
manifest_files, staging_files
616-
);
617618
let files = manifest_files
618619
.into_iter()
619620
.chain(staging_files.into_iter())
620621
.collect::<HashMap<_, _>>();
621-
debug!("files: {:?}", files);
622622

623623
files
624624
.values()

src/mito2/src/worker/handle_enter_staging.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -201,22 +201,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
201201
}
202202
};
203203

204-
let clean_staging_manifests = |region: MitoRegionRef| {
205-
common_runtime::spawn_global(async move {
206-
let mut manager = region.manifest_ctx.manifest_manager.write().await;
207-
if let Err(e) = manager.clear_staging_manifests().await {
208-
error!(e; "Failed to clear staging manifests after failed to switch region state to staging");
209-
}
210-
});
211-
};
212-
213-
if let Err(e) = region.switch_state_to_staging(RegionLeaderState::EnteringStaging) {
214-
error!(e; "Failed to switch region state to staging");
215-
enter_staging_result.sender.send(Err(e));
216-
clean_staging_manifests(region);
217-
return;
218-
}
219-
220204
if enter_staging_result.result.is_ok() {
221205
info!(
222206
"Updating region {} staging partition expr to {}",
@@ -226,8 +210,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
226210
&region,
227211
enter_staging_result.partition_expr,
228212
);
213+
region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
229214
} else {
230-
clean_staging_manifests(region);
215+
region.switch_state_to_writable(RegionLeaderState::EnteringStaging);
231216
}
232217
enter_staging_result
233218
.sender

src/store-api/src/region_request.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1410,7 +1410,7 @@ impl RegionBulkInsertsRequest {
14101410
/// Request to stage a region with a new region rule(partition expression).
14111411
///
14121412
/// This request transitions a region into the staging mode.
1413-
/// It first flushes the memtable for the old region rule,
1413+
/// It first flushes the memtable for the old region rule if it is not empty,
14141414
/// then enters the staging mode with the new region rule.
14151415
#[derive(Debug, Clone)]
14161416
pub struct EnterStagingRequest {

0 commit comments

Comments
 (0)