Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 42 additions & 3 deletions crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,56 @@ pub mod util;
/// surfaced to the API.
#[async_trait]
pub trait NodeDelegate: Send + Sync {
/// Error returned by [Self::leader].
///
/// Must satisfy [MaybeMisdirected] to indicate whether the method would
/// never succeed on this node due to the database not being scheduled on it.
///
/// The [Into<axum::response::ErrorResponse] shall convert the error into an
/// HTTP response, providing an error message suitable for API clients.
/// The [fmt::Display] impl is used for logging the error, and may provide
/// additional context useful for debugging purposes.
type GetLeaderHostError: MaybeMisdirected + Into<axum::response::ErrorResponse> + fmt::Display + Send + Sync;

fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily>;
fn client_actor_index(&self) -> &ClientActorIndex;

type JwtAuthProviderT: auth::JwtAuthProvider;
fn jwt_auth_provider(&self) -> &Self::JwtAuthProviderT;
/// Return the leader [`Host`] of `database_id`.
///
/// Returns `None` if the current leader is not hosted by this node.
/// The [`Host`] is spawned implicitly if not already running.
async fn leader(&self, database_id: u64) -> anyhow::Result<Option<Host>>;
async fn leader(&self, database_id: u64) -> Result<Host, Self::GetLeaderHostError>;
fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir;
}

/// Predicate on the [NodeDelegate::GetLeaderHostError].
///
/// Normally, the routing layer determines the cluster node hosting the current
/// leader. In between the routing decision and actually executing the API
/// handler on the node, the database's state can, however, change, so that the
/// [NodeDelegate::leader] method is unable to provide the current leader [Host].
///
/// This trait allows to detect this case.
//
// Used in the logs endpoint to allow serving module logs even if
// the database is not currently running.
pub trait MaybeMisdirected {
/// Return `true` if the current node is not responsible for the leader
/// replica of the requested database.
///
/// This could be the case if:
///
/// - the current or most-recently-known leader is not assigned to the node
/// - no leader is currently known
/// - the database does not exist
///
/// Note that a database may not be running (e.g. due to being in a
/// suspended state). If its last leader is known and assigned to the
/// current node, this method shall return `true`.
fn is_misdirected(&self) -> bool;
}

/// Client view of a running module.
pub struct Host {
pub replica_id: u64,
Expand Down Expand Up @@ -391,6 +428,8 @@ impl<T: ControlStateWriteAccess + ?Sized> ControlStateWriteAccess for Arc<T> {
#[async_trait]
impl<T: NodeDelegate + ?Sized> NodeDelegate for Arc<T> {
type JwtAuthProviderT = T::JwtAuthProviderT;
type GetLeaderHostError = T::GetLeaderHostError;

fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
(**self).gather_metrics()
}
Expand All @@ -403,7 +442,7 @@ impl<T: NodeDelegate + ?Sized> NodeDelegate for Arc<T> {
(**self).jwt_auth_provider()
}

async fn leader(&self, database_id: u64) -> anyhow::Result<Option<Host>> {
async fn leader(&self, database_id: u64) -> Result<Host, Self::GetLeaderHostError> {
(**self).leader(database_id).await
}

Expand Down
93 changes: 41 additions & 52 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::borrow::Cow;
use std::env;
use std::num::NonZeroU8;
use std::str::FromStr;
use std::time::Duration;
use std::{env, io};

use crate::auth::{
anon_auth_middleware, SpacetimeAuth, SpacetimeEnergyUsed, SpacetimeExecutionDurationMicros, SpacetimeIdentity,
Expand All @@ -11,18 +11,18 @@ use crate::auth::{
use crate::routes::subscribe::generate_random_connection_id;
pub use crate::util::{ByteStringBody, NameOrIdentity};
use crate::{
log_and_500, Action, Authorization, ControlStateDelegate, DatabaseDef, DatabaseResetDef, Host, NodeDelegate,
Unauthorized,
log_and_500, Action, Authorization, ControlStateDelegate, DatabaseDef, DatabaseResetDef, Host, MaybeMisdirected,
NodeDelegate, Unauthorized,
};
use axum::body::{Body, Bytes};
use axum::extract::{Path, Query, State};
use axum::response::{ErrorResponse, IntoResponse};
use axum::routing::MethodRouter;
use axum::Extension;
use axum_extra::TypedHeader;
use futures::StreamExt;
use futures::TryStreamExt;
use http::StatusCode;
use log::info;
use log::{info, warn};
use serde::Deserialize;
use spacetimedb::database_logger::DatabaseLogger;
use spacetimedb::host::module_host::ClientConnectedError;
Expand Down Expand Up @@ -84,6 +84,7 @@ pub struct CallParams {
}

pub const NO_SUCH_DATABASE: (StatusCode, &str) = (StatusCode::NOT_FOUND, "No such database.");
const MISDIRECTED: (StatusCode, &str) = (StatusCode::NOT_FOUND, "Database is not scheduled on this host");

fn map_reducer_error(e: ReducerCallError, reducer: &str) -> (StatusCode, String) {
let status_code = match e {
Expand Down Expand Up @@ -283,11 +284,7 @@ async fn find_leader_and_database<S: ControlStateDelegate + NodeDelegate>(
NO_SUCH_DATABASE
})?;

let leader = worker_ctx
.leader(database.id)
.await
.map_err(log_and_500)?
.ok_or(StatusCode::NOT_FOUND)?;
let leader = worker_ctx.leader(database.id).await.map_err(log_and_500)?;

Ok((leader, database))
}
Expand Down Expand Up @@ -430,49 +427,41 @@ where
.authorize_action(auth.claims.identity, database.database_identity, Action::ViewModuleLogs)
.await?;

let replica = worker_ctx
.get_leader_replica_by_database(database.id)
.await
.ok_or((StatusCode::NOT_FOUND, "Replica not scheduled to this node yet."))?;
let replica_id = replica.id;

let logs_dir = worker_ctx.module_logs_dir(replica_id);
let lines = DatabaseLogger::read_latest(logs_dir, num_lines).await;

let body = if follow {
let leader = worker_ctx
.leader(database.id)
.await
.map_err(log_and_500)?
.ok_or(StatusCode::NOT_FOUND)?;
let log_rx = leader
.module()
.await
.map_err(log_and_500)?
.subscribe_to_logs()
.map_err(log_and_500)?;

let stream = tokio_stream::wrappers::BroadcastStream::new(log_rx).filter_map(move |x| {
std::future::ready(match x {
Ok(log) => Some(log),
Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(skipped)) => {
log::trace!(
"Skipped {} lines in log for module {}",
skipped,
database_identity.to_hex()
);
None
}
})
});

let stream = futures::stream::once(std::future::ready(lines.into()))
.chain(stream)
.map(Ok::<_, std::convert::Infallible>);
fn log_err(database: Identity) -> impl Fn(&io::Error) {
move |e| warn!("error serving module logs for database {database}: {e:#}")
}

Body::from_stream(stream)
} else {
Body::from(lines)
let body = match worker_ctx.leader(database.id).await {
Ok(host) => {
let module = host.module().await.map_err(log_and_500)?;
let logs = module.database_logger().tail(num_lines, follow).await.map_err(|e| {
warn!("database={database_identity} unable to tail logs: {e:#}");
(StatusCode::SERVICE_UNAVAILABLE, "Logs are temporarily not available")
})?;
Body::from_stream(logs.inspect_err(log_err(database_identity)))
}
Err(e) if e.is_misdirected() => return Err(MISDIRECTED.into()),
// If this is the right node for the current or last-known leader,
// we may still be able to serve logs from disk,
// even if we can't get hold of a running [ModuleHost].
Err(e) => {
warn!("could not obtain leader host for module logs: {e:#}");
let Some(replica) = worker_ctx.get_leader_replica_by_database(database.id).await else {
return Err(MISDIRECTED.into());
};
let logs_dir = worker_ctx.module_logs_dir(replica.id);
if !logs_dir.0.try_exists().map_err(log_and_500)? {
// Probably an in-memory database.
// Logs may become available at a later time.
return Err((
StatusCode::SERVICE_UNAVAILABLE,
"Database is not running and doesn't have persistent logs",
)
.into());
}
let logs = DatabaseLogger::read_latest_on_disk(logs_dir, num_lines);
Body::from_stream(logs.inspect_err(log_err(database_identity)))
}
};

Ok((
Expand Down
6 changes: 1 addition & 5 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,7 @@ where
.unwrap()
.ok_or(StatusCode::NOT_FOUND)?;

let leader = ctx
.leader(database.id)
.await
.map_err(log_and_500)?
.ok_or(StatusCode::NOT_FOUND)?;
let leader = ctx.leader(database.id).await.map_err(log_and_500)?;

let identity_token = auth.creds.token().into();

Expand Down
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ thiserror.workspace = true
thin-vec.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tokio-stream = "0.1"
tokio-stream = { workspace = true, features = ["sync"] }
tokio-metrics = { version = "0.4.0", features = ["rt"] }
toml.workspace = true
tracing-appender.workspace = true
Expand Down
Loading
Loading