Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ criterion = { version = "0.7", features = ["async_futures"] }
# compile time and binary size.
deltalake = { version = "0.26", features = ["s3", "gcs", "datafusion"] }
faiss = { version = "0.12.2-alpha.0", features = ["static"] }
foyer = { version = "0.20.0", features = ["serde", "tracing", "nightly"] }
# TODO(MrCroxx): Switch to a release version.
foyer = { git = "https://github.com/foyer-rs/foyer", rev = "f8c68089548f3635ac641b03ae15b55e16f344e4", features = ["serde", "tracing", "nightly"] }
futures-async-stream = "0.2.9"
governor = { version = "0.10", default-features = false, features = ["std"] }
hashbrown = { version = "0.16", features = [
Expand All @@ -166,6 +167,9 @@ hashbrown = { version = "0.16", features = [
hashbrown0_14 = { package = "hashbrown", version = "0.14", features = [
"nightly",
] }
hashbrown0_15 = { package = "hashbrown", version = "0.15", features = [
"nightly",
] }
hytra = "0.1"
# branch dev_rebase_main_20250325
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "0b5d05b20e8a5277a47585141082a7a638238ae1", features = [
Expand Down
1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hashbrown = { workspace = true }
hashbrown0_14 = { workspace = true }
hashbrown0_15 = { workspace = true }
hex = "0.4.3"
http = "1"
humantime = "2.3"
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::atomic::Ordering;
pub use ahash::RandomState;
use hashbrown::HashTable;
use hashbrown::hash_table::Entry;
use hashbrown0_14 as _;
use {hashbrown0_14 as _, hashbrown0_15 as _};

use crate::sequence::{AtomicSequence, Sequence, Sequencer};

Expand Down
18 changes: 6 additions & 12 deletions src/compute/src/rpc/service/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,17 +496,14 @@ impl MonitorService for MonitorServiceImpl {
options =
options.with_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_obtain_threshold_ms {
options = options
.with_record_hybrid_obtain_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
options = options
.with_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
options = options
.with_record_hybrid_fetch_threshold(Duration::from_millis(threshold as _));
options = options.with_record_hybrid_get_or_fetch_threshold(Duration::from_millis(
threshold as _,
));
}
cache.update_tracing_options(options);
}
Expand All @@ -526,17 +523,14 @@ impl MonitorService for MonitorServiceImpl {
options =
options.with_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_obtain_threshold_ms {
options = options
.with_record_hybrid_obtain_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
options = options
.with_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
options = options
.with_record_hybrid_fetch_threshold(Duration::from_millis(threshold as _));
options = options.with_record_hybrid_get_or_fetch_threshold(Duration::from_millis(
threshold as _,
));
}
cache.update_tracing_options(options);
}
Expand Down
29 changes: 13 additions & 16 deletions src/storage/benches/bench_block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,17 @@ impl FoyerCache {
#[async_trait]
impl CacheBase for FoyerCache {
async fn try_get_with(&self, sst_object_id: u64, block_idx: u64) -> HummockResult<Arc<Block>> {
let latency = self.fake_io_latency;
let entry = self
.inner
.fetch((sst_object_id, block_idx), || {
let latency = self.fake_io_latency;
async move {
get_fake_block(sst_object_id, block_idx, latency)
.await
.map(Arc::new)
.map_err(foyer::Error::other)
}
.get_or_fetch(&(sst_object_id, block_idx), move |_| async move {
get_fake_block(sst_object_id, block_idx, latency)
.await
.map(Arc::new)
.map_err(foyer::MemoryError::other)
})
.await
.map_err(foyer::Error::from)
.map_err(HummockError::foyer_error)?;
Ok(entry.value().clone())
}
Expand Down Expand Up @@ -220,16 +219,14 @@ impl FoyerHybridCache {
#[async_trait]
impl CacheBase for FoyerHybridCache {
async fn try_get_with(&self, sst_object_id: u64, block_idx: u64) -> HummockResult<Arc<Block>> {
let latency = self.fake_io_latency;
let entry = self
.inner
.fetch((sst_object_id, block_idx), || {
let latency = self.fake_io_latency;
async move {
get_fake_block(sst_object_id, block_idx, latency)
.await
.map(Arc::new)
.map_err(foyer::Error::other)
}
.get_or_fetch(&(sst_object_id, block_idx), move |_| async move {
get_fake_block(sst_object_id, block_idx, latency)
.await
.map(Arc::new)
.map_err(foyer::Error::other)
})
.await
.map_err(HummockError::foyer_error)?;
Expand Down
20 changes: 10 additions & 10 deletions src/storage/src/hummock/block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::ops::Deref;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use foyer::{FetchState, HybridCacheEntry, HybridFetch};
use foyer::{FetchState, HybridCacheEntry, HybridGetOrFetch};
use risingwave_common::config::EvictionConfig;

use super::{Block, HummockResult, SstableBlockIndex};
Expand Down Expand Up @@ -81,27 +81,27 @@ pub struct BlockCacheConfig {

pub enum BlockResponse {
Block(BlockHolder),
Entry(HybridFetch<SstableBlockIndex, Box<Block>>),
Fetch(HybridGetOrFetch<SstableBlockIndex, Box<Block>>),
}

impl BlockResponse {
pub async fn wait(self) -> HummockResult<BlockHolder> {
let entry = match self {
let fetch = match self {
BlockResponse::Block(block) => return Ok(block),
BlockResponse::Entry(entry) => entry,
BlockResponse::Fetch(fetch) => fetch,
};
match entry.state() {
FetchState::Hit => entry
match fetch.state() {
FetchState::Hit => fetch
.await
.map(BlockHolder::from_hybrid_cache_entry)
.map_err(HummockError::foyer_error),
FetchState::Wait => entry
.instrument_await("wait_pending_fetch_block".verbose())
_ if fetch.is_leader() => fetch
.instrument_await("fetch_block".verbose())
.await
.map(BlockHolder::from_hybrid_cache_entry)
.map_err(HummockError::foyer_error),
FetchState::Miss => entry
.instrument_await("fetch_block".verbose())
_ => fetch
.instrument_await("wait_pending_fetch_block".verbose())
.await
.map(BlockHolder::from_hybrid_cache_entry)
.map_err(HummockError::foyer_error),
Expand Down
Loading
Loading