Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 6 additions & 70 deletions crates/core/common/src/catalog/logical.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
collections::{BTreeMap, BTreeSet},
fmt,
sync::Arc,
};

Expand All @@ -16,12 +15,15 @@ use datasets_common::{
use js_runtime::isolate_pool::IsolatePool;
use serde::Deserialize;

use crate::{BlockNum, SPECIAL_BLOCK_NUM, js_udf::JsUdf, sql::TableReference};
use crate::{BlockNum, SPECIAL_BLOCK_NUM, js_udf::JsUdf};

pub mod for_admin_api;
pub mod for_dump;
pub mod for_manifest_validation;
pub mod for_query;
pub mod table;

pub use table::LogicalTable;

/// Identifies a dataset and its data schema.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -114,72 +116,6 @@ impl Table {
}
}

/// A table that holds a reference to its dataset.
///
/// Bundles a [`Table`] definition with the information needed to address it from SQL
/// (the schema portion of its table reference) and to identify the dataset it came from.
#[derive(Debug, Clone)]
pub struct ResolvedTable {
/// The wrapped table definition; its `name` is the bare table name.
table: Table,
/// The dataset reference portion of SQL table references.
///
/// SQL table references have the format `<dataset_ref>.<table>` (e.g., `anvil_rpc.blocks`).
/// This field stores the string form of the `<dataset_ref>` portion - the schema under
/// which this table is registered in the catalog and referenced in SQL queries.
sql_table_ref_schema: String,
/// Hash reference identifying the dataset this table belongs to.
dataset_reference: HashReference,
/// Start block of the owning dataset, or `None` when no start block is supplied.
dataset_start_block: Option<BlockNum>,
}

impl fmt::Display for ResolvedTable {
    /// Formats the table as its SQL table reference, i.e. the value returned by
    /// [`ResolvedTable::table_ref`], using that reference's own `Display` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let reference = self.table_ref();
        // Same expansion as `write!(f, "{}", ..)`: the reference is rendered with
        // default formatting options rather than the caller's width/padding flags.
        f.write_fmt(format_args!("{}", reference))
    }
}

impl ResolvedTable {
pub fn new(
table: Table,
sql_table_ref_schema: String,
dataset_reference: HashReference,
dataset_start_block: Option<BlockNum>,
) -> Self {
Self {
table,
sql_table_ref_schema,
dataset_reference,
dataset_start_block,
}
}

pub fn table(&self) -> &Table {
&self.table
}

pub fn table_ref(&self) -> TableReference {
TableReference::partial(self.sql_table_ref_schema.clone(), self.table.name.clone())
}

pub fn dataset_reference(&self) -> &HashReference {
&self.dataset_reference
}

pub fn dataset_start_block(&self) -> Option<BlockNum> {
self.dataset_start_block
}

/// Bare table name
pub fn name(&self) -> &TableName {
&self.table.name
}

/// Returns the dataset reference portion of SQL table references.
///
/// SQL table references have the format `<dataset_ref>.<table>` (e.g., `anvil_rpc.blocks`).
/// This returns the string form of the `<dataset_ref>` portion - the schema under which
/// this table is registered in the catalog and referenced in SQL queries.
pub fn sql_table_ref_schema(&self) -> &str {
&self.sql_table_ref_schema
}
}

#[derive(Debug, Clone, Deserialize)]
pub struct Function {
pub name: String,
Expand All @@ -198,13 +134,13 @@ pub struct FunctionSource {

#[derive(Clone, Debug)]
pub struct LogicalCatalog {
pub tables: Vec<ResolvedTable>,
pub tables: Vec<LogicalTable>,
/// UDFs specific to the datasets corresponding to the resolved tables.
pub udfs: Vec<ScalarUDF>,
}

impl LogicalCatalog {
pub fn from_tables<'a>(tables: impl Iterator<Item = &'a ResolvedTable>) -> Self {
pub fn from_tables<'a>(tables: impl Iterator<Item = &'a LogicalTable>) -> Self {
Self {
tables: tables.cloned().collect(),
udfs: Vec::new(),
Expand Down
26 changes: 12 additions & 14 deletions crates/core/common/src/catalog/logical/for_admin_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
BoxError,
catalog::{
dataset_access::DatasetAccess,
logical::{LogicalCatalog, ResolvedTable},
logical::{LogicalCatalog, LogicalTable},
},
js_udf::JsUdf,
sql::{FunctionReference, TableReference},
Expand All @@ -52,14 +52,14 @@ pub type TableReferencesMap = BTreeMap<
/// ## Process
///
/// 1. Flattens table references from the references map
/// 2. Resolves all table references to ResolvedTable instances
/// 2. Resolves all table references to LogicalTable instances
/// 3. Flattens function references from the references map
/// 4. Resolves all function references to ScalarUDF instances
/// 5. Creates and returns a LogicalCatalog
///
/// ## Related Functions
///
/// - [`resolve_tables`] - Resolves table references to ResolvedTable instances
/// - [`resolve_tables`] - Resolves table references to LogicalTable instances
/// - [`resolve_udfs`] - Resolves function references to UDFs
pub async fn create(
dataset_store: &impl DatasetAccess,
Expand Down Expand Up @@ -99,7 +99,7 @@ pub async fn create(

#[derive(Debug, thiserror::Error)]
pub enum CreateLogicalCatalogError {
/// Failed to resolve table references to ResolvedTable instances
/// Failed to resolve table references to LogicalTable instances
#[error(transparent)]
ResolveTables(ResolveTablesError),

Expand All @@ -108,18 +108,18 @@ pub enum CreateLogicalCatalogError {
ResolveUdfs(ResolveUdfsError),
}

/// Resolves table references to ResolvedTable instances using pre-resolved dependencies.
/// Resolves table references to LogicalTable instances using pre-resolved dependencies.
///
/// Processes each table reference across all tables, looks up datasets by hash, finds tables
/// within datasets, and creates ResolvedTable instances for catalog construction.
/// within datasets, and creates LogicalTable instances for catalog construction.
async fn resolve_tables<'a>(
dataset_store: &impl DatasetAccess,
manifest_deps: &BTreeMap<DepAlias, HashReference>,
refs: impl IntoIterator<Item = (&'a TableName, &'a TableReference<DepAlias>)> + 'a,
) -> Result<Vec<ResolvedTable>, ResolveTablesError> {
) -> Result<Vec<LogicalTable>, ResolveTablesError> {
// Use hash-based map to deduplicate datasets across ALL tables
// Inner map: table_ref -> ResolvedTable (deduplicates table references)
let mut tables: BTreeMap<Hash, BTreeMap<TableReference<DepAlias>, ResolvedTable>> =
// Inner map: table_ref -> LogicalTable (deduplicates table references)
let mut tables: BTreeMap<Hash, BTreeMap<TableReference<DepAlias>, LogicalTable>> =
BTreeMap::new();

// Process all table references - fail fast on first error
Expand Down Expand Up @@ -170,12 +170,10 @@ async fn resolve_tables<'a>(
reference: dataset_ref.clone(),
})?;

// Create ResolvedTable
let resolved_table = ResolvedTable::new(
dataset_table.clone(),
let resolved_table = LogicalTable::new(
schema.to_string(),
dataset_ref.clone(),
dataset.start_block,
dataset_table.clone(),
);

// Insert into vacant entry
Expand All @@ -184,7 +182,7 @@ async fn resolve_tables<'a>(
}
}

// Flatten to Vec<ResolvedTable>
// Flatten to Vec<LogicalTable>
Ok(tables
.into_values()
.flat_map(|map| map.into_values())
Expand Down
25 changes: 12 additions & 13 deletions crates/core/common/src/catalog/logical/for_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use datasets_common::{
use js_runtime::isolate_pool::IsolatePool;

use crate::{
BoxError, ResolvedTable,
BoxError, LogicalTable,
catalog::{dataset_access::DatasetAccess, logical::LogicalCatalog},
js_udf::JsUdf,
sql::{FunctionReference, TableReference},
Expand All @@ -46,7 +46,7 @@ pub type ResolvedReferences = (
///
/// The function:
/// 1. Destructures the references tuple into table and function references
/// 2. Resolves table references to ResolvedTable instances using pre-resolved dependencies
/// 2. Resolves table references to LogicalTable instances using pre-resolved dependencies
/// 3. Resolves function references to ScalarUDF instances
/// 4. Returns a LogicalCatalog containing tables and UDFs
pub async fn create(
Expand Down Expand Up @@ -74,18 +74,18 @@ pub async fn create(
Ok(LogicalCatalog { tables, udfs })
}

/// Resolves table references to ResolvedTable instances using pre-resolved dependencies.
/// Resolves table references to LogicalTable instances using pre-resolved dependencies.
///
/// Processes each table reference, looks up the dataset by hash, finds the table
/// within the dataset, and creates a ResolvedTable for catalog construction.
/// within the dataset, and creates a LogicalTable for catalog construction.
async fn resolve_tables(
dataset_store: &impl DatasetAccess,
manifest_deps: &BTreeMap<DepAlias, HashReference>,
refs: impl IntoIterator<Item = TableReference<DepAlias>>,
) -> Result<Vec<ResolvedTable>, ResolveTablesError> {
) -> Result<Vec<LogicalTable>, ResolveTablesError> {
// Use hash-based map to deduplicate datasets and collect resolved tables
// Inner map: table_ref -> ResolvedTable (deduplicates table references)
let mut tables: BTreeMap<Hash, BTreeMap<TableReference<DepAlias>, ResolvedTable>> =
// Inner map: table_ref -> LogicalTable (deduplicates table references)
let mut tables: BTreeMap<Hash, BTreeMap<TableReference<DepAlias>, LogicalTable>> =
BTreeMap::new();

for table_ref in refs {
Expand Down Expand Up @@ -131,12 +131,11 @@ async fn resolve_tables(
reference: dataset_ref.clone(),
})?;

// Create ResolvedTable
let resolved_table = ResolvedTable::new(
dataset_table.clone(),
// Create LogicalTable
let resolved_table = LogicalTable::new(
schema.to_string(),
dataset_ref.clone(),
dataset.start_block,
dataset_table.clone(),
);

// Insert into vacant entry
Expand All @@ -145,7 +144,7 @@ async fn resolve_tables(
}
}

// Flatten to Vec<ResolvedTable>
// Flatten to Vec<LogicalTable>
Ok(tables
.into_values()
.flat_map(|map| map.into_values())
Expand Down Expand Up @@ -285,7 +284,7 @@ async fn resolve_udfs(
/// a logical catalog for derived dataset execution.
#[derive(Debug, thiserror::Error)]
pub enum CreateCatalogError {
/// Failed to resolve table references to ResolvedTable instances.
/// Failed to resolve table references to LogicalTable instances.
#[error(transparent)]
ResolveTables(ResolveTablesError),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
BoxError,
catalog::{
dataset_access::DatasetAccess,
logical::{LogicalCatalog, ResolvedTable},
logical::{LogicalCatalog, LogicalTable},
},
js_udf::JsUdf,
sql::{FunctionReference, TableReference},
Expand All @@ -50,7 +50,7 @@ pub type TableReferencesMap = BTreeMap<
/// resolving dependency aliases to datasets for schema-only validation (no physical data access).
///
/// Delegates to specialized helpers:
/// - [`resolve_tables`] - Resolves table references to `ResolvedTable` instances
/// - [`resolve_tables`] - Resolves table references to `LogicalTable` instances
/// - [`resolve_udfs`] - Resolves function references to UDFs
pub async fn create(
dataset_store: &impl DatasetAccess,
Expand Down Expand Up @@ -90,7 +90,7 @@ pub async fn create(

#[derive(Debug, thiserror::Error)]
pub enum CreateLogicalCatalogError {
/// Failed to resolve table references to ResolvedTable instances
/// Failed to resolve table references to LogicalTable instances
#[error(transparent)]
ResolveTables(ResolveTablesError),

Expand All @@ -99,13 +99,13 @@ pub enum CreateLogicalCatalogError {
ResolveUdfs(ResolveUdfsError),
}

/// Resolves table references to ResolvedTable instances using pre-resolved dependencies.
/// Resolves table references to LogicalTable instances using pre-resolved dependencies.
async fn resolve_tables<'a>(
dataset_store: &impl DatasetAccess,
manifest_deps: &BTreeMap<DepAlias, HashReference>,
refs: impl IntoIterator<Item = (&'a TableName, &'a TableReference<DepAlias>)> + 'a,
) -> Result<Vec<ResolvedTable>, ResolveTablesError> {
let mut tables: BTreeMap<Hash, BTreeMap<TableReference<DepAlias>, ResolvedTable>> =
) -> Result<Vec<LogicalTable>, ResolveTablesError> {
let mut tables: BTreeMap<Hash, BTreeMap<TableReference<DepAlias>, LogicalTable>> =
BTreeMap::new();

for (table_name, table_ref) in refs {
Expand Down Expand Up @@ -151,11 +151,10 @@ async fn resolve_tables<'a>(
reference: dataset_ref.clone(),
})?;

let resolved_table = ResolvedTable::new(
dataset_table.clone(),
let resolved_table = LogicalTable::new(
schema.to_string(),
dataset_ref.clone(),
dataset.start_block,
dataset_table.clone(),
);

entry.insert(resolved_table);
Expand Down
22 changes: 10 additions & 12 deletions crates/core/common/src/catalog/logical/for_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use datasets_common::{
use js_runtime::isolate_pool::IsolatePool;

use crate::{
BoxError, ResolvedTable,
BoxError, LogicalTable,
catalog::{dataset_access::DatasetAccess, logical::LogicalCatalog},
sql::{FunctionReference, TableReference},
};
Expand Down Expand Up @@ -66,17 +66,17 @@ pub async fn create(
Ok(LogicalCatalog { tables, udfs })
}

/// Resolves table references to ResolvedTable instances using dynamic resolution.
/// Resolves table references to LogicalTable instances using dynamic resolution.
///
/// Processes each table reference, resolves the dataset reference to a hash,
/// loads the dataset, finds the table, and creates a ResolvedTable for catalog construction.
/// loads the dataset, finds the table, and creates a LogicalTable for catalog construction.
async fn resolve_tables(
dataset_store: &impl DatasetAccess,
refs: impl IntoIterator<Item = TableReference<PartialReference>>,
) -> Result<Vec<ResolvedTable>, ResolveTablesError> {
) -> Result<Vec<LogicalTable>, ResolveTablesError> {
// Use hash-based map to deduplicate datasets and collect resolved tables
// Inner map: table_ref -> ResolvedTable (deduplicates table references)
let mut tables: BTreeMap<Hash, BTreeMap<TableReference<PartialReference>, ResolvedTable>> =
// Inner map: table_ref -> LogicalTable (deduplicates table references)
let mut tables: BTreeMap<Hash, BTreeMap<TableReference<PartialReference>, LogicalTable>> =
BTreeMap::new();

for table_ref in refs {
Expand Down Expand Up @@ -131,12 +131,10 @@ async fn resolve_tables(
reference: dataset_ref.clone(),
})?;

// Create ResolvedTable
let resolved_table = ResolvedTable::new(
dataset_table.clone(),
let resolved_table = LogicalTable::new(
schema.to_string(),
dataset_ref.clone(),
dataset.start_block,
dataset_table.clone(),
);

// Insert into vacant entry
Expand All @@ -145,7 +143,7 @@ async fn resolve_tables(
}
}

// Flatten to Vec<ResolvedTable>
// Flatten to Vec<LogicalTable>
Ok(tables
.into_values()
.flat_map(|map| map.into_values())
Expand Down Expand Up @@ -244,7 +242,7 @@ async fn resolve_udfs(
/// a logical catalog for Arrow Flight query planning (GetFlightInfo).
#[derive(Debug, thiserror::Error)]
pub enum CreateCatalogError {
/// Failed to resolve table references to ResolvedTable instances.
/// Failed to resolve table references to LogicalTable instances.
#[error(transparent)]
ResolveTables(ResolveTablesError),

Expand Down
Loading
Loading