diff --git a/Cargo.lock b/Cargo.lock index 4fac718f3b2..81c62a67517 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8268,6 +8268,7 @@ dependencies = [ "clap 4.5.50", "duct", "env_logger 0.10.2", + "futures", "lazy_static", "log", "rand 0.9.2", diff --git a/crates/client-api/src/lib.rs b/crates/client-api/src/lib.rs index a5051e38eac..1ccadc877cd 100644 --- a/crates/client-api/src/lib.rs +++ b/crates/client-api/src/lib.rs @@ -34,6 +34,17 @@ pub mod util; /// surfaced to the API. #[async_trait] pub trait NodeDelegate: Send + Sync { + /// Error returned by [Self::leader]. + /// + /// Must satisfy [MaybeMisdirected] to indicate whether the method would + /// never succeed on this node due to the database not being scheduled on it. + /// + /// The [Into + fmt::Display + Send + Sync; + fn gather_metrics(&self) -> Vec; fn client_actor_index(&self) -> &ClientActorIndex; @@ -41,12 +52,38 @@ pub trait NodeDelegate: Send + Sync { fn jwt_auth_provider(&self) -> &Self::JwtAuthProviderT; /// Return the leader [`Host`] of `database_id`. /// - /// Returns `None` if the current leader is not hosted by this node. /// The [`Host`] is spawned implicitly if not already running. - async fn leader(&self, database_id: u64) -> anyhow::Result>; + async fn leader(&self, database_id: u64) -> Result; fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir; } +/// Predicate on the [NodeDelegate::GetLeaderHostError]. +/// +/// Normally, the routing layer determines the cluster node hosting the current +/// leader. In between the routing decision and actually executing the API +/// handler on the node, the database's state can, however, change, so that the +/// [NodeDelegate::leader] method is unable to provide the current leader [Host]. +/// +/// This trait allows to detect this case. +// +// Used in the logs endpoint to allow serving module logs even if +// the database is not currently running. 
+pub trait MaybeMisdirected { + /// Return `true` if the current node is not responsible for the leader + /// replica of the requested database. + /// + /// This could be the case if: + /// + /// - the current or most-recently-known leader is not assigned to the node + /// - no leader is currently known + /// - the database does not exist + /// + /// Note that a database may not be running (e.g. due to being in a + /// suspended state). If its last leader is known and assigned to the + /// current node, this method shall return `false`. + fn is_misdirected(&self) -> bool; +} + /// Client view of a running module. pub struct Host { pub replica_id: u64, @@ -391,6 +428,8 @@ impl ControlStateWriteAccess for Arc { #[async_trait] impl NodeDelegate for Arc { type JwtAuthProviderT = T::JwtAuthProviderT; + type GetLeaderHostError = T::GetLeaderHostError; + fn gather_metrics(&self) -> Vec { (**self).gather_metrics() } @@ -403,7 +442,7 @@ impl NodeDelegate for Arc { (**self).jwt_auth_provider() } - async fn leader(&self, database_id: u64) -> anyhow::Result> { + async fn leader(&self, database_id: u64) -> Result { (**self).leader(database_id).await } diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index f54f2b3f875..f871e1e9d5d 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -1,8 +1,8 @@ use std::borrow::Cow; -use std::env; use std::num::NonZeroU8; use std::str::FromStr; use std::time::Duration; +use std::{env, io}; use crate::auth::{ anon_auth_middleware, SpacetimeAuth, SpacetimeEnergyUsed, SpacetimeExecutionDurationMicros, SpacetimeIdentity, @@ -11,8 +11,8 @@ use crate::auth::{ use crate::routes::subscribe::generate_random_connection_id; pub use crate::util::{ByteStringBody, NameOrIdentity}; use crate::{ - log_and_500, Action, Authorization, 
ControlStateDelegate, DatabaseDef, DatabaseResetDef, Host, MaybeMisdirected, + NodeDelegate, Unauthorized, }; use axum::body::{Body, Bytes}; use axum::extract::{Path, Query, State}; @@ -20,9 +20,9 @@ use axum::response::{ErrorResponse, IntoResponse}; use axum::routing::MethodRouter; use axum::Extension; use axum_extra::TypedHeader; -use futures::StreamExt; +use futures::TryStreamExt; use http::StatusCode; -use log::info; +use log::{info, warn}; use serde::Deserialize; use spacetimedb::database_logger::DatabaseLogger; use spacetimedb::host::module_host::ClientConnectedError; @@ -84,6 +84,7 @@ pub struct CallParams { } pub const NO_SUCH_DATABASE: (StatusCode, &str) = (StatusCode::NOT_FOUND, "No such database."); +const MISDIRECTED: (StatusCode, &str) = (StatusCode::NOT_FOUND, "Database is not scheduled on this host"); fn map_reducer_error(e: ReducerCallError, reducer: &str) -> (StatusCode, String) { let status_code = match e { @@ -283,11 +284,7 @@ async fn find_leader_and_database( NO_SUCH_DATABASE })?; - let leader = worker_ctx - .leader(database.id) - .await - .map_err(log_and_500)? - .ok_or(StatusCode::NOT_FOUND)?; + let leader = worker_ctx.leader(database.id).await.map_err(log_and_500)?; Ok((leader, database)) } @@ -430,49 +427,41 @@ where .authorize_action(auth.claims.identity, database.database_identity, Action::ViewModuleLogs) .await?; - let replica = worker_ctx - .get_leader_replica_by_database(database.id) - .await - .ok_or((StatusCode::NOT_FOUND, "Replica not scheduled to this node yet."))?; - let replica_id = replica.id; - - let logs_dir = worker_ctx.module_logs_dir(replica_id); - let lines = DatabaseLogger::read_latest(logs_dir, num_lines).await; - - let body = if follow { - let leader = worker_ctx - .leader(database.id) - .await - .map_err(log_and_500)? - .ok_or(StatusCode::NOT_FOUND)?; - let log_rx = leader - .module() - .await - .map_err(log_and_500)? 
- .subscribe_to_logs() - .map_err(log_and_500)?; - - let stream = tokio_stream::wrappers::BroadcastStream::new(log_rx).filter_map(move |x| { - std::future::ready(match x { - Ok(log) => Some(log), - Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(skipped)) => { - log::trace!( - "Skipped {} lines in log for module {}", - skipped, - database_identity.to_hex() - ); - None - } - }) - }); - - let stream = futures::stream::once(std::future::ready(lines.into())) - .chain(stream) - .map(Ok::<_, std::convert::Infallible>); + fn log_err(database: Identity) -> impl Fn(&io::Error) { + move |e| warn!("error serving module logs for database {database}: {e:#}") + } - Body::from_stream(stream) - } else { - Body::from(lines) + let body = match worker_ctx.leader(database.id).await { + Ok(host) => { + let module = host.module().await.map_err(log_and_500)?; + let logs = module.database_logger().tail(num_lines, follow).await.map_err(|e| { + warn!("database={database_identity} unable to tail logs: {e:#}"); + (StatusCode::SERVICE_UNAVAILABLE, "Logs are temporarily not available") + })?; + Body::from_stream(logs.inspect_err(log_err(database_identity))) + } + Err(e) if e.is_misdirected() => return Err(MISDIRECTED.into()), + // If this is the right node for the current or last-known leader, + // we may still be able to serve logs from disk, + // even if we can't get hold of a running [ModuleHost]. + Err(e) => { + warn!("could not obtain leader host for module logs: {e:#}"); + let Some(replica) = worker_ctx.get_leader_replica_by_database(database.id).await else { + return Err(MISDIRECTED.into()); + }; + let logs_dir = worker_ctx.module_logs_dir(replica.id); + if !logs_dir.0.try_exists().map_err(log_and_500)? { + // Probably an in-memory database. + // Logs may become available at a later time. 
+ return Err(( + StatusCode::SERVICE_UNAVAILABLE, + "Database is not running and doesn't have persistent logs", + ) + .into()); + } + let logs = DatabaseLogger::read_latest_on_disk(logs_dir, num_lines); + Body::from_stream(logs.inspect_err(log_err(database_identity))) + } }; Ok(( diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index d769a8e3284..77ed45e90f2 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -151,11 +151,7 @@ where .unwrap() .ok_or(StatusCode::NOT_FOUND)?; - let leader = ctx - .leader(database.id) - .await - .map_err(log_and_500)? - .ok_or(StatusCode::NOT_FOUND)?; + let leader = ctx.leader(database.id).await.map_err(log_and_500)?; let identity_token = auth.creds.token().into(); diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 4d173c972eb..73e4fd372b0 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -105,7 +105,7 @@ thiserror.workspace = true thin-vec.workspace = true tokio-util.workspace = true tokio.workspace = true -tokio-stream = "0.1" +tokio-stream = { workspace = true, features = ["sync"] } tokio-metrics = { version = "0.4.0", features = ["rt"] } toml.workspace = true tracing-appender.workspace = true diff --git a/crates/core/src/database_logger.rs b/crates/core/src/database_logger.rs index fec718a27ff..0ab92742923 100644 --- a/crates/core/src/database_logger.rs +++ b/crates/core/src/database_logger.rs @@ -1,24 +1,181 @@ +use bytes::Bytes; use chrono::{NaiveDate, Utc}; -use parking_lot::Mutex; +use futures::stream::{self, BoxStream}; +use futures::{Stream, StreamExt as _, TryStreamExt}; +use pin_project_lite::pin_project; +use std::collections::VecDeque; use std::fs::File; +use std::future; use std::io::{self, Read, Seek, Write}; -use tokio::sync::broadcast; +use std::path::Path; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, BufReader}; +use 
tokio::sync::{broadcast, mpsc, oneshot}; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; +use tokio_stream::wrappers::BroadcastStream; +use tokio_util::io::ReaderStream; use spacetimedb_paths::server::{ModuleLogPath, ModuleLogsDir}; use crate::util::asyncify; pub struct DatabaseLogger { - inner: Mutex, - pub tx: broadcast::Sender, + cmd: mpsc::UnboundedSender, } -struct DatabaseLoggerInner { +#[derive(Debug, thiserror::Error)] +#[error("database logger panicked")] +pub struct LoggerPanicked; + +impl From> for LoggerPanicked { + fn from(_: mpsc::error::SendError) -> Self { + Self + } +} + +impl From for LoggerPanicked { + fn from(_: oneshot::error::RecvError) -> Self { + Self + } +} + +pub type LogStream = BoxStream<'static, io::Result>; + +/// Storage backend of a [DatabaseLogger]. +trait Logger { + /// Append the serialized log `record` at timestamp `ts` to this logger. + fn append(&mut self, ts: chrono::DateTime, record: Bytes); + /// Calculate the size of this logger in bytes. + fn size(&self) -> io::Result; + /// Read up to `n` lines from the tail of this logger into memory. + fn tail(&self, n: u32) -> io::Result; + /// Stream up to `n` lines from the tail of this logger. + /// If `n` is `None`, stream all the contained lines. + fn tail_stream(&self, n: Option) -> LogStream; + /// Sync data to disk (or alternative backing storage). + fn sync_data(&self) -> io::Result<()>; +} + +/// [Logger] that stores log records in a file. +/// +/// The file is rotated daily upon calling [Logger::append]. 
+struct FileLogger { file: File, date: NaiveDate, path: ModuleLogPath, } +impl FileLogger { + pub fn open(path: ModuleLogPath) -> io::Result { + let date = path.date(); + let file = path.open_file(File::options().create(true).append(true))?; + Ok(Self { file, date, path }) + } + + fn maybe_rotate(&mut self, record_date: NaiveDate) { + if record_date > self.date { + let new_path = self.path.with_date(record_date); + *self = Self::open(new_path).unwrap(); + } + } +} + +impl Logger for FileLogger { + fn append(&mut self, ts: chrono::DateTime, record: Bytes) { + self.maybe_rotate(ts.date_naive()); + self.file.write_all(&record).unwrap(); + } + + fn size(&self) -> io::Result { + self.file.metadata().map(|stat| stat.len()) + } + + fn tail(&self, n: u32) -> io::Result { + let mut file = File::open(&self.path)?; + read_lines(&mut file, n).map(Into::into) + } + + fn tail_stream(&self, n: Option) -> LogStream { + stream::once(asyncify({ + let path = self.path.clone(); + move || { + let mut file = File::open(path)?; + if let Some(n) = n { + let mut buf = seek_buffer(n); + seek_to(&mut file, &mut buf, n)?; + } + + Ok::<_, io::Error>(tokio::fs::File::from_std(file)) + } + })) + .map_ok(ReaderStream::new) + .try_flatten() + .boxed() + } + + fn sync_data(&self) -> io::Result<()> { + self.file.sync_data() + } +} + +/// [Logger] that stores log records in memory. 
+struct MemoryLogger { + log: VecDeque, + size: u64, + max_size: u64, +} + +impl MemoryLogger { + pub fn new(max_size: u64) -> Self { + Self { + log: <_>::default(), + size: 0, + max_size, + } + } + + fn compact(&mut self) { + while self.size > self.max_size { + let Some(evicted) = self.log.pop_front() else { + break; + }; + self.size -= evicted.len() as u64; + } + } +} + +impl Logger for MemoryLogger { + fn append(&mut self, _: chrono::DateTime, record: Bytes) { + self.size += record.len() as u64; + self.log.push_back(record); + self.compact(); + } + + fn size(&self) -> io::Result { + Ok(self.size) + } + + fn tail(&self, n: u32) -> io::Result { + let total = self.log.len(); + let start = total.saturating_sub(n as _); + Ok(self.log.range(start..).flatten().copied().collect()) + } + + fn tail_stream(&self, n: Option) -> LogStream { + let total = self.log.len(); + let start = total.saturating_sub(n.map(|x| x as usize).unwrap_or(total)); + stream::iter(self.log.range(start..).cloned().collect::>()) + .map(Ok) + .boxed() + } + + fn sync_data(&self) -> io::Result<()> { + Ok(()) + } +} + #[derive(Clone, Copy, PartialEq, Eq, Debug, serde::Deserialize)] pub enum LogLevel { Error, @@ -46,6 +203,7 @@ impl From for LogLevel { #[serde_with::skip_serializing_none] #[serde_with::serde_as] #[derive(serde::Serialize, Copy, Clone)] +#[cfg_attr(test, derive(serde::Deserialize, Debug))] pub struct Record<'a> { #[serde_as(as = "serde_with::TimestampMicroSeconds")] pub ts: chrono::DateTime, @@ -155,28 +313,45 @@ enum LogEvent<'a> { }, } -impl DatabaseLoggerInner { - fn open(path: ModuleLogPath) -> io::Result { - let date = path.date(); - let file = path.open_file(File::options().create(true).append(true))?; - Ok(Self { file, date, path }) +impl DatabaseLogger { + pub fn in_memory(max_size: u64) -> Self { + let logger = MemoryLogger::new(max_size); + Self::with_logger(logger) } -} -impl DatabaseLogger { pub fn open_today(logs_dir: ModuleLogsDir) -> Self { - Self::open(logs_dir.today()) 
+ Self::open_file(logs_dir.today()) } - pub fn open(path: ModuleLogPath) -> Self { - let inner = Mutex::new(DatabaseLoggerInner::open(path).unwrap()); - let (tx, _) = broadcast::channel(64); - Self { inner, tx } + fn open_file(path: ModuleLogPath) -> Self { + let logger = FileLogger::open(path).unwrap(); + Self::with_logger(logger) } + fn with_logger(logger: impl Logger + Send + 'static) -> Self { + let (broadcast, _) = broadcast::channel(64); + let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); + let worker = DatabaseLoggerWorker::new(logger, broadcast); + tokio::spawn(worker.run(cmd_rx)); + + Self { cmd: cmd_tx } + } + + /// Determine the storage size of this logger. + /// + /// If the logger is [Self::in_memory], returns the resident size + /// (of the serialized log records excl. overhead). + /// + /// If the logger is backed by disk storage, returns the size of the most + /// recent log file. #[tracing::instrument(level = "trace", name = "DatabaseLogger::size", skip(self), err)] pub fn size(&self) -> io::Result { - Ok(self.inner.lock().file.metadata()?.len()) + let (tx, rx) = oneshot::channel(); + fn panicked(_: impl std::error::Error) -> io::Error { + io::Error::other("log worker panicked") + } + self.cmd.send(Cmd::GetSize { reply: tx }).map_err(panicked)?; + rx.blocking_recv().map_err(panicked)? } pub fn write(&self, level: LogLevel, &record: &Record<'_>, bt: &dyn BacktraceProvider) { @@ -193,46 +368,69 @@ impl DatabaseLogger { LogEvent::Panic { record, trace: &frames } } }; + // TODO(perf): Reuse serialization buffer. 
let mut buf = serde_json::to_string(&event).unwrap(); buf.push('\n'); - let mut inner = self.inner.lock(); - let record_date = record.ts.date_naive(); - if record_date > inner.date { - let new_path = inner.path.with_date(record_date); - *inner = DatabaseLoggerInner::open(new_path).unwrap(); - } - inner.file.write_all(buf.as_bytes()).unwrap(); - let _ = self.tx.send(buf.into()); + let buf = Bytes::from(buf); + self.cmd + .send(Cmd::Append { + ts: record.ts, + record: buf, + }) + .expect("log worker panicked"); } - pub async fn read_latest(logs_dir: ModuleLogsDir, num_lines: Option) -> String { - // TODO: do we want to logs from across multiple files? + /// Stream the contents of this logger. + /// + /// If `n` is `Some`, only yield up to the last `n` lines in the log. + /// If `follow` is `true`, the stream waits for new records to be appended + /// to the log (via [Self::write]) and yields them as they become available. + pub async fn tail(&self, n: Option, follow: bool) -> Result { + let (tx, rx) = oneshot::channel(); + self.cmd.send(Cmd::Tail { n, follow, reply: tx })?; + Ok(rx.await?) + } - let Some(num_lines) = num_lines else { - let path = logs_dir.today(); - // look for the most recent logfile. - match tokio::fs::read_to_string(&path).await { - Ok(contents) => return contents, - Err(e) if e.kind() == io::ErrorKind::NotFound => {} - Err(e) => panic!("couldn't read log file: {e}"), + /// Read the most recent logs in `logs_dir`, up to `num_lines`. + /// + /// Note that this only reads from the most recent log file, even if it + /// contains less than `num_lines` lines. + /// + /// If no log file exists on disk, the stream will be empty. + pub fn read_latest_on_disk(logs_dir: ModuleLogsDir, num_lines: Option) -> LogStream { + stream::once(asyncify(move || { + let Some(mut file) = Self::open_most_recent(logs_dir)? 
else { + return Ok(None); + }; + if let Some(n) = num_lines { + let mut buf = seek_buffer(n); + seek_to(&mut file, &mut buf, n)?; } - // if there's none for today, read the directory and - let logs_dir = path.popped(); - return asyncify(move || match logs_dir.most_recent()? { - Some(newest_log_file) => std::fs::read_to_string(newest_log_file), - None => Ok(String::new()), - }) - .await - .expect("couldn't read log file"); - }; - if num_lines == 0 { - return String::new(); - } + Ok::<_, io::Error>(Some(file)) + })) + .map_ok(into_file_stream) + .try_flatten() + .boxed() + } - asyncify(move || read_latest_lines(logs_dir, num_lines)) - .await - .expect("couldn't read log file") + /// Open the most recent log file found in `logs_dir`, or `None` if none exists. + fn open_most_recent(logs_dir: ModuleLogsDir) -> io::Result> { + let path = logs_dir.today(); + match open_file(&path)? { + Some(file) => Ok(Some(file)), + None => { + let logs_dir = path.popped(); + // `most_recent` errors if the directory doesn't exist. + if !logs_dir.0.try_exists()? { + return Ok(None); + } + let Some(path) = logs_dir.most_recent()? else { + return Ok(None); + }; + open_file(&path) + } + } } pub fn system_logger(&self) -> &SystemLogger { @@ -241,24 +439,137 @@ impl DatabaseLogger { } } -fn read_latest_lines(logs_dir: ModuleLogsDir, num_lines: u32) -> io::Result { - use std::fs::File; - let path = logs_dir.today(); - let mut file = match File::open(&path) { - Ok(f) => f, - Err(e) if e.kind() == io::ErrorKind::NotFound => { - let Some(path) = path.popped().most_recent()? else { - return Ok(String::new()); - }; - File::open(path)? 
+enum Cmd { + Append { + ts: chrono::DateTime, + record: Bytes, + }, + GetSize { + reply: oneshot::Sender>, + }, + Tail { + n: Option, + follow: bool, + reply: oneshot::Sender, + }, +} + +struct DatabaseLoggerWorker { + logger: Arc>, + broadcast: broadcast::Sender, +} + +impl DatabaseLoggerWorker { + fn new(logger: T, broadcast: broadcast::Sender) -> Self { + let logger = Arc::new(tokio::sync::Mutex::new(logger)); + Self { logger, broadcast } + } + + async fn run(self, mut cmd: mpsc::UnboundedReceiver) { + while let Some(cmd) = cmd.recv().await { + match cmd { + Cmd::Append { ts, record } => self.append(ts, record).await, + Cmd::GetSize { reply } => { + let size = self.size().await; + let _ = reply.send(size); + } + Cmd::Tail { n, follow, reply } => { + let logs = self.tail(n, follow).await; + let _ = reply.send(logs); + } + } } - Err(e) => return Err(e), - }; - let mut lines_read: u32 = 0; - // rough guess of an appropriate size for a chunk that could get all the lines in one, - // assuming a line is roughly 150 bytes long, but capping our buffer size at 64KiB + } + + async fn append(&self, ts: chrono::DateTime, record: Bytes) { + asyncify({ + let logger = self.logger.clone(); + let record = record.clone(); + move || logger.blocking_lock().append(ts, record) + }) + .await; + let _ = self.broadcast.send(record); + } + + async fn size(&self) -> io::Result { + let logger = self.logger.clone(); + asyncify(move || logger.blocking_lock().size()).await + } + + async fn tail(&self, n: Option, follow: bool) -> LogStream { + // If following isn't requested, we can stream the data. + if !follow { + return self.logger.lock().await.tail_stream(n); + } + match n { + // If we don't need to access the disk, + // locking and spawning can be avoided. + None | Some(0) => self.subscribe().map(Ok).boxed(), + + // Otherwise, we need to hold the lock to prevent writes + // while we gather a snapshot of the persistent tail. 
+ Some(n) => { + // Cap reading the tail into memory at a few hundred KiB. + let n = n.min(2500); + let (tail, more) = { + let inner = self.logger.clone().lock_owned().await; + let more = self.subscribe(); + asyncify(move || { + inner.sync_data().expect("error syncing data to disk"); + (inner.tail(n), more) + }) + } + .await; + + stream::once(future::ready(tail)).chain(more.map(Ok)).boxed() + } + } + } + + fn subscribe(&self) -> impl Stream { + BroadcastStream::new(self.broadcast.subscribe()).filter_map(move |x| { + future::ready(match x { + Ok(chunk) => Some(chunk), + Err(BroadcastStreamRecvError::Lagged(skipped)) => { + log::trace!("skipped {skipped} lines in module log"); + None + } + }) + }) + } +} + +fn read_lines(file: &mut File, num_lines: u32) -> io::Result> { + let mut buf = seek_buffer(num_lines); + seek_to(file, &mut buf, num_lines)?; + buf.clear(); + + file.read_to_end(&mut buf)?; + + Ok(buf) +} + +/// Allocate a buffer to use with [seek_to]. +/// +/// We assume a log line is typically around 150 bytes long, and allocate space +/// to fit `num_lines` in one read. The max size of the buffer is 64KiB. +fn seek_buffer(num_lines: u32) -> Vec { let chunk_size = std::cmp::min((num_lines as u64 * 150).next_power_of_two(), 0x10_000); - let mut buf = vec![0; chunk_size as usize]; + vec![0; chunk_size as usize] +} + +/// Set `file`'s position such that reading to the end will yield `num_lines`. +/// +/// If `file` contains less than `num_lines`, the position is set to the start. +/// +/// The function repeatedly fills `buf` from the end of the file, and counts the +/// number of LF characters in the buffer at each step until `num_lines` is +/// satisfied. +/// +/// `buf` should be created via [seek_buffer] and is supplied by the caller in +/// order to allow reuse of the allocation. 
+fn seek_to(file: &mut File, buf: &mut [u8], num_lines: u32) -> io::Result<()> { + let mut lines_read: u32 = 0; // the file should end in a newline, so we skip that one character let mut pos = file.seek(io::SeekFrom::End(0))?.saturating_sub(1) as usize; 'outer: while pos > 0 { @@ -267,7 +578,7 @@ fn read_latest_lines(logs_dir: ModuleLogsDir, num_lines: u32) -> io::Result (0, &mut buf[..pos]), }; pos = new_pos; - read_exact_at(&file, buf, pos as u64)?; + read_exact_at(file, buf, pos as u64)?; for lf_pos in memchr::Memchr::new(b'\n', buf).rev() { lines_read += 1; if lines_read >= num_lines { @@ -277,10 +588,8 @@ fn read_latest_lines(logs_dir: ModuleLogsDir, num_lines: u32) -> io::Result io::Result<()> { @@ -296,6 +605,50 @@ fn read_exact_at(file: &std::fs::File, buf: &mut [u8], offset: u64) -> io::Resul } } +/// Open the [File] at `path` for reading, or `None` if the file doesn't exist. +fn open_file(path: impl AsRef) -> io::Result> { + File::open(path).map(Some).or_else(|e| { + if e.kind() == io::ErrorKind::NotFound { + Ok(None) + } else { + Err(e) + } + }) +} + +/// Create a buffered [Stream] from a file. +/// +/// If `file` is `None`, the stream is empty. +fn into_file_stream(file: impl Into>) -> impl Stream> { + ReaderStream::new(BufReader::new(MaybeFile::new(file.into()))) +} + +pin_project! 
{ + #[project = MaybeFileProj] + enum MaybeFile { + File { #[pin] inner: tokio::fs::File }, + Empty, + } +} + +impl MaybeFile { + pub fn new(file: Option) -> Self { + match file.map(tokio::fs::File::from_std) { + Some(inner) => Self::File { inner }, + None => Self::Empty, + } + } +} + +impl AsyncRead for MaybeFile { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut tokio::io::ReadBuf<'_>) -> Poll> { + match self.project() { + MaybeFileProj::File { inner } => inner.poll_read(cx, buf), + MaybeFileProj::Empty => Poll::Ready(Ok(())), + } + } +} + /// Somewhat ad-hoc wrapper around [`DatabaseLogger`] which allows to inject /// "system messages" into the user-retrievable database / module log #[repr(transparent)] @@ -320,3 +673,180 @@ impl SystemLogger { Record::injected(message) } } + +#[cfg(test)] +mod tests { + use std::{ops::Range, sync::Arc}; + + use bytes::BytesMut; + use futures::TryStreamExt as _; + + use crate::util::asyncify; + + use super::{DatabaseLogger, LogLevel, Record}; + + async fn write_logs(logger: Arc, r: Range) { + asyncify(move || { + for i in r { + logger.write( + LogLevel::Info, + &Record { + ts: chrono::Utc::now(), + target: None, + filename: None, + line_number: None, + function: None, + message: &format!("log line {i}"), + }, + &(), + ); + } + }) + .await + } + + fn deserialize_logs<'a>(raw: &'a [u8]) -> Result>, serde_json::Error> { + serde_json::StreamDeserializer::new(serde_json::de::SliceRead::new(raw)).collect() + } + + fn drop_logger(logger: Arc) { + Arc::try_unwrap(logger) + .map(drop) + .map_err(drop) + .expect("logger should be unique"); + } + + /// Test calling [DatabaseLogger::tail] with `Some(n)`. + /// + /// Like `tail -n`. 
+ async fn tail_n(logger: DatabaseLogger) { + let logger = Arc::new(logger); + + write_logs(logger.clone(), 0..10).await; + + let a = logger + .tail(Some(10), false) + .await + .unwrap() + .try_collect::() + .await + .unwrap(); + let b = logger + .tail(None, false) + .await + .unwrap() + .try_collect::() + .await + .unwrap(); + assert_eq!(a, b); + + let c = logger + .tail(Some(5), false) + .await + .unwrap() + .try_collect::() + .await + .unwrap(); + let json_logs = deserialize_logs(&c).unwrap(); + + assert_eq!(json_logs.len(), 5); + assert_eq!(json_logs[0].message, "log line 5"); + assert_eq!(json_logs[4].message, "log line 9"); + } + + /// Test calling [DatabaseLogger::tail] with + /// `follow = true`. + /// + /// Like `tail -f`. + async fn tail_f(logger: DatabaseLogger) { + let logger = Arc::new(logger); + + let stream = logger.tail(None, true).await.unwrap().try_collect::(); + write_logs(logger.clone(), 0..10).await; + // Drop logger so stream terminates. + drop_logger(logger); + + let raw_logs = stream.await.unwrap(); + let json_logs = deserialize_logs(&raw_logs).unwrap(); + + assert_eq!(json_logs.len(), 10); + assert_eq!(json_logs[0].message, "log line 0"); + assert_eq!(json_logs[9].message, "log line 9"); + } + + /// Test calling [DatabaseLogger::tail] with + /// both `Some(n)` and `follow = true`. + /// + /// Like `tail -n N -f`. + async fn tail_nf(logger: DatabaseLogger) { + let logger = Arc::new(logger); + + write_logs(logger.clone(), 0..10).await; + let stream = logger.tail(Some(5), true).await.unwrap().try_collect::(); + write_logs(logger.clone(), 10..20).await; + // Drop logger so stream terminates. 
+ drop_logger(logger); + + let raw_logs = stream.await.unwrap(); + let json_logs = deserialize_logs(&raw_logs).unwrap(); + + assert_eq!(json_logs.len(), 15); + assert_eq!(json_logs[0].message, "log line 5"); + assert_eq!(json_logs[14].message, "log line 19"); + } + + mod memory { + use super::DatabaseLogger; + + #[tokio::test] + async fn tail_n() { + super::tail_n(DatabaseLogger::in_memory(1024)).await + } + + #[tokio::test] + async fn tail_f() { + super::tail_f(DatabaseLogger::in_memory(1024)).await + } + + #[tokio::test] + async fn tail_nf() { + super::tail_nf(DatabaseLogger::in_memory(1024)).await + } + } + + mod file { + use std::future::Future; + + use spacetimedb_paths::{server::ModuleLogsDir, FromPathUnchecked}; + use tempfile::tempdir; + + use super::DatabaseLogger; + + #[tokio::test] + async fn tail_n() { + with_file_logger(super::tail_n).await + } + + #[tokio::test] + async fn tail_f() { + with_file_logger(super::tail_f).await + } + + #[tokio::test] + async fn tail_nf() { + with_file_logger(super::tail_nf).await + } + + async fn with_file_logger(f: F) + where + F: FnOnce(DatabaseLogger) -> Fut, + Fut: Future, + { + let tmp = tempdir().unwrap(); + let logs_dir = ModuleLogsDir::from_path_unchecked(tmp.path()); + let logger = DatabaseLogger::open_today(logs_dir); + + f(logger).await + } + } +} diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index cbf15ee0318..8ddc2ee1035 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -33,8 +33,7 @@ use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::traits::Program; use spacetimedb_durability::{self as durability}; use spacetimedb_lib::{hash_bytes, AlgebraicValue, Identity, Timestamp}; -use spacetimedb_paths::server::{ReplicaDir, ServerDataDir}; -use spacetimedb_paths::FromPathUnchecked; +use spacetimedb_paths::server::{ModuleLogsDir, ServerDataDir}; use spacetimedb_sats::hash::Hash; use 
spacetimedb_schema::auto_migrate::{ponder_migrate, AutoMigrateError, MigrationPolicy, PrettyPrintStyle}; use spacetimedb_schema::def::ModuleDef; @@ -43,7 +42,6 @@ use std::future::Future; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; -use tempfile::TempDir; use tokio::sync::{watch, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock as AsyncRwLock}; use tokio::task::AbortHandle; use tokio::time::error::Elapsed; @@ -53,6 +51,14 @@ use tokio::time::{interval_at, timeout, Instant}; // // - [db::Config] should be per-[Database] +/// The maximum size of in-memory database loggers. +/// +/// Currently 16MiB, or about 111k log records of 150 bytes. +// +// TODO(config): We may want to allow overriding this via [db::Config], if and +// when the config applies to individual databases (as opposed to globally). +const IN_MEMORY_DATABASE_LOGGER_MAX_SIZE: u64 = 0x1_000_000; + /// A shared mutable cell containing a module host and associated database. type HostCell = Arc>>; @@ -634,13 +640,17 @@ fn stored_program_hash(db: &RelationalDB) -> anyhow::Result> { } async fn make_replica_ctx( - path: ReplicaDir, + module_logs: Option, database: Database, replica_id: u64, relational_db: Arc, bsatn_rlb_pool: BsatnRowListBuilderPool, ) -> anyhow::Result { - let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open_today(path.module_logs()))); + let logger = match module_logs { + Some(path) => asyncify(move || Arc::new(DatabaseLogger::open_today(path))).await, + None => Arc::new(DatabaseLogger::in_memory(IN_MEMORY_DATABASE_LOGGER_MAX_SIZE)), + }; + let send_worker_queue = spawn_send_worker(Some(database.database_identity)); let subscriptions = Arc::new(parking_lot::RwLock::new(SubscriptionManager::new( send_worker_queue.clone(), @@ -741,7 +751,7 @@ async fn launch_module( on_panic: impl Fn() + Send + Sync + 'static, relational_db: Arc, energy_monitor: Arc, - replica_dir: ReplicaDir, + module_logs: Option, runtimes: Arc, executor: SingleCoreExecutor, 
bsatn_rlb_pool: BsatnRowListBuilderPool, @@ -749,7 +759,7 @@ async fn launch_module( let db_identity = database.database_identity; let host_type = database.host_type; - let replica_ctx = make_replica_ctx(replica_dir, database, replica_id, relational_db, bsatn_rlb_pool) + let replica_ctx = make_replica_ctx(module_logs, database, replica_id, relational_db, bsatn_rlb_pool) .await .map(Arc::new)?; let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone()); @@ -922,7 +932,10 @@ impl Host { on_panic, Arc::new(db), energy_monitor.clone(), - replica_dir, + match config.storage { + db::Storage::Memory => None, + db::Storage::Disk => Some(replica_dir.module_logs()), + }, runtimes.clone(), host_controller.db_cores.take(), bsatn_rlb_pool.clone(), @@ -999,14 +1012,6 @@ impl Host { executor: SingleCoreExecutor, bsatn_rlb_pool: BsatnRowListBuilderPool, ) -> anyhow::Result> { - // Even in-memory databases acquire a lockfile. - // Grab a tempdir to put that lockfile in. - let phony_replica_dir = TempDir::with_prefix("spacetimedb-publish-in-memory-check") - .context("Error creating temporary directory to house temporary database during publish")?; - - // Leave the `TempDir` instance in place, so that its destructor will still run. 
- let phony_replica_dir = ReplicaDir::from_path_unchecked(phony_replica_dir.path().to_owned()); - let (db, _connected_clients) = RelationalDB::open( database.database_identity, database.owner_identity, @@ -1026,7 +1031,7 @@ impl Host { || log::error!("launch_module on_panic called for temporary publish in-memory instance"), Arc::new(db), Arc::new(NullEnergyMonitor), - phony_replica_dir, + None, runtimes.clone(), executor, bsatn_rlb_pool, diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index baa8f982381..ff0115d68dc 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -1064,23 +1064,22 @@ mod test { use anyhow::{anyhow, Result}; use spacetimedb_lib::db::auth::StAccess; use spacetimedb_lib::{bsatn::to_vec, AlgebraicType, AlgebraicValue, Hash, Identity, ProductValue}; - use spacetimedb_paths::{server::ModuleLogsDir, FromPathUnchecked}; use spacetimedb_primitives::{IndexId, TableId}; use spacetimedb_sats::product; - use tempfile::TempDir; /// An `InstanceEnv` requires a `DatabaseLogger` - fn temp_logger() -> Result { - let temp = TempDir::new()?; - let path = ModuleLogsDir::from_path_unchecked(temp.keep()); - let path = path.today(); - Ok(DatabaseLogger::open(path)) + fn temp_logger() -> DatabaseLogger { + DatabaseLogger::in_memory(64 * 1024) } /// An `InstanceEnv` requires a `ReplicaContext`. /// For our purposes this is just a wrapper for `RelationalDB`. 
fn replica_ctx(relational_db: Arc) -> Result<(ReplicaContext, tokio::runtime::Runtime)> { let (subs, runtime) = ModuleSubscriptions::for_test_new_runtime(relational_db.clone()); + let logger = { + let _rt = runtime.enter(); + Arc::new(temp_logger()) + }; Ok(( ReplicaContext { database: Database { @@ -1091,7 +1090,7 @@ mod test { initial_program: Hash::ZERO, }, replica_id: 0, - logger: Arc::new(temp_logger()?), + logger, subscriptions: subs, relational_db, }, diff --git a/crates/core/src/host/module_common.rs b/crates/core/src/host/module_common.rs index 8b101b8dad5..b488de987e0 100644 --- a/crates/core/src/host/module_common.rs +++ b/crates/core/src/host/module_common.rs @@ -24,7 +24,6 @@ pub fn build_common_module_from_raw( let def: ModuleDef = raw_def.try_into()?; let replica_ctx = mcc.replica_ctx; - let log_tx = replica_ctx.logger.tx.clone(); // Note: assigns Reducer IDs based on the alphabetical order of reducer names. let info = ModuleInfo::new( @@ -32,7 +31,6 @@ pub fn build_common_module_from_raw( replica_ctx.owner_identity, replica_ctx.database_identity, mcc.program_hash, - log_tx, replica_ctx.subscriptions.clone(), ); diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index ba06a25e57d..f0f12e35f5a 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -4,7 +4,7 @@ use super::{ }; use crate::client::messages::{OneOffQueryResponseMessage, SerializableMessage}; use crate::client::{ClientActorId, ClientConnectionSender}; -use crate::database_logger::{LogLevel, Record}; +use crate::database_logger::{DatabaseLogger, LogLevel, Record}; use crate::db::relational_db::RelationalDB; use crate::energy::EnergyQuanta; use crate::error::DBError; @@ -232,8 +232,6 @@ pub struct ModuleInfo { pub database_identity: Identity, /// The hash of the module. pub module_hash: Hash, - /// Allows subscribing to module logs. - pub log_tx: tokio::sync::broadcast::Sender, /// Subscriptions to this module. 
pub subscriptions: ModuleSubscriptions, /// Metrics handles for this module. @@ -296,7 +294,6 @@ impl ModuleInfo { owner_identity: Identity, database_identity: Identity, module_hash: Hash, - log_tx: tokio::sync::broadcast::Sender, subscriptions: ModuleSubscriptions, ) -> Arc { let metrics = ModuleMetrics::new(&database_identity); @@ -305,7 +302,6 @@ impl ModuleInfo { owner_identity, database_identity, module_hash, - log_tx, subscriptions, metrics, }) @@ -1970,10 +1966,6 @@ impl ModuleHost { Ok(instance.common.call_view_with_tx(tx, params, instance.instance)) } - pub fn subscribe_to_logs(&self) -> anyhow::Result> { - Ok(self.info().log_tx.subscribe()) - } - pub async fn init_database(&self, program: Program) -> Result, InitDatabaseError> { self.call( "", @@ -2188,6 +2180,10 @@ impl ModuleHost { self.replica_ctx().relational_db.durable_tx_offset() } + pub fn database_logger(&self) -> &Arc { + &self.replica_ctx().logger + } + pub(crate) fn replica_ctx(&self) -> &ReplicaContext { self.module.replica_ctx() } diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index cd2da537a54..2f5f68aa5b6 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -8,6 +8,7 @@ use crate::subcommands::{extract_schema, start}; use anyhow::Context as _; use async_trait::async_trait; use clap::{ArgMatches, Command}; +use http::StatusCode; use spacetimedb::client::ClientActorIndex; use spacetimedb::config::{CertificateAuthority, MetadataFile}; use spacetimedb::db; @@ -114,8 +115,42 @@ impl StandaloneEnv { } } +#[derive(Debug, thiserror::Error)] +pub enum GetLeaderHostError { + #[error("database does not exist")] + NoSuchDatabase, + #[error("replica does not exist")] + NoSuchReplica, + #[error("error starting database")] + LaunchError { source: anyhow::Error }, + #[error("error accessing controldb")] + Control(#[from] control_db::Error), +} + +impl spacetimedb_client_api::MaybeMisdirected for GetLeaderHostError { + fn is_misdirected(&self) -> bool { + 
matches!(self, Self::NoSuchDatabase | Self::NoSuchReplica) + } +} + +impl From for axum::response::ErrorResponse { + fn from(e: GetLeaderHostError) -> Self { + let status = match e { + GetLeaderHostError::NoSuchDatabase | GetLeaderHostError::NoSuchReplica => StatusCode::NOT_FOUND, + GetLeaderHostError::LaunchError { .. } | GetLeaderHostError::Control { .. } => { + StatusCode::INTERNAL_SERVER_ERROR + } + }; + + Self::from((status, e.to_string())) + } +} + #[async_trait] impl NodeDelegate for StandaloneEnv { + type JwtAuthProviderT = auth::DefaultJwtAuthProvider; + type GetLeaderHostError = GetLeaderHostError; + fn gather_metrics(&self) -> Vec { self.metrics_registry.gather() } @@ -124,30 +159,27 @@ impl NodeDelegate for StandaloneEnv { &self.client_actor_index } - type JwtAuthProviderT = auth::DefaultJwtAuthProvider; - fn jwt_auth_provider(&self) -> &Self::JwtAuthProviderT { &self.auth_provider } - async fn leader(&self, database_id: u64) -> anyhow::Result> { - let leader = match self.control_db.get_leader_replica_by_database(database_id) { - Some(leader) => leader, - None => return Ok(None), + async fn leader(&self, database_id: u64) -> Result { + let Some(leader) = self.control_db.get_leader_replica_by_database(database_id) else { + return Err(GetLeaderHostError::NoSuchReplica); }; - let database = self - .control_db - .get_database_by_id(database_id)? - .with_context(|| format!("Database {database_id} not found"))?; + let Some(database) = self.control_db.get_database_by_id(database_id)? 
else { + return Err(GetLeaderHostError::NoSuchDatabase); + }; self.host_controller .get_or_launch_module_host(database, leader.id) .await - .context("failed to get or launch module host")?; + .map_err(|source| GetLeaderHostError::LaunchError { source })?; - Ok(Some(Host::new(leader.id, self.host_controller.clone()))) + Ok(Host::new(leader.id, self.host_controller.clone())) } + fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir { self.data_dir().replica(replica_id).module_logs() } @@ -267,10 +299,7 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv { let database_id = database.id; let database_identity = database.database_identity; - let leader = self - .leader(database_id) - .await? - .ok_or_else(|| anyhow::anyhow!("No leader for database"))?; + let leader = self.leader(database_id).await?; let update_result = leader .update(database, spec.host_type, spec.program_bytes.to_vec().into(), policy) .await?; @@ -331,10 +360,7 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv { match existing_db { Some(db) => { - let host = self - .leader(db.id) - .await? 
- .ok_or_else(|| anyhow::anyhow!("No leader for database"))?; + let host = self.leader(db.id).await?; self.host_controller .migrate_plan( db, diff --git a/crates/testing/Cargo.toml b/crates/testing/Cargo.toml index 2d06a8dddce..8d93673acd0 100644 --- a/crates/testing/Cargo.toml +++ b/crates/testing/Cargo.toml @@ -28,6 +28,7 @@ lazy_static.workspace = true rand.workspace = true tempfile.workspace = true serde.workspace = true +futures.workspace = true [dev-dependencies] serial_test.workspace = true diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index 2f12b1d690e..0665240ec72 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -5,7 +5,8 @@ use std::sync::Arc; use std::sync::OnceLock; use std::time::Instant; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; +use futures::TryStreamExt as _; use spacetimedb::config::CertificateAuthority; use spacetimedb::messages::control_db::HostType; use spacetimedb::util::jobs::JobCores; @@ -18,7 +19,6 @@ use spacetimedb_schema::def::ModuleDef; use tokio::runtime::{Builder, Runtime}; use spacetimedb::client::{ClientActorId, ClientConfig, ClientConnection, DataMessage}; -use spacetimedb::database_logger::DatabaseLogger; use spacetimedb::db::{Config, Storage}; use spacetimedb::host::FunctionArgs; use spacetimedb::messages::websocket::CallReducerFlags; @@ -87,8 +87,17 @@ impl ModuleHandle { } pub async fn read_log(&self, size: Option) -> String { - let logs_dir = self._env.data_dir().replica(self.client.replica_id).module_logs(); - DatabaseLogger::read_latest(logs_dir, size).await + let bytes = self + .client + .module() + .database_logger() + .tail(size, false) + .await + .unwrap() + .try_collect::() + .await + .expect("failed to collect log stream"); + String::from_utf8(bytes.into()).unwrap() } } @@ -227,11 +236,7 @@ impl CompiledModule { name: env.client_actor_index().next_client_name(), }; - let host = env - .leader(database.id) - .await - .expect("host should be running") - 
.expect("host should be running"); + let host = env.leader(database.id).await.expect("host should be running"); let module_rx = host.module_watcher().await.unwrap(); // TODO: it might be neat to add some functionality to module handle to make