Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,8 @@ impl RegionServerInner {
| RegionRequest::Flush(_)
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_)
| RegionRequest::BuildIndex(_) => RegionChange::None,
| RegionRequest::BuildIndex(_)
| RegionRequest::EnterStaging(_) => RegionChange::None,
RegionRequest::Catchup(_) => RegionChange::Catchup,
};

Expand Down
8 changes: 8 additions & 0 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod options;
mod put;
mod read;
mod region_metadata;
mod staging;
mod state;
mod sync;

Expand Down Expand Up @@ -211,6 +212,13 @@ impl RegionEngine for MetricEngine {
let mut extension_return_value = HashMap::new();

let result = match request {
RegionRequest::EnterStaging(_) => {
if self.inner.is_physical_region(region_id) {
self.handle_enter_staging_request(region_id, request).await
} else {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Create(create) => {
self.inner
Expand Down
54 changes: 54 additions & 0 deletions src/metric-engine/src/engine/staging.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_base::AffectedRows;
use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{EnterStagingRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::engine::MetricEngine;
use crate::error::{MitoEnterStagingOperationSnafu, Result};
use crate::utils;

impl MetricEngine {
/// Handles the enter staging request for the given region.
pub(crate) async fn handle_enter_staging_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows> {
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);

// For metadata region, it doesn't care about the partition expr, so we can just pass an empty string.
self.inner
.mito
.handle_request(
metadata_region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: String::new(),
}),
)
.await
.context(MitoEnterStagingOperationSnafu)?;

self.inner
.mito
.handle_request(data_region_id, request)
.await
.context(MitoEnterStagingOperationSnafu)
.map(|response| response.affected_rows)
}
}
8 changes: 8 additions & 0 deletions src/metric-engine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Mito enter staging operation fails"))]
MitoEnterStagingOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to collect record batch stream"))]
CollectRecordBatchStream {
source: common_recordbatch::error::Error,
Expand Down Expand Up @@ -360,6 +367,7 @@ impl ErrorExt for Error {
| MitoWriteOperation { source, .. }
| MitoFlushOperation { source, .. }
| MitoSyncOperation { source, .. }
| MitoEnterStagingOperation { source, .. }
| BatchOpenMitoRegion { source, .. }
| BatchCatchupMitoRegion { source, .. } => source.status_code(),

Expand Down
2 changes: 1 addition & 1 deletion src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ lazy_static = "1.4"
log-store = { workspace = true }
mito-codec.workspace = true
moka = { workspace = true, features = ["sync", "future"] }
object-store.workspace = true
object-store = { workspace = true, features = ["testing"] }
parquet = { workspace = true, features = ["async"] }
paste.workspace = true
pin-project.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ impl Compactor for DefaultCompactor {
// TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
compaction_region
.manifest_ctx
.update_manifest(RegionLeaderState::Writable, action_list)
.update_manifest(RegionLeaderState::Writable, action_list, false)
.await?;

Ok(edit)
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/compaction/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl CompactionTaskImpl {
};
if let Err(e) = compaction_region
.manifest_ctx
.update_manifest(current_region_state, action_list)
.update_manifest(current_region_state, action_list, false)
.await
{
warn!(
Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ async fn test_alter_region_ttl_options_with_format(flat_format: bool) {
check_ttl(&engine, &Duration::from_secs(500));
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_write_stall_on_altering() {
common_telemetry::init_default_ut_logging();

Expand Down Expand Up @@ -952,6 +952,8 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
.await
.unwrap();
});
// Make sure the loop is handling the alter request.
tokio::time::sleep(Duration::from_millis(100)).await;

let column_schemas_cloned = column_schemas.clone();
let engine_cloned = engine.clone();
Expand All @@ -962,6 +964,8 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
};
put_rows(&engine_cloned, region_id, rows).await;
});
// Make sure the loop is handling the put request.
tokio::time::sleep(Duration::from_millis(100)).await;

listener.wake_notify();
alter_job.await.unwrap();
Expand Down
34 changes: 34 additions & 0 deletions src/mito2/src/engine/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ pub trait EventListener: Send + Sync {
/// Notifies the listener that region starts to send a region change result to worker.
async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {}

/// Notifies the listener that region starts to send a enter staging result to worker.
async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {}

/// Notifies the listener that the index build task is executed successfully.
async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {}

Expand Down Expand Up @@ -307,6 +310,37 @@ impl EventListener for NotifyRegionChangeResultListener {
region_id
);
self.notify.notified().await;
info!(
"Continue to sending region change result for region {}",
region_id
);
}
}

#[derive(Default)]
pub struct NotifyEnterStagingResultListener {
notify: Notify,
}

impl NotifyEnterStagingResultListener {
/// Continue to sending enter staging result.
pub fn wake_notify(&self) {
self.notify.notify_one();
}
}

#[async_trait]
impl EventListener for NotifyEnterStagingResultListener {
async fn on_enter_staging_result_begin(&self, region_id: RegionId) {
info!(
"Wait on notify to start notify enter staging result for region {}",
region_id
);
self.notify.notified().await;
info!(
"Continue to sending enter staging result for region {}",
region_id
);
}
}

Expand Down
Loading
Loading