Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,8 @@ impl RegionServerInner {
| RegionRequest::Flush(_)
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_)
| RegionRequest::BuildIndex(_) => RegionChange::None,
| RegionRequest::BuildIndex(_)
| RegionRequest::EnterStaging(_) => RegionChange::None,
RegionRequest::Catchup(_) => RegionChange::Catchup,
};

Expand Down
8 changes: 8 additions & 0 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod options;
mod put;
mod read;
mod region_metadata;
mod staging;
mod state;
mod sync;

Expand Down Expand Up @@ -211,6 +212,13 @@ impl RegionEngine for MetricEngine {
let mut extension_return_value = HashMap::new();

let result = match request {
RegionRequest::EnterStaging(_) => {
if self.inner.is_physical_region(region_id) {
self.handle_enter_staging_request(region_id, request).await
} else {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Create(create) => {
self.inner
Expand Down
54 changes: 54 additions & 0 deletions src/metric-engine/src/engine/staging.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_base::AffectedRows;
use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{EnterStagingRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::engine::MetricEngine;
use crate::error::{MitoEnterStagingOperationSnafu, Result};
use crate::utils;

impl MetricEngine {
/// Handles the enter staging request for the given region.
pub(crate) async fn handle_enter_staging_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows> {
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);

// For metadata region, it doesn't care about the partition expr, so we can just pass an empty string.
self.inner
.mito
.handle_request(
metadata_region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: String::new(),
}),
)
.await
.context(MitoEnterStagingOperationSnafu)?;

self.inner
.mito
.handle_request(data_region_id, request)
.await
.context(MitoEnterStagingOperationSnafu)
.map(|response| response.affected_rows)
}
}
8 changes: 8 additions & 0 deletions src/metric-engine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Mito enter staging operation fails"))]
MitoEnterStagingOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to collect record batch stream"))]
CollectRecordBatchStream {
source: common_recordbatch::error::Error,
Expand Down Expand Up @@ -360,6 +367,7 @@ impl ErrorExt for Error {
| MitoWriteOperation { source, .. }
| MitoFlushOperation { source, .. }
| MitoSyncOperation { source, .. }
| MitoEnterStagingOperation { source, .. }
| BatchOpenMitoRegion { source, .. }
| BatchCatchupMitoRegion { source, .. } => source.status_code(),

Expand Down
2 changes: 1 addition & 1 deletion src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ lazy_static = "1.4"
log-store = { workspace = true }
mito-codec.workspace = true
moka = { workspace = true, features = ["sync", "future"] }
object-store.workspace = true
object-store = { workspace = true, features = ["testing"] }
parquet = { workspace = true, features = ["async"] }
paste.workspace = true
pin-project.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ impl Compactor for DefaultCompactor {
// TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
compaction_region
.manifest_ctx
.update_manifest(RegionLeaderState::Writable, action_list)
.update_manifest(RegionLeaderState::Writable, action_list, false)
.await?;

Ok(edit)
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/compaction/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl CompactionTaskImpl {
};
if let Err(e) = compaction_region
.manifest_ctx
.update_manifest(current_region_state, action_list)
.update_manifest(current_region_state, action_list, false)
.await
{
warn!(
Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ async fn test_alter_region_ttl_options_with_format(flat_format: bool) {
check_ttl(&engine, &Duration::from_secs(500));
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_write_stall_on_altering() {
common_telemetry::init_default_ut_logging();

Expand Down Expand Up @@ -952,6 +952,8 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
.await
.unwrap();
});
// Make sure the loop is handling the alter request.
tokio::time::sleep(Duration::from_millis(100)).await;

let column_schemas_cloned = column_schemas.clone();
let engine_cloned = engine.clone();
Expand All @@ -962,6 +964,8 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
};
put_rows(&engine_cloned, region_id, rows).await;
});
// Make sure the loop is handling the put request.
tokio::time::sleep(Duration::from_millis(100)).await;

listener.wake_notify();
alter_job.await.unwrap();
Expand Down
34 changes: 34 additions & 0 deletions src/mito2/src/engine/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ pub trait EventListener: Send + Sync {
/// Notifies the listener that region starts to send a region change result to worker.
async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {}

/// Notifies the listener that region starts to send a enter staging result to worker.
async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {}

/// Notifies the listener that the index build task is executed successfully.
async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {}

Expand Down Expand Up @@ -307,6 +310,37 @@ impl EventListener for NotifyRegionChangeResultListener {
region_id
);
self.notify.notified().await;
info!(
"Continue to sending region change result for region {}",
region_id
);
}
}

#[derive(Default)]
pub struct NotifyEnterStagingResultListener {
notify: Notify,
}

impl NotifyEnterStagingResultListener {
/// Continue to sending enter staging result.
pub fn wake_notify(&self) {
self.notify.notify_one();
}
}

#[async_trait]
impl EventListener for NotifyEnterStagingResultListener {
async fn on_enter_staging_result_begin(&self, region_id: RegionId) {
info!(
"Wait on notify to start notify enter staging result for region {}",
region_id
);
self.notify.notified().await;
info!(
"Continue to sending enter staging result for region {}",
region_id
);
}
}

Expand Down
Loading