diff --git a/crates/core/common/src/catalog/logical.rs b/crates/core/common/src/catalog/logical.rs
index 202313cf1..9f553b687 100644
--- a/crates/core/common/src/catalog/logical.rs
+++ b/crates/core/common/src/catalog/logical.rs
@@ -1,6 +1,5 @@
 use std::{
     collections::{BTreeMap, BTreeSet},
-    fmt,
     sync::Arc,
 };
 
@@ -16,12 +15,15 @@ use datasets_common::{
 use js_runtime::isolate_pool::IsolatePool;
 use serde::Deserialize;
 
-use crate::{BlockNum, SPECIAL_BLOCK_NUM, js_udf::JsUdf, sql::TableReference};
+use crate::{BlockNum, SPECIAL_BLOCK_NUM, js_udf::JsUdf};
 
 pub mod for_admin_api;
 pub mod for_dump;
 pub mod for_manifest_validation;
 pub mod for_query;
+pub mod table;
+
+pub use table::LogicalTable;
 
 /// Identifies a dataset and its data schema.
 #[derive(Clone, Debug)]
@@ -114,72 +116,6 @@ impl Table {
     }
 }
 
-/// A table that holds a reference to its dataset.
-#[derive(Debug, Clone)]
-pub struct ResolvedTable {
-    table: Table,
-    /// The dataset reference portion of SQL table references.
-    ///
-    /// SQL table references have the format `<dataset>.<table>` (e.g., `anvil_rpc.blocks`).
-    /// This field stores the string form of the `<dataset>` portion - the schema under
-    /// which this table is registered in the catalog and referenced in SQL queries.
-    sql_table_ref_schema: String,
-    dataset_reference: HashReference,
-    dataset_start_block: Option<BlockNum>,
-}
-
-impl fmt::Display for ResolvedTable {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{}", self.table_ref())
-    }
-}
-
-impl ResolvedTable {
-    pub fn new(
-        table: Table,
-        sql_table_ref_schema: String,
-        dataset_reference: HashReference,
-        dataset_start_block: Option<BlockNum>,
-    ) -> Self {
-        Self {
-            table,
-            sql_table_ref_schema,
-            dataset_reference,
-            dataset_start_block,
-        }
-    }
-
-    pub fn table(&self) -> &Table {
-        &self.table
-    }
-
-    pub fn table_ref(&self) -> TableReference {
-        TableReference::partial(self.sql_table_ref_schema.clone(), self.table.name.clone())
-    }
-
-    pub fn dataset_reference(&self) -> &HashReference {
-        &self.dataset_reference
-    }
-
-    pub fn dataset_start_block(&self) -> Option<BlockNum> {
-        self.dataset_start_block
-    }
-
-    /// Bare table name
-    pub fn name(&self) -> &TableName {
-        &self.table.name
-    }
-
-    /// Returns the dataset reference portion of SQL table references.
-    ///
-    /// SQL table references have the format `<dataset>.<table>` (e.g., `anvil_rpc.blocks`).
-    /// This returns the string form of the `<dataset>` portion - the schema under which
-    /// this table is registered in the catalog and referenced in SQL queries.
-    pub fn sql_table_ref_schema(&self) -> &str {
-        &self.sql_table_ref_schema
-    }
-}
-
 #[derive(Debug, Clone, Deserialize)]
 pub struct Function {
     pub name: String,
@@ -198,13 +134,13 @@ pub struct FunctionSource {
 
 #[derive(Clone, Debug)]
 pub struct LogicalCatalog {
-    pub tables: Vec<ResolvedTable>,
+    pub tables: Vec<LogicalTable>,
     /// UDFs specific to the datasets corresponding to the resolved tables.
     pub udfs: Vec<JsUdf>,
 }
 
 impl LogicalCatalog {
-    pub fn from_tables<'a>(tables: impl Iterator<Item = &'a ResolvedTable>) -> Self {
+    pub fn from_tables<'a>(tables: impl Iterator<Item = &'a LogicalTable>) -> Self {
         Self {
             tables: tables.cloned().collect(),
             udfs: Vec::new(),
diff --git a/crates/core/common/src/catalog/logical/for_admin_api.rs b/crates/core/common/src/catalog/logical/for_admin_api.rs
index 9e5731d49..d620c1999 100644
--- a/crates/core/common/src/catalog/logical/for_admin_api.rs
+++ b/crates/core/common/src/catalog/logical/for_admin_api.rs
@@ -29,7 +29,7 @@ use crate::{
     BoxError,
     catalog::{
         dataset_access::DatasetAccess,
-        logical::{LogicalCatalog, ResolvedTable},
+        logical::{LogicalCatalog, LogicalTable},
     },
     js_udf::JsUdf,
     sql::{FunctionReference, TableReference},
@@ -52,14 +52,14 @@ pub type TableReferencesMap = BTreeMap<
 /// ## Process
 ///
 /// 1. Flattens table references from the references map
-/// 2. Resolves all table references to ResolvedTable instances
+/// 2. Resolves all table references to LogicalTable instances
 /// 3. Flattens function references from the references map
 /// 4. Resolves all function references to ScalarUDF instances
 /// 5. Creates and returns a LogicalCatalog
 ///
 /// ## Related Functions
 ///
-/// - [`resolve_tables`] - Resolves table references to ResolvedTable instances
+/// - [`resolve_tables`] - Resolves table references to LogicalTable instances
 /// - [`resolve_udfs`] - Resolves function references to UDFs
 pub async fn create(
     dataset_store: &impl DatasetAccess,
@@ -99,7 +99,7 @@ pub async fn create(
 
 #[derive(Debug, thiserror::Error)]
 pub enum CreateLogicalCatalogError {
-    /// Failed to resolve table references to ResolvedTable instances
+    /// Failed to resolve table references to LogicalTable instances
     #[error(transparent)]
     ResolveTables(ResolveTablesError),
 
@@ -108,18 +108,18 @@ pub enum CreateLogicalCatalogError {
     ResolveUdfs(ResolveUdfsError),
 }
 
-/// Resolves table references to ResolvedTable instances using pre-resolved dependencies.
+/// Resolves table references to LogicalTable instances using pre-resolved dependencies.
 ///
 /// Processes each table reference across all tables, looks up datasets by hash, finds tables
-/// within datasets, and creates ResolvedTable instances for catalog construction.
+/// within datasets, and creates LogicalTable instances for catalog construction.
 async fn resolve_tables<'a>(
     dataset_store: &impl DatasetAccess,
     manifest_deps: &BTreeMap<Hash, Dataset>,
     refs: impl IntoIterator<Item = (&'a TableName, &'a TableReference)> + 'a,
-) -> Result<Vec<ResolvedTable>, ResolveTablesError> {
+) -> Result<Vec<LogicalTable>, ResolveTablesError> {
     // Use hash-based map to deduplicate datasets across ALL tables
-    // Inner map: table_ref -> ResolvedTable (deduplicates table references)
-    let mut tables: BTreeMap<Hash, BTreeMap<TableReference, ResolvedTable>> =
+    // Inner map: table_ref -> LogicalTable (deduplicates table references)
+    let mut tables: BTreeMap<Hash, BTreeMap<TableReference, LogicalTable>> =
         BTreeMap::new();
 
     // Process all table references - fail fast on first error
@@ -170,12 +170,10 @@ async fn resolve_tables<'a>(
                     reference: dataset_ref.clone(),
                 })?;
 
-                // Create ResolvedTable
-                let resolved_table = ResolvedTable::new(
-                    dataset_table.clone(),
+                let resolved_table = LogicalTable::new(
                     schema.to_string(),
                     dataset_ref.clone(),
-                    dataset.start_block,
+                    dataset_table.clone(),
                 );
 
                 // Insert into vacant entry
@@ -184,7 +182,7 @@ async fn resolve_tables<'a>(
         }
     }
 
-    // Flatten to Vec<ResolvedTable>
+    // Flatten to Vec<LogicalTable>
     Ok(tables
         .into_values()
         .flat_map(|map| map.into_values())
diff --git a/crates/core/common/src/catalog/logical/for_dump.rs b/crates/core/common/src/catalog/logical/for_dump.rs
index 06a7bc772..76edad403 100644
--- a/crates/core/common/src/catalog/logical/for_dump.rs
+++ b/crates/core/common/src/catalog/logical/for_dump.rs
@@ -20,7 +20,7 @@ use datasets_common::{
 use js_runtime::isolate_pool::IsolatePool;
 
 use crate::{
-    BoxError, ResolvedTable,
+    BoxError, LogicalTable,
     catalog::{dataset_access::DatasetAccess, logical::LogicalCatalog},
     js_udf::JsUdf,
     sql::{FunctionReference, TableReference},
@@ -46,7 +46,7 @@ pub type ResolvedReferences = (
 ///
 /// The function:
 /// 1. Destructures the references tuple into table and function references
-/// 2. Resolves table references to ResolvedTable instances using pre-resolved dependencies
+/// 2. Resolves table references to LogicalTable instances using pre-resolved dependencies
 /// 3. Resolves function references to ScalarUDF instances
 /// 4. Returns a LogicalCatalog containing tables and UDFs
 pub async fn create(
@@ -74,18 +74,18 @@ pub async fn create(
     Ok(LogicalCatalog { tables, udfs })
 }
 
-/// Resolves table references to ResolvedTable instances using pre-resolved dependencies.
+/// Resolves table references to LogicalTable instances using pre-resolved dependencies.
 ///
 /// Processes each table reference, looks up the dataset by hash, finds the table
-/// within the dataset, and creates a ResolvedTable for catalog construction.
+/// within the dataset, and creates a LogicalTable for catalog construction.
 async fn resolve_tables(
     dataset_store: &impl DatasetAccess,
     manifest_deps: &BTreeMap<Hash, Dataset>,
     refs: impl IntoIterator<Item = &TableReference>,
-) -> Result<Vec<ResolvedTable>, ResolveTablesError> {
+) -> Result<Vec<LogicalTable>, ResolveTablesError> {
     // Use hash-based map to deduplicate datasets and collect resolved tables
-    // Inner map: table_ref -> ResolvedTable (deduplicates table references)
-    let mut tables: BTreeMap<Hash, BTreeMap<TableReference, ResolvedTable>> =
+    // Inner map: table_ref -> LogicalTable (deduplicates table references)
+    let mut tables: BTreeMap<Hash, BTreeMap<TableReference, LogicalTable>> =
         BTreeMap::new();
 
     for table_ref in refs {
@@ -131,12 +131,11 @@ async fn resolve_tables(
                     reference: dataset_ref.clone(),
                 })?;
 
-                // Create ResolvedTable
-                let resolved_table = ResolvedTable::new(
-                    dataset_table.clone(),
+                // Create LogicalTable
+                let resolved_table = LogicalTable::new(
                     schema.to_string(),
                     dataset_ref.clone(),
-                    dataset.start_block,
+                    dataset_table.clone(),
                 );
 
                 // Insert into vacant entry
@@ -145,7 +144,7 @@ async fn resolve_tables(
         }
     }
 
-    // Flatten to Vec<ResolvedTable>
+    // Flatten to Vec<LogicalTable>
     Ok(tables
         .into_values()
         .flat_map(|map| map.into_values())
@@ -285,7 +284,7 @@ async fn resolve_udfs(
 /// a logical catalog for derived dataset execution.
 #[derive(Debug, thiserror::Error)]
 pub enum CreateCatalogError {
-    /// Failed to resolve table references to ResolvedTable instances.
+    /// Failed to resolve table references to LogicalTable instances.
     #[error(transparent)]
     ResolveTables(ResolveTablesError),
 
diff --git a/crates/core/common/src/catalog/logical/for_manifest_validation.rs b/crates/core/common/src/catalog/logical/for_manifest_validation.rs
index c53b6452e..cf7c0c9c7 100644
--- a/crates/core/common/src/catalog/logical/for_manifest_validation.rs
+++ b/crates/core/common/src/catalog/logical/for_manifest_validation.rs
@@ -29,7 +29,7 @@ use crate::{
     BoxError,
     catalog::{
         dataset_access::DatasetAccess,
-        logical::{LogicalCatalog, ResolvedTable},
+        logical::{LogicalCatalog, LogicalTable},
     },
     js_udf::JsUdf,
     sql::{FunctionReference, TableReference},
@@ -50,7 +50,7 @@ pub type TableReferencesMap = BTreeMap<
 /// resolving dependency aliases to datasets for schema-only validation (no physical data access).
 ///
 /// Delegates to specialized helpers:
-/// - [`resolve_tables`] - Resolves table references to `ResolvedTable` instances
+/// - [`resolve_tables`] - Resolves table references to `LogicalTable` instances
 /// - [`resolve_udfs`] - Resolves function references to UDFs
 pub async fn create(
     dataset_store: &impl DatasetAccess,
@@ -90,7 +90,7 @@ pub async fn create(
 
 #[derive(Debug, thiserror::Error)]
 pub enum CreateLogicalCatalogError {
-    /// Failed to resolve table references to ResolvedTable instances
+    /// Failed to resolve table references to LogicalTable instances
     #[error(transparent)]
     ResolveTables(ResolveTablesError),
 
@@ -99,13 +99,13 @@ pub enum CreateLogicalCatalogError {
     ResolveUdfs(ResolveUdfsError),
 }
 
-/// Resolves table references to ResolvedTable instances using pre-resolved dependencies.
+/// Resolves table references to LogicalTable instances using pre-resolved dependencies.
 async fn resolve_tables<'a>(
     dataset_store: &impl DatasetAccess,
     manifest_deps: &BTreeMap<Hash, Dataset>,
     refs: impl IntoIterator<Item = (&'a TableName, &'a TableReference)> + 'a,
-) -> Result<Vec<ResolvedTable>, ResolveTablesError> {
-    let mut tables: BTreeMap<Hash, BTreeMap<TableReference, ResolvedTable>> =
+) -> Result<Vec<LogicalTable>, ResolveTablesError> {
+    let mut tables: BTreeMap<Hash, BTreeMap<TableReference, LogicalTable>> =
         BTreeMap::new();
 
     for (table_name, table_ref) in refs {
@@ -151,11 +151,10 @@ async fn resolve_tables<'a>(
                     reference: dataset_ref.clone(),
                 })?;
 
-                let resolved_table = ResolvedTable::new(
-                    dataset_table.clone(),
+                let resolved_table = LogicalTable::new(
                     schema.to_string(),
                     dataset_ref.clone(),
-                    dataset.start_block,
+                    dataset_table.clone(),
                 );
 
                 entry.insert(resolved_table);
diff --git a/crates/core/common/src/catalog/logical/for_query.rs b/crates/core/common/src/catalog/logical/for_query.rs
index 0b9ce43fa..2e3b8f7a8 100644
--- a/crates/core/common/src/catalog/logical/for_query.rs
+++ b/crates/core/common/src/catalog/logical/for_query.rs
@@ -13,7 +13,7 @@ use datasets_common::{
 use js_runtime::isolate_pool::IsolatePool;
 
 use crate::{
-    BoxError, ResolvedTable,
+    BoxError, LogicalTable,
     catalog::{dataset_access::DatasetAccess, logical::LogicalCatalog},
     sql::{FunctionReference, TableReference},
 };
@@ -66,17 +66,17 @@ pub async fn create(
     Ok(LogicalCatalog { tables, udfs })
 }
 
-/// Resolves table references to ResolvedTable instances using dynamic resolution.
+/// Resolves table references to LogicalTable instances using dynamic resolution.
 ///
 /// Processes each table reference, resolves the dataset reference to a hash,
-/// loads the dataset, finds the table, and creates a ResolvedTable for catalog construction.
+/// loads the dataset, finds the table, and creates a LogicalTable for catalog construction.
 async fn resolve_tables(
     dataset_store: &impl DatasetAccess,
     refs: impl IntoIterator<Item = &TableReference>,
-) -> Result<Vec<ResolvedTable>, ResolveTablesError> {
+) -> Result<Vec<LogicalTable>, ResolveTablesError> {
     // Use hash-based map to deduplicate datasets and collect resolved tables
-    // Inner map: table_ref -> ResolvedTable (deduplicates table references)
-    let mut tables: BTreeMap<Hash, BTreeMap<TableReference, ResolvedTable>> =
+    // Inner map: table_ref -> LogicalTable (deduplicates table references)
+    let mut tables: BTreeMap<Hash, BTreeMap<TableReference, LogicalTable>> =
         BTreeMap::new();
 
     for table_ref in refs {
@@ -131,12 +131,10 @@ async fn resolve_tables(
                     reference: dataset_ref.clone(),
                 })?;
 
-                // Create ResolvedTable
-                let resolved_table = ResolvedTable::new(
-                    dataset_table.clone(),
+                let resolved_table = LogicalTable::new(
                     schema.to_string(),
                     dataset_ref.clone(),
-                    dataset.start_block,
+                    dataset_table.clone(),
                 );
 
                 // Insert into vacant entry
@@ -145,7 +143,7 @@ async fn resolve_tables(
         }
     }
 
-    // Flatten to Vec<ResolvedTable>
+    // Flatten to Vec<LogicalTable>
     Ok(tables
         .into_values()
         .flat_map(|map| map.into_values())
@@ -244,7 +242,7 @@ async fn resolve_udfs(
 /// a logical catalog for Arrow Flight query planning (GetFlightInfo).
 #[derive(Debug, thiserror::Error)]
 pub enum CreateCatalogError {
-    /// Failed to resolve table references to ResolvedTable instances.
+    /// Failed to resolve table references to LogicalTable instances.
     #[error(transparent)]
     ResolveTables(ResolveTablesError),
 
diff --git a/crates/core/common/src/catalog/logical/table.rs b/crates/core/common/src/catalog/logical/table.rs
new file mode 100644
index 000000000..f21e7c91b
--- /dev/null
+++ b/crates/core/common/src/catalog/logical/table.rs
@@ -0,0 +1,64 @@
+use std::fmt;
+
+use datasets_common::{hash_reference::HashReference, table_name::TableName};
+
+use crate::{catalog::logical::Table, sql::TableReference};
+
+/// A table that holds a reference to its dataset.
+#[derive(Debug, Clone)]
+pub struct LogicalTable {
+    /// The dataset reference portion of SQL table references.
+    ///
+    /// SQL table references have the format `<dataset>.<table>` (e.g., `anvil_rpc.blocks`).
+    /// This field stores the string form of the `<dataset>` portion - the schema under
+    /// which this table is registered in the catalog and referenced in SQL queries.
+    sql_table_ref_schema: String,
+    dataset_reference: HashReference,
+    table: Table,
+}
+
+impl LogicalTable {
+    pub fn new(
+        sql_table_ref_schema: String,
+        dataset_reference: HashReference,
+        table: Table,
+    ) -> Self {
+        Self {
+            table,
+            sql_table_ref_schema,
+            dataset_reference,
+        }
+    }
+
+    pub fn table(&self) -> &Table {
+        &self.table
+    }
+
+    pub fn table_ref(&self) -> TableReference {
+        TableReference::partial(self.sql_table_ref_schema.clone(), self.table.name.clone())
+    }
+
+    pub fn dataset_reference(&self) -> &HashReference {
+        &self.dataset_reference
+    }
+
+    /// Bare table name
+    pub fn name(&self) -> &TableName {
+        &self.table.name
+    }
+
+    /// Returns the dataset reference portion of SQL table references.
+    ///
+    /// SQL table references have the format `<dataset>.<table>` (e.g., `anvil_rpc.blocks`).
+    /// This returns the string form of the `<dataset>` portion - the schema under which
+    /// this table is registered in the catalog and referenced in SQL queries.
+    pub fn sql_table_ref_schema(&self) -> &str {
+        &self.sql_table_ref_schema
+    }
+}
+
+impl fmt::Display for LogicalTable {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.table_ref())
+    }
+}
diff --git a/crates/core/common/src/catalog/physical/for_dump.rs b/crates/core/common/src/catalog/physical/for_dump.rs
index 622101b23..e3cffb8eb 100644
--- a/crates/core/common/src/catalog/physical/for_dump.rs
+++ b/crates/core/common/src/catalog/physical/for_dump.rs
@@ -7,6 +7,7 @@ use amp_data_store::DataStore;
 use datasets_common::{hash_reference::HashReference, table_name::TableName};
 
 use crate::catalog::{
+    dataset_access::DatasetAccess,
     logical::LogicalCatalog,
     physical::{Catalog, PhysicalTable},
 };
@@ -18,6 +19,7 @@ use crate::catalog::{
 ///
 /// ## Parameters
 ///
+/// - `dataset_store`: Used to retrieve dataset metadata including start_block
 /// - `data_store`: Used to query metadata database for physical parquet locations
 /// - `logical`: Pre-created logical catalog containing table schemas and UDFs
 ///
@@ -31,8 +33,10 @@ use crate::catalog::{
 /// The function:
 /// 1. Iterates through tables in the logical catalog
 /// 2. Queries metadata database for physical parquet locations
-/// 3. Constructs physical catalog for query execution
+/// 3. Retrieves dataset metadata to get start_block
+/// 4. Constructs physical catalog for query execution
 pub async fn create(
+    dataset_store: &impl DatasetAccess,
     data_store: &DataStore,
     logical: LogicalCatalog,
 ) -> Result<Catalog, CreateCatalogError> {
@@ -54,10 +58,19 @@ pub async fn create(
                 table: table_name.clone(),
             })?;
 
+        // Retrieve dataset to get start_block
+        let dataset = dataset_store
+            .get_dataset(dataset_ref)
+            .await
+            .map_err(|source| CreateCatalogError::DatasetRetrieval {
+                dataset: dataset_ref.clone(),
+                source,
+            })?;
+
         let physical_table = PhysicalTable::from_active_revision(
             data_store.clone(),
             table.dataset_reference().clone(),
-            table.dataset_start_block(),
+            dataset.start_block,
             table.table().clone(),
             revision,
             table.sql_table_ref_schema().to_string(),
@@ -100,4 +113,15 @@ pub enum CreateCatalogError {
         /// The name of the table that has not been synced
         table: TableName,
     },
+
+    /// Failed to retrieve dataset metadata.
+    ///
+    /// This occurs when retrieving the dataset to extract start_block fails.
+    #[error("Failed to retrieve dataset {dataset}")]
+    DatasetRetrieval {
+        /// The hash reference of the dataset
+        dataset: HashReference,
+        #[source]
+        source: crate::BoxError,
+    },
 }
diff --git a/crates/core/common/src/catalog/physical/for_query.rs b/crates/core/common/src/catalog/physical/for_query.rs
index 159d57f6f..67bbaf695 100644
--- a/crates/core/common/src/catalog/physical/for_query.rs
+++ b/crates/core/common/src/catalog/physical/for_query.rs
@@ -9,6 +9,7 @@ use amp_data_store::DataStore;
 use datasets_common::{hash_reference::HashReference, table_name::TableName};
 
 use crate::catalog::{
+    dataset_access::DatasetAccess,
     logical::LogicalCatalog,
     physical::{Catalog, PhysicalTable},
 };
@@ -20,6 +21,7 @@ use crate::catalog::{
 ///
 /// ## Parameters
 ///
+/// - `dataset_store`: Used to retrieve dataset metadata including start_block
 /// - `data_store`: Used to query metadata database for physical parquet locations
 /// - `logical`: Pre-created logical catalog containing table schemas and UDFs
 ///
@@ -36,8 +38,10 @@ use crate::catalog::{
 /// The function:
 /// 1. Iterates through tables in the logical catalog
 /// 2. Queries metadata database for physical parquet locations
-/// 3. Constructs physical catalog for query execution
+/// 3. Retrieves dataset metadata to get start_block
+/// 4. Constructs physical catalog for query execution
 pub async fn create(
+    dataset_store: &impl DatasetAccess,
     data_store: &DataStore,
     logical: LogicalCatalog,
 ) -> Result<Catalog, CreateCatalogError> {
@@ -58,10 +62,19 @@ pub async fn create(
                 table: table.name().clone(),
             })?;
 
+        // Retrieve dataset to get start_block
+        let dataset = dataset_store
+            .get_dataset(dataset_ref)
+            .await
+            .map_err(|source| CreateCatalogError::DatasetRetrieval {
+                dataset: dataset_ref.clone(),
+                source,
+            })?;
+
         let physical_table = PhysicalTable::from_active_revision(
             data_store.clone(),
             table.dataset_reference().clone(),
-            table.dataset_start_block(),
+            dataset.start_block,
             table.table().clone(),
             revision,
             table.sql_table_ref_schema().to_string(),
@@ -104,4 +117,15 @@ pub enum CreateCatalogError {
         /// The name of the table that has not been synced
         table: TableName,
     },
+
+    /// Failed to retrieve dataset metadata.
+    ///
+    /// This occurs when retrieving the dataset to extract start_block fails.
+    #[error("Failed to retrieve dataset {dataset}")]
+    DatasetRetrieval {
+        /// The hash reference of the dataset
+        dataset: HashReference,
+        #[source]
+        source: crate::BoxError,
+    },
 }
diff --git a/crates/core/common/src/planning_context.rs b/crates/core/common/src/planning_context.rs
index b7701bb15..932d0d104 100644
--- a/crates/core/common/src/planning_context.rs
+++ b/crates/core/common/src/planning_context.rs
@@ -18,7 +18,7 @@ use datafusion::{
 use tracing::instrument;
 
 use crate::{
-    BoxError, LogicalCatalog, QueryContext, ResolvedTable,
+    BoxError, LogicalCatalog, LogicalTable, QueryContext,
     plan_visitors::{is_incremental, propagate_block_num},
     query_context::{Error, default_catalog_name},
     sql::TableReference,
@@ -80,7 +80,7 @@ impl PlanningContext {
         }
     }
 
-    pub fn catalog(&self) -> &[ResolvedTable] {
+    pub fn catalog(&self) -> &[LogicalTable] {
         &self.catalog.tables
     }
 
@@ -104,7 +104,7 @@ impl PlanningContext {
 }
 
 #[derive(Clone, Debug)]
-struct PlanningTable(ResolvedTable);
+struct PlanningTable(LogicalTable);
 
 #[async_trait]
 impl TableProvider for PlanningTable {
diff --git a/crates/core/dump/src/derived_dataset.rs b/crates/core/dump/src/derived_dataset.rs
index 1a7403008..b4c02a4e1 100644
--- a/crates/core/dump/src/derived_dataset.rs
+++ b/crates/core/dump/src/derived_dataset.rs
@@ -480,7 +480,7 @@ async fn dump_table(
             (table_refs, func_refs),
         )
         .await?;
-        physical_catalog::create(&ctx.data_store, logical).await?
+        physical_catalog::create(&ctx.dataset_store, &ctx.data_store, logical).await?
     };
     let planning_ctx = PlanningContext::new(catalog.logical().clone());
     let manifest_start_block = manifest.start_block;
diff --git a/crates/core/dump/src/raw_dataset.rs b/crates/core/dump/src/raw_dataset.rs
index 2fed9bff3..1338c1951 100644
--- a/crates/core/dump/src/raw_dataset.rs
+++ b/crates/core/dump/src/raw_dataset.rs
@@ -91,7 +91,7 @@ use std::{
 use amp_data_store::DataStore;
 
 use common::{
-    BlockNum, BlockStreamer, BoxError, LogicalCatalog, ResolvedTable,
+    BlockNum, BlockStreamer, BoxError, LogicalCatalog, LogicalTable,
     catalog::physical::{Catalog, PhysicalTable},
     metadata::segments::merge_ranges,
 };
@@ -195,11 +195,10 @@ pub async fn dump(
     let resolved_tables: Vec<_> = tables
         .iter()
         .map(|(t, _)| {
-            ResolvedTable::new(
-                t.table().clone(),
+            LogicalTable::new(
                 t.sql_table_ref_schema().to_string(),
                 dataset_reference.clone(),
-                dataset.start_block,
+                t.table().clone(),
             )
         })
         .collect();
diff --git a/crates/core/dump/src/streaming_query.rs b/crates/core/dump/src/streaming_query.rs
index 493ad7c92..c00f3940f 100644
--- a/crates/core/dump/src/streaming_query.rs
+++ b/crates/core/dump/src/streaming_query.rs
@@ -10,8 +10,8 @@ use alloy::{hex::ToHexExt as _, primitives::BlockHash};
 use amp_data_store::DataStore;
 use amp_dataset_store::DatasetStore;
 use common::{
-    BlockNum, BoxError, Dataset, DetachedLogicalPlan, LogicalCatalog, PlanningContext,
-    QueryContext, ResolvedTable, SPECIAL_BLOCK_NUM,
+    BlockNum, BoxError, Dataset, DetachedLogicalPlan, LogicalCatalog, LogicalTable,
+    PlanningContext, QueryContext, SPECIAL_BLOCK_NUM,
     arrow::{array::RecordBatch, datatypes::SchemaRef},
     catalog::physical::{Catalog, PhysicalTable},
     incrementalizer::incrementalize_plan,
@@ -383,11 +383,10 @@ impl StreamingQuery {
         // Construct a catalog for the single `blocks_table`.
         let catalog = {
             let table = &self.blocks_table;
-            let resolved_table = ResolvedTable::new(
-                table.table().clone(),
+            let resolved_table = LogicalTable::new(
                 table.sql_table_ref_schema().to_string(),
                 table.dataset_reference().clone(),
-                table.dataset_start_block(),
+                table.table().clone(),
             );
             let logical = LogicalCatalog::from_tables(std::iter::once(&resolved_table));
             Catalog::new(vec![self.blocks_table.clone()], logical)
diff --git a/crates/services/server/src/flight.rs b/crates/services/server/src/flight.rs
index ed807e35c..8b2f69ee2 100644
--- a/crates/services/server/src/flight.rs
+++ b/crates/services/server/src/flight.rs
@@ -129,7 +129,7 @@ impl Service {
             )
             .await
             .map_err(Error::CreateLogicalCatalogError)?;
-            create_physical_catalog(&self.data_store, logical)
+            create_physical_catalog(&self.dataset_store, &self.data_store, logical)
                 .await
                 .map_err(Error::PhysicalCatalogError)
         }?;
diff --git a/tests/src/testlib/fixtures/snapshot_ctx.rs b/tests/src/testlib/fixtures/snapshot_ctx.rs
index 3af8d8a43..5a65cd4c0 100644
--- a/tests/src/testlib/fixtures/snapshot_ctx.rs
+++ b/tests/src/testlib/fixtures/snapshot_ctx.rs
@@ -13,7 +13,7 @@ use std::sync::Arc;
 
 use amp_data_store::DataStore;
 use common::{
-    BoxError, LogicalCatalog, QueryContext, ResolvedTable,
+    BoxError, LogicalCatalog, LogicalTable, QueryContext,
     catalog::physical::{Catalog, PhysicalTable},
 };
 use server::config::Config;
@@ -40,11 +40,10 @@ impl SnapshotContext {
         let resolved_tables: Vec<_> = tables
             .iter()
             .map(|t| {
-                ResolvedTable::new(
-                    t.table().clone(),
+                LogicalTable::new(
                     t.sql_table_ref_schema().to_string(),
                     t.dataset_reference().clone(),
-                    t.dataset_start_block(),
+                    t.table().clone(),
                 )
             })
             .collect();
diff --git a/tests/src/testlib/helpers.rs b/tests/src/testlib/helpers.rs
index 9dd70641c..62e8dacaf 100644
--- a/tests/src/testlib/helpers.rs
+++ b/tests/src/testlib/helpers.rs
@@ -12,7 +12,7 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration};
 use amp_data_store::DataStore;
 use amp_dataset_store::DatasetStore;
 use common::{
-    BoxError, LogicalCatalog, ResolvedTable,
+    BoxError, LogicalCatalog, LogicalTable,
     arrow::array::RecordBatch,
     catalog::physical::{Catalog, PhysicalTable},
     metadata::segments::BlockRange,
@@ -251,7 +251,7 @@ pub async fn restore_dataset_snapshot(
         "Restored tables via Admin API, loading PhysicalTable objects"
     );
 
-    // 2. Load the dataset to get ResolvedTables
+    // 2. Load the dataset to get LogicalTables
     let dataset_ref = dataset_store
         .resolve_revision(dataset_ref)
         .await?
@@ -466,11 +466,10 @@ pub async fn catalog_for_dataset(
     let resolved_tables: Vec<_> = tables
         .iter()
        .map(|t| {
-            ResolvedTable::new(
-                t.table().clone(),
+            LogicalTable::new(
                 t.sql_table_ref_schema().to_string(),
                 dataset_ref.clone(),
-                dataset.start_block,
+                t.table().clone(),
             )
         })
         .collect();
diff --git a/tests/src/tests/it_reorg.rs b/tests/src/tests/it_reorg.rs
index 5cb123ca1..e7f61888d 100644
--- a/tests/src/tests/it_reorg.rs
+++ b/tests/src/tests/it_reorg.rs
@@ -484,9 +484,13 @@ impl ReorgTestCtx {
             )
             .await
             .expect("Failed to create logical catalog");
-            physical_catalog::create(test_env.daemon_server().data_store(), logical)
-                .await
-                .expect("Failed to create physical catalog for SQL query")
+            physical_catalog::create(
+                test_env.daemon_server().dataset_store(),
+                test_env.daemon_server().data_store(),
+                logical,
+            )
+            .await
+            .expect("Failed to create physical catalog for SQL query")
         };
         let table = catalog
             .tables()
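
Call-site migration sketch (illustrative only, not part of the diff): the sketch below compiles against the shapes shown above, but the exact import paths, the `physical_catalog` alias, the `"anvil_rpc"` schema string, and the `single_table_catalog` helper are assumptions. It shows the two things callers must change: `LogicalTable::new` now takes `(sql_table_ref_schema, dataset_reference, table)` with no `dataset_start_block`, and `physical_catalog::create` now takes a `DatasetAccess` impl so it can look up `start_block` itself.

```rust
use amp_data_store::DataStore;
use common::{
    BoxError, LogicalCatalog, LogicalTable,
    catalog::{
        dataset_access::DatasetAccess,
        logical::Table,
        physical::{Catalog, for_query as physical_catalog},
    },
};
use datasets_common::hash_reference::HashReference;

/// Builds a one-table catalog the post-rename way. Hypothetical helper;
/// assumes `BoxError` is the usual boxed-error alias so `?` can convert
/// `CreateCatalogError` into it.
async fn single_table_catalog(
    dataset_store: &impl DatasetAccess,
    data_store: &DataStore,
    table_def: Table,
    dataset_ref: HashReference,
) -> Result<Catalog, BoxError> {
    // New argument order: schema string first, then the dataset reference,
    // then the table. The schema string is the `<dataset>` part of
    // `<dataset>.<table>` SQL references.
    let logical_table = LogicalTable::new(
        "anvil_rpc".to_string(),
        dataset_ref,
        table_def,
    );

    let logical = LogicalCatalog::from_tables(std::iter::once(&logical_table));

    // `create` now also receives the dataset store: it fetches the dataset
    // and reads `dataset.start_block` itself, replacing the removed
    // `LogicalTable::dataset_start_block` accessor.
    let catalog = physical_catalog::create(dataset_store, data_store, logical).await?;
    Ok(catalog)
}
```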