diff --git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json index 78238e78b..1b8f6385f 100644 --- a/.schema/pgdog.schema.json +++ b/.schema/pgdog.schema.json @@ -31,6 +31,17 @@ "ban_timeout": 300000, "broadcast_address": null, "broadcast_port": 6433, + "cache": { + "backend": "redis", + "enabled": false, + "max_result_size": 0, + "policy": "no_cache", + "redis": { + "cache_key_prefix": "pgdog:", + "url": "redis://localhost:6379" + }, + "ttl": 300 + }, "checkout_timeout": 5000, "client_connection_recovery": "drop", "client_idle_in_transaction_timeout": 9223372036854775807, @@ -275,6 +286,75 @@ } ] }, + "Cache": { + "description": "Cache configuration.", + "type": "object", + "properties": { + "backend": { + "description": "Which storage backend to use.\n\n_Default:_ `redis`", + "$ref": "#/$defs/CacheBackend", + "default": "redis" + }, + "enabled": { + "description": "Whether to enable caching.\n\n_Default:_ `false`", + "type": "boolean", + "default": false + }, + "max_result_size": { + "description": "Maximum result size in bytes to cache (0 = unlimited).\n\n_Default:_ `0`", + "type": "integer", + "format": "uint", + "default": 0, + "minimum": 0 + }, + "policy": { + "description": "Cache policy: `no_cache` or `cache`.\n\n_Default:_ `no_cache`", + "$ref": "#/$defs/CachePolicy", + "default": "no_cache" + }, + "redis": { + "description": "Redis backend configuration.\n\nOnly read when `backend = \"redis\"`.", + "$ref": "#/$defs/RedisConfig", + "default": { + "cache_key_prefix": "pgdog:", + "url": "redis://localhost:6379" + } + }, + "ttl": { + "description": "Default TTL in seconds for cached queries.\n\n_Default:_ `300`", + "type": "integer", + "format": "uint64", + "default": 300, + "minimum": 0 + } + }, + "additionalProperties": false + }, + "CacheBackend": { + "description": "Cache storage backend discriminator.", + "oneOf": [ + { + "description": "Redis backend (default).", + "type": "string", + "const": "redis" + } + ] + }, + "CachePolicy": { + 
"description": "Cache policy.", + "oneOf": [ + { + "description": "Never cache queries for this database.", + "type": "string", + "const": "no_cache" + }, + { + "description": "Always cache read queries.", + "type": "string", + "const": "cache" + } + ] + }, "ConnectionRecovery": { "description": "controls if server connections are recovered or dropped if a client abruptly disconnects.\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#connection_recovery", "oneOf": [ @@ -574,6 +654,21 @@ "maximum": 65535, "minimum": 0 }, + "cache": { + "description": "Redis cache configuration for this database.", + "$ref": "#/$defs/Cache", + "default": { + "backend": "redis", + "enabled": false, + "max_result_size": 0, + "policy": "no_cache", + "redis": { + "cache_key_prefix": "pgdog:", + "url": "redis://localhost:6379" + }, + "ttl": 300 + } + }, "checkout_timeout": { "description": "Maximum amount of time a client is allowed to wait for a connection from the pool.\n\n_Default:_ `5000`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#checkout_timeout", "type": "integer", @@ -1441,6 +1536,23 @@ } ] }, + "RedisConfig": { + "description": "Redis-specific cache backend configuration.\n\nCorresponds to the `[general.cache.redis]` TOML section.", + "type": "object", + "properties": { + "cache_key_prefix": { + "description": "Key prefix prepended to every cache key stored in Redis.\n\n_Default:_ `pgdog:`", + "type": "string", + "default": "pgdog:" + }, + "url": { + "description": "Redis connection URL.\n\n_Default:_ `redis://localhost:6379`", + "type": "string", + "default": "redis://localhost:6379" + } + }, + "additionalProperties": false + }, "ReplicaLag": { "description": "Replica lag banning configuration. 
When a replica's replication lag exceeds the threshold, it is banned from serving read queries.", "type": "object", diff --git a/Cargo.lock b/Cargo.lock index 98f6c842c..d2a7fa197 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -663,7 +663,7 @@ dependencies = [ "bitflags 2.9.1", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2", @@ -973,6 +973,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "cookie-factory" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" + [[package]] name = "core-foundation" version = "0.9.4" @@ -1023,6 +1029,12 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc16" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" + [[package]] name = "critical-section" version = "1.2.0" @@ -1450,6 +1462,15 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "flume" version = "0.11.1" @@ -1497,6 +1518,47 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fred" +version = "9.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cdd5378252ea124b712e0ac55147d26ae3af575883b34b8423091a4c719606b" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "bytes-utils", + "crossbeam-queue", + "float-cmp", + "fred-macros", + 
"futures", + "log", + "parking_lot", + "rand 0.8.5", + "redis-protocol", + "rustls 0.23.27", + "rustls-native-certs 0.7.3", + "semver", + "socket2", + "tokio", + "tokio-rustls 0.26.2", + "tokio-stream", + "tokio-util", + "url", + "urlencoding", +] + +[[package]] +name = "fred-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1458c6e22d36d61507034d5afecc64f105c1d39712b7ac6ec3b352c423f715cc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -2871,6 +2933,7 @@ dependencies = [ "dashmap", "derive_builder", "fnv", + "fred", "futures", "hickory-resolver", "http-body-util", @@ -2914,6 +2977,7 @@ dependencies = [ "tracing-subscriber", "url", "uuid", + "xxhash-rust", ] [[package]] @@ -3458,6 +3522,20 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "redis-protocol" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65deb7c9501fbb2b6f812a30d59c0253779480853545153a51d8e9e444ddc99f" +dependencies = [ + "bytes", + "bytes-utils", + "cookie-factory", + "crc16", + "log", + "nom", +] + [[package]] name = "redox_syscall" version = "0.5.12" @@ -3826,6 +3904,19 @@ dependencies = [ "security-framework 2.11.1", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.2.0", + "rustls-pki-types", + "schannel", + "security-framework 2.11.1", +] + [[package]] name = "rustls-native-certs" version = "0.8.1" @@ -5859,6 +5950,12 @@ version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" +[[package]] +name = "xxhash-rust" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" + [[package]] name = "yoke" version = "0.8.0" diff --git a/docs/CACHE.md b/docs/CACHE.md new file mode 100644 index 000000000..86177fbc1 --- /dev/null +++ b/docs/CACHE.md @@ -0,0 +1,289 @@ +# Cache for pgdog — State of Implementation + +## Architecture + +Cache SELECT queries in Redis, bypass PostgreSQL on cache hit, populate cache on cache miss. Two-tier policy resolution: SQL comment/connection parameter → pgdog's config. + +--- + +## Implementation + +### Configuration (`pgdog-config`) + +**`cache.rs`** — Cache configuration types: + +**CachePolicy enum:** `NoCache` (default), `Cache`. Implements `FromStr`, `Display`, `Serialize`, `Deserialize`, `Copy`, `JsonSchema`. + +**CacheBackend enum:** `Redis` (default). Discriminator for selecting the storage backend and for hotswap detection when the backend type changes in config. + +**RedisConfig struct** (`[general.cache.redis]`): +- `url: String` — Redis connection URL (default `redis://localhost:6379`) +- `cache_key_prefix: String` — prefix prepended to every Redis key (default `pgdog:`) + +**Cache struct** (`[general.cache]`): +- `enabled: bool` — is caching on? (default `false`) +- `policy: CachePolicy` — which policy? (default `no_cache`) +- `ttl: u64` — default TTL seconds (default `300`) +- `backend: CacheBackend` — which storage backend (default `redis`) +- `redis: RedisConfig` — Redis-specific settings +- `max_result_size: usize` — max cached result bytes (default `0` = unlimited) + +Example TOML: +```toml +[general.cache] +enabled = true +policy = "cache" +ttl = 300 + +[general.cache.redis] +url = "redis://localhost:6379" +cache_key_prefix = "pgdog:" +``` + +**`general.rs`** — `General` struct holds `cache: Cache` field. **Cache config is global.** + +**`lib.rs`** — Exports `pub use cache::{CacheBackend, CachePolicy, Cache, RedisConfig as CacheRedisConfig};`. 
+ +### Cache Module (`pgdog/src/frontend/cache/`) + +**`mod.rs`** — Module exports, global singleton, and main `Cache` struct: +```rust +pub mod context; +pub mod integration; +pub mod policy; +pub mod storage; + +pub use context::CacheContext; +pub use integration::CacheCheckResult; +pub use policy::CacheDecision; +pub use storage::{CacheStorage, RedisCacheStorage}; +``` + +`Cache` struct wraps `RwLock<Option<Box<dyn CacheStorage>>>` (tokio `RwLock`). + +**Global singleton:** Cache is global-scoped, not connection-scoped. Accessed via `cache()` function which returns `Arc<Cache>` from a `Lazy<Arc<Cache>>` static. `Cache::new()` reads config internally — no parameters needed. + +**Config hotswap:** `hotswap_if_needed()` is called at the top of `try_read_cache` and `save_response_in_cache`. It fast-paths with a read-lock; acquires write-lock only if the URL or backend type has changed, then rebuilds the storage. + +Key methods: +- `new()` — creates storage from current config (or `None` if disabled) +- `hotswap_if_needed()` — compares the live config against the active storage's config via `has_config_changed()`; swaps the storage if it returns `true` +- `try_read_cache(cache_context, in_transaction, client_request, params)` — hotswaps, calls `cache_check()`, returns `Ok(Some(Vec<Message>))` on HIT (caller replays through pipeline), `Ok(None)` on MISS/PASSTHROUGH +- `save_response_in_cache(cache_context)` — hotswaps, finalizes by storing the captured response + +**`storage/mod.rs`** — Abstract storage trait and error type: +- `CacheStorage` trait: `get`, `set`, `is_enabled`, `has_config_changed` — implemented by all cache backends +- `Error` enum shared across all backends: `RedisError`, `ConnectionFailed`, `CacheMiss` + +**`storage/redis.rs`** — Redis storage backend (`RedisCacheStorage`) implementing `CacheStorage`: +- `RedisCacheStorage::new(config)` — builds client from given URL; immediately spawns a background connection task; returns `None` if URL is invalid +- Background connect task: retries `init()` in a loop (5ms to 5s 
exponential backoff); sets `reconnecting = false` on success; CAS-guarded so only one task runs at a time +- `get(&self, key)` — returns `Result<Vec<u8>, Error>`; returns `Err(Error::ConnectionFailed)` immediately (triggering cache miss) if not yet connected; marks `reconnecting` and spawns reconnect on Redis errors +- `set(&self, key, value, ttl)` — stores bytes with EX expiration; returns immediately on disconnect; respects `max_result_size` from live config +- `reconnect()` — spawns reconnect if not already running (CAS-guarded) +- `has_config_changed()` — returns `true` if cache config has changed (used for hotswap detection) +- `is_enabled()` — reads live `config().config.general.cache.enabled` +- Key prefix comes from `config().config.general.cache.redis.cache_key_prefix` +- `reconnecting: Arc<AtomicBool>` — prevents multiple concurrent reconnect tasks +- All Redis operations wrapped in `tokio::time::timeout(REDIS_OPERATION_TIMEOUT)` (2s) + +**`policy.rs`** — 2-tier policy resolution: +- `CacheDirective` enum: `Cache { ttl_seconds }`, `ForceCache { ttl_seconds }`, `NoCache` (default) +- `CacheDecision` enum: `Skip`, `Cache(u64)`, `ForceCache(u64)` +- `resolve(client_request, params, is_read)` — main resolver function, chains all tiers +- `get_cache_directive(client_request, params)` — comment hint (from AST) has priority over connection parameter (`pgdog.cache`) +- `extract_parameter_directive(params)` — parses `pgdog.cache` parameter: `no_cache`, `cache`, `cache ttl=N`, `force_cache`, `force_cache ttl=N` +- Tier 1: Extractor directive (`CacheDirective::Cache { ttl }`, `CacheDirective::ForceCache { ttl }`, or `CacheDirective::NoCache`) +- Tier 2: Global config `CachePolicy` (`NoCache` / `Cache`) + +**`context.rs`** — Cache context held in `QueryEngineContext`: +- `CacheContext` with `cache_miss: Option`, `response_buffer: Vec<Message>`, and `had_error: bool` +- `capture_response(message)` — stores message in buffer when cache miss is tracked; sets `had_error = true` on `E` messages +- 
`reset()` — clears all state for per-query isolation + +**`integration.rs`** — Integration methods on `impl Cache`: +- `cache_check()` — main entry point, checks route, calls `policy::resolve()`, checks Redis +- `deserialize_cached(Vec<u8>) -> Vec<Message>` — parses a flat blob of concatenated PostgreSQL wire messages into individual `Message` values. Wire format: `[1B code][4B length (incl. itself)][payload]`. Named constants `HEADER_CODE_LEN`, `HEADER_LEN_SIZE`, `HEADER_TOTAL` replace the former magic numbers. Not Redis-specific — usable with any cache backend that stores raw bytes. +- `cache_response()` — serializes `Vec<Message>` into wire bytes and stores in Redis +- Cache key: XXH3 hash of `database_name + raw_query_string` + +### Query Engine Integration + +**`pgdog/src/frontend/client/query_engine/mod.rs`** +- Imports global `cache()` from `frontend::cache` +- `handle()` flow: after `route_query()` and before `before_execution()`, calls `cache().try_read_cache(context)`. If HIT: replays each cached `Message` through `process_server_message()` (same pipeline as live backend responses — stats, transaction state, hooks all fire correctly), then returns. On MISS: stores state in `context.cache_context`. +- After `match command`, calls `cache().save_response_in_cache(context)` to finalize caching. + +**`pgdog/src/frontend/client/query_engine/query.rs`** +- `process_server_message()` calls `context.cache_context.capture_response(message.clone())`. + +**`pgdog/src/frontend/client/query_engine/context.rs`** +- `QueryEngineContext` holds `cache_context: CacheContext` field. + +### Backend and Config Integration + +**`pgdog/src/backend/pool/cluster.rs`** +- `ClusterConfig` and `Cluster` hold `cache_enabled: bool` field +- Query parser requirement check includes `|| self.cache_enabled()` — when caching is on, the query parser is forced on. + +**`pgdog-config/src/core.rs`** +- Startup warning emitted when `general.cache.enabled` is `true` and the parser is `Off` or `SessionControl`. 
+ +### Dependencies + +**`pgdog/Cargo.toml`** +fred = { version = "9", features = ["enable-rustls"] } +xxhash-rust = { version = "0.8", features = ["xxh3"]} + +--- + +## Key Design Decisions + +| Decision | Choice | +|----------|--------| +| Interception point | Between `route_query()` and `before_execution()` in `handle()` | +| Cache config scope | **Global** (`config.general.cache`) | +| Redis client | `fred` crate v9 (async-native, tokio integration) | +| Cacheable queries | Only reads (`route.is_read()`) | +| Cache policy resolution | 2-tier: SQL comment/param → global config policy | +| Cache HIT flow | Deserialize wire bytes → `Vec<Message>` → replay each through `process_server_message()` | +| Cache MISS flow | Normal execute → capture response via `CacheContext` → store in Redis → respond | +| Cache key | XXH3 hash of `database_name + raw_query_string` | +| Wire format | Full PostgreSQL wire messages stored as raw bytes (one concatenated buffer) | + +--- + +## How to Control Cache + +### SQL Comments + +Add a C-style comment before your query. 
The first matching directive wins: + +```sql +-- Force bypass cache for this query +/* pgdog_cache: no_cache */ +SELECT * FROM users WHERE id = 1; + +-- Cache with database default TTL +/* pgdog_cache: cache */ +SELECT * FROM products WHERE category = 'electronics'; + +-- Cache with custom TTL in seconds +/* pgdog_cache: cache ttl=300 */ +SELECT * FROM orders; + +-- Force cache with database default TTL +-- Query hash computed as if comment were like "/* pgdog_cache: cache */" +/* pgdog_cache: force_cache */ +SELECT * FROM products WHERE category = 'electronics'; + +-- Force cache with custom TTL in seconds +-- Query hash computed as if comment were like "/* pgdog_cache: cache ttl=300*/" +/* pgdog_cache: force_cache ttl=300 */ +SELECT * FROM orders; +``` + +### Connection Parameter + +Set `pgdog.cache` at connection time (via DSN options) or with `SET` after connecting: + +```sql +-- Session-wide: all queries in this connection bypass cache +SET pgdog.cache = 'no_cache'; + +-- Session-wide: cache all queries with default TTL +SET pgdog.cache = 'cache'; + +-- Session-wide: cache all queries with 5-minute TTL +SET pgdog.cache = 'cache ttl=300'; + +-- Session-wide: force cache all queries with default TTL +SET pgdog.cache = 'force_cache'; + +-- Session-wide: force cache all queries with 5-minute TTL +SET pgdog.cache = 'force_cache ttl=300'; +``` + +```sh +# Session-wide: all queries in this connection bypass cache +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dno_cache + +# Session-wide: cache all queries with default TTL +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dcache + +# Session-wide: cache all queries with 5-minute TTL +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dcache%20ttl%3D300 + +# Session-wide: force cache all queries with default TTL +psql 
postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dforce_cache + +# Session-wide: force cache all queries with 5-minute TTL +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dforce_cache%20ttl%3D300 +``` + +### Priority Order + +Sources are checked in order — first non-None result wins, then falls through to global config: + +``` +SQL comment → pgdog.cache parameter → DB policy config +(highest) (lowest) +``` + +--- + +## Completed + +1. **Redis client never connects** - Problem: CacheClient::new() built the client but never called init(). Fred requires explicit connection initialization. Fix: Added lazy `ensure_connected()` using `client.init().await`, guarded by `AtomicBool` so it runs exactly once on first get()/set(). Changed CacheClient from `#[derive(Debug)]` to manual Debug impl (contains `Arc`). + +2. **Redis GET fails on NULL / cache miss** - Problem: `client.get::()` throws `Parse Error: Cannot parse into bytes` when the key doesn't exist. Fix: Use `client.get::()` and check `val.is_null()` before extracting bytes. Later refined: `get()` now returns `Result, Error>` instead of `Result>>` — a missing key yields `Err(Error::CacheMiss)`, which is matched explicitly in `cache_check()` and converted to `CacheCheckResult::Miss`. Other errors propagate as `Passthrough`. + +3. **Wire format deserialization wrong in send_cached_response** - Problem: PostgreSQL wire message structure is `[1B code][4B length]` where length includes the 4B itself. I calculated `offset + 5 + msg_len` (treating length as payload-only), causing incorrect byte slicing. Fix: Corrected to `offset + 1 + msg_len`, then replaced magic numbers with named constants `HEADER_CODE_LEN`, `HEADER_LEN_SIZE`, `HEADER_TOTAL`. + +4. **Route incorrectly reports read-only as write when parser is disabled** - Problem: `query_parser_bypass()` conservatively returns `Route::write()` for all SQL when the query parser is disabled. 
Since pgdog doesn't enable the parser by default for simple queries, `route.is_read()` was false for `SELECT 1`. Fix: When any database has `cache.enabled = true`, the query parser level is auto-upgraded to `On` in the cluster config. The `|| self.cache_enabled()` check in `cluster.rs:475` forces the parser on. Cache also emits a startup warning if parser is `Off` or `SessionControl`. The old `is_likely_read()` string-prefix heuristic has been removed entirely. + +5. **DB cache config defaults** - Observation: `Cache.policy` defaults to `CachePolicy::NoCache`. Even with `enabled = true`, caching is skipped unless policy is explicitly set. User action taken: Add `policy = "cache"` to pgdog.toml. + +6. **Query parser auto-upgrade for caching** — When caching is enabled and parser is `Auto`/`Off`/`SessionControl`, the parser is forced to `On` via `|| self.cache_enabled()` check in `cluster.rs`. A startup warning is emitted in `core.rs` if parser remains incompatible. + +7. **Decoupled cache policy extraction** — Cache directives extracted via standalone regex in `cache/policy.rs`, works regardless of parser state. Supports `/* pgdog_cache: ... */` format with optional `ttl=` parameter. Unified with sharding hints via `comment()` function in `comment.rs`. + +8. **Error handling / Reconnection** — Automatic reconnection with background task, CAS-guarded single reconnect, 2s operation timeout on all Redis calls, PING-based connection verification. + +9. **Cache key collision across databases sharing one Redis** — Database name and raw query string are combined via a single XXH3 hash call, producing deterministic, collision-resistant per-database keys even on shared Redis. Different literal values in queries produce different cache keys. `force_cache` hints normalize the query in the hash to use the same key as regular `cache`. + +10. **Wire format serialization/deserialization** — PostgreSQL wire messages stored as raw bytes. 
Correct byte slice calculation expressed via named constants (`HEADER_CODE_LEN = 1`, `HEADER_LEN_SIZE = 4`, `HEADER_TOTAL = 5`). Deserialization extracted into `deserialize_cached()` with inline comments explaining each boundary check. + +11. **Do not cache error responses**. + +12. **Setting pgdog.cache via connection URL doesn't work** — now works. + +13. **Moved all cache-related structs from QueryEngine to Client** — now all cache structs, including the Redis client, are created for pgdog's whole lifetime. + +14. **Use built-in query comment hints** — Cache hints (`pgdog_cache:`) are now extracted alongside sharding hints (`pgdog_shard:`, `pgdog_sharding_key:`, `pgdog_role:`) via the unified `comment()` function in `comment.rs`. The `comment_cache` field is stored in `AstInner` and accessed during cache checking via `client_request.ast.comment_cache`. Policy resolution simplified: trait-based extractors replaced with free functions (`resolve()`, `get_cache_directive()`, `extract_parameter_directive()`). Comment hint (from AST) has priority over connection parameter `pgdog.cache`. `Cache` struct no longer needs `policy_dispatcher` field. Parameter format unified to `no_cache` (underscore, not dash). + +15. **Add cache config to .schema**. + +16. **Force-cache hint support** — `/* pgdog_cache: force_cache */` and `/* pgdog_cache: force_cache ttl=N */` directives always attempt to cache (cache key normalized), bypassing normal cache miss flow considerations. + +17. **Cache HIT replays through the server-message pipeline** — Previously, cache hits sent responses directly to the stream, bypassing `process_server_message()`. Now `try_read_cache()` returns `Option<Vec<Message>>` and the caller (`handle()`) feeds each message through `process_server_message()` — giving correct stats accounting, transaction state updates from `ReadyForQuery`, and hook invocations on every cache hit. + +18. **CacheClient error types refined** — `get()` now returns `Result<Vec<u8>, Error>` (no more `Option`). 
`Error::CacheMiss(u64)` is a dedicated variant for key-not-found; `Error::RedisError` is now a struct variant carrying `cmd: &'static str`, `key: u64`, and the underlying error for richer diagnostics. `Error::ConnectionFailed` uses `&'static str` instead of `String` to avoid heap allocation on the hot path. + +19. **Config hotswap** — `Cache` singleton holds `Arc<RwLock<Option<Box<dyn CacheStorage>>>>`. `hotswap_if_needed()` runs at the start of every `try_read_cache` and `save_response_in_cache` call: read-locks to compare the active backend's URL against `config().config.general.cache.redis.url`; if they differ (or the backend type changes) it write-locks and rebuilds the storage. Fast path is a read-lock-only check with no allocation. + +20. **CacheClient rewritten as `RedisCacheStorage`** — Replaced `CacheClient` with `RedisCacheStorage` implementing the `CacheStorage` trait. Key improvements: background connect task is spawned immediately in `new()` so the first query never blocks on init; `get`/`set` check only one atomic flag (`reconnecting`) and return immediately if it is `true`, instead of running `ensure_connected`; the `Option` field and the three-condition guard at the top of every operation are gone; `reconnect` is the single place that sets the flag and CAS-guards the reconnect spawn. + +21. **Abstract storage backend** — `storage/mod.rs` defines the `CacheStorage` trait (`get`, `set`, `is_enabled`, `has_config_changed`) and the shared `Error` enum. `storage/redis.rs` is the Redis implementation. `Cache` holds `Box<dyn CacheStorage>` behind a tokio `RwLock` so any backend (e.g. Memcached) can be plugged in by adding a sub-module under `storage/` and a variant to `CacheBackend`. `deserialize_cached()` remains backend-agnostic in `integration.rs`. + +22. **Nested backend config** — Backend-specific settings live in their own TOML subtable (`[general.cache.redis]`) rather than flat fields on `[general.cache]`. `RedisConfig` holds `url` and `cache_key_prefix`. 
When a new backend is added, it gets its own subtable (e.g. `[general.cache.memcached]`) without polluting the top-level cache section. `client.rs` renamed to `storage/redis.rs`. + +23. **Cache key must include Bind parameters for extended protocol** — For simple `Query` messages, parameter values are embedded in the SQL string, so the XXH3 hash of `database + query_text` is naturally unique per value. For extended protocol (Parse/Bind/Execute), the SQL contains `$1`/`$2` placeholders and the actual values arrive in the `Bind` message separately. The current hash ignores them, so `SELECT * FROM users WHERE id = $1` with `id = 1` and `id = 2` produce the same cache key — wrong rows are returned on the second call. Fix: hash `param.len` (the `i32` field, not the `len()` method which returns wire size) and `param.data` for each entry in `bind.params_raw()` into the hasher in `cache_check()` in `integration.rs`. This affects all production drivers that use extended protocol by default: psycopg3, asyncpg, JDBC, npgsql. Note: pgdog's built-in prepared statement cache (`PreparedStatements` / `GlobalCache`) is a proxy-level plan cache only — it deduplicates backend `Parse` round-trips. It does not cache result rows and is orthogonal to the Redis result cache. + +--- + +## What's Left To Do + +1. **Redis disconnect/reconnect under heavy load** — The reconnection logic works, but timing edge cases under rapid disconnect/reconnect cycles still need stress-testing. + +2. **Integration tests**. diff --git a/pgdog-config/src/cache.rs b/pgdog-config/src/cache.rs new file mode 100644 index 000000000..0a7ae9021 --- /dev/null +++ b/pgdog-config/src/cache.rs @@ -0,0 +1,162 @@ +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// Cache policy. +#[derive( + Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Copy, JsonSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum CachePolicy { + /// Never cache queries for this database. 
+ #[default] + NoCache, + /// Always cache read queries. + Cache, +} + +impl std::str::FromStr for CachePolicy { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "no_cache" => Ok(Self::NoCache), + "cache" => Ok(Self::Cache), + _ => Err(format!("Invalid cache policy: {}", s)), + } + } +} + +impl std::fmt::Display for CachePolicy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let display = match self { + Self::NoCache => "no_cache", + Self::Cache => "cache", + }; + write!(f, "{}", display) + } +} + +/// Cache storage backend discriminator. +#[derive( + Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Copy, JsonSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum CacheBackend { + /// Redis backend (default). + #[default] + Redis, +} + +/// Redis-specific cache backend configuration. +/// +/// Corresponds to the `[general.cache.redis]` TOML section. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct RedisConfig { + /// Redis connection URL. + /// + /// _Default:_ `redis://localhost:6379` + #[serde(default = "RedisConfig::url")] + pub url: String, + + /// Key prefix prepended to every cache key stored in Redis. + /// + /// _Default:_ `pgdog:` + #[serde(default = "RedisConfig::cache_key_prefix")] + pub cache_key_prefix: String, +} + +impl Default for RedisConfig { + fn default() -> Self { + Self { + url: Self::url(), + cache_key_prefix: Self::cache_key_prefix(), + } + } +} + +impl RedisConfig { + fn url() -> String { + "redis://localhost:6379".to_string() + } + + fn cache_key_prefix() -> String { + "pgdog:".to_string() + } +} + +/// Cache configuration. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct Cache { + /// Whether to enable caching. 
+ /// + /// _Default:_ `false` + #[serde(default = "Cache::enabled")] + pub enabled: bool, + + /// Cache policy: `no_cache` or `cache`. + /// + /// _Default:_ `no_cache` + #[serde(default = "Cache::policy")] + pub policy: CachePolicy, + + /// Default TTL in seconds for cached queries. + /// + /// _Default:_ `300` + #[serde(default = "Cache::ttl")] + pub ttl: u64, + + /// Which storage backend to use. + /// + /// _Default:_ `redis` + #[serde(default = "Cache::backend")] + pub backend: CacheBackend, + + /// Redis backend configuration. + /// + /// Only read when `backend = "redis"`. + #[serde(default)] + pub redis: RedisConfig, + + /// Maximum result size in bytes to cache (0 = unlimited). + /// + /// _Default:_ `0` + #[serde(default = "Cache::max_result_size")] + pub max_result_size: usize, +} + +impl Default for Cache { + fn default() -> Self { + Self { + enabled: Self::enabled(), + policy: Self::policy(), + ttl: Self::ttl(), + backend: Self::backend(), + redis: RedisConfig::default(), + max_result_size: Self::max_result_size(), + } + } +} + +impl Cache { + fn enabled() -> bool { + false + } + + fn policy() -> CachePolicy { + CachePolicy::default() + } + + fn ttl() -> u64 { + 300 + } + + fn backend() -> CacheBackend { + CacheBackend::default() + } + + fn max_result_size() -> usize { + 0 + } +} diff --git a/pgdog-config/src/core.rs b/pgdog-config/src/core.rs index 856518a89..52e0187fb 100644 --- a/pgdog-config/src/core.rs +++ b/pgdog-config/src/core.rs @@ -564,6 +564,11 @@ impl Config { r#""pg_query_raw" parser engine requires a large thread stack, setting it to 32MiB for each Tokio worker"# ); } + + if self.general.cache.enabled + && matches!(self.general.query_parser, QueryParserLevel::Off | QueryParserLevel::SessionControl) { + warn!("cache requires enabled query parser but it's disabled or session controlled"); + } } /// Multi-tenancy is enabled. 
diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs index aa0f636f6..59d76aafa 100644 --- a/pgdog-config/src/general.rs +++ b/pgdog-config/src/general.rs @@ -7,6 +7,7 @@ use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; +use crate::cache::Cache; use crate::pooling::ConnectionRecovery; use crate::UniqueIdFunction; use crate::{ @@ -643,6 +644,10 @@ pub struct General { /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_save_config #[serde(default)] pub cutover_save_config: bool, + + /// Redis cache configuration for this database. + #[serde(default)] + pub cache: Cache, } impl Default for General { @@ -729,6 +734,7 @@ impl Default for General { cutover_timeout_action: Self::cutover_timeout_action(), cutover_save_config: bool::default(), unique_id_function: Self::unique_id_function(), + cache: Cache::default(), } } } diff --git a/pgdog-config/src/lib.rs b/pgdog-config/src/lib.rs index 1a106a295..22ab53404 100644 --- a/pgdog-config/src/lib.rs +++ b/pgdog-config/src/lib.rs @@ -1,5 +1,6 @@ // Submodules pub mod auth; +pub mod cache; pub mod core; pub mod data_types; pub mod database; @@ -18,6 +19,7 @@ pub mod users; pub mod util; pub use auth::{AuthType, PassthroughAuth}; +pub use cache::{CacheBackend, CachePolicy, Cache, RedisConfig as CacheRedisConfig}; pub use core::{Config, ConfigAndUsers}; pub use data_types::*; pub use database::{ diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml index 7c62c28c4..461daec45 100644 --- a/pgdog/Cargo.toml +++ b/pgdog/Cargo.toml @@ -69,6 +69,8 @@ pgdog-config = { path = "../pgdog-config" } pgdog-vector = { path = "../pgdog-vector" } pgdog-stats = { path = "../pgdog-stats" } pgdog-postgres-types = { path = "../pgdog-postgres-types"} +fred = { version = "9", features = ["enable-rustls"] } +xxhash-rust = { version = "0.8", features = ["xxh3"]} [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" diff --git a/pgdog/src/backend/pool/cluster.rs 
b/pgdog/src/backend/pool/cluster.rs index 9dbd038d0..1dff017a5 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -81,6 +81,7 @@ pub struct Cluster { reload_schema_on_ddl: bool, load_schema: LoadSchema, resharding_parallel_copies: usize, + cache_enabled: bool, } /// Sharding configuration from the cluster. @@ -157,6 +158,7 @@ pub struct ClusterConfig<'a> { pub reload_schema_on_ddl: bool, pub load_schema: LoadSchema, pub resharding_parallel_copies: usize, + pub cache_enabled: bool, } impl<'a> ClusterConfig<'a> { @@ -210,6 +212,7 @@ impl<'a> ClusterConfig<'a> { reload_schema_on_ddl: general.reload_schema_on_ddl, load_schema: general.load_schema, resharding_parallel_copies: general.resharding_parallel_copies, + cache_enabled: general.cache.enabled, } } } @@ -247,6 +250,7 @@ impl Cluster { reload_schema_on_ddl, load_schema, resharding_parallel_copies, + cache_enabled, } = config; let identifier = Arc::new(DatabaseUser { @@ -296,6 +300,7 @@ impl Cluster { reload_schema_on_ddl, load_schema, resharding_parallel_copies, + cache_enabled, } } @@ -470,6 +475,7 @@ impl Cluster { || self.dry_run() || self.prepared_statements() == &PreparedStatements::Full || self.pub_sub_enabled() + || self.cache_enabled() || RegexParser::use_parser(request) } } @@ -545,6 +551,11 @@ impl Cluster { self.resharding_parallel_copies } + /// Redis cache enabled. + pub fn cache_enabled(&self) -> bool { + self.cache_enabled + } + /// Launch the connection pools. pub(crate) fn launch(&self) { for shard in self.shards() { diff --git a/pgdog/src/config/cache.rs b/pgdog/src/config/cache.rs new file mode 100644 index 000000000..ece03acb6 --- /dev/null +++ b/pgdog/src/config/cache.rs @@ -0,0 +1 @@ +pub use pgdog_config::cache::*; diff --git a/pgdog/src/config/mod.rs b/pgdog/src/config/mod.rs index 835a0f10e..6ecd3785d 100644 --- a/pgdog/src/config/mod.rs +++ b/pgdog/src/config/mod.rs @@ -1,6 +1,7 @@ //! Configuration. 
// Submodules +pub mod cache; pub mod convert; pub mod core; pub mod database; @@ -15,6 +16,7 @@ pub mod rewrite; pub mod sharding; pub mod users; +pub use cache::*; pub use core::{Config, ConfigAndUsers}; pub use database::{Database, Role}; pub use error::Error; diff --git a/pgdog/src/frontend/cache/context.rs b/pgdog/src/frontend/cache/context.rs new file mode 100644 index 000000000..aeeab7613 --- /dev/null +++ b/pgdog/src/frontend/cache/context.rs @@ -0,0 +1,31 @@ +use crate::{ + frontend::cache::integration::CacheMiss, + net::{messages::Protocol, Message}, +}; + +/// Cache context to use in QueryEngineContext. +#[derive(Default)] +pub struct CacheContext { + pub cache_miss: Option, + pub response_buffer: Vec, + pub had_error: bool, +} + +impl CacheContext { + /// Capture a response message for caching. + pub fn capture_response(&mut self, message: Message) { + if self.cache_miss.is_some() { + if message.code() == 'E' { + self.had_error = true; + } + self.response_buffer.push(message); + } + } + + /// Reset the cache context for a new query. 
+ pub fn reset(&mut self) { + self.cache_miss = None; + self.response_buffer.clear(); + self.had_error = false; + } +} diff --git a/pgdog/src/frontend/cache/integration.rs b/pgdog/src/frontend/cache/integration.rs new file mode 100644 index 000000000..5114c865f --- /dev/null +++ b/pgdog/src/frontend/cache/integration.rs @@ -0,0 +1,215 @@ +use std::hash::{Hash, Hasher}; + +use once_cell::sync::Lazy; +use regex::Regex; + +use crate::{ + frontend::{ + cache::{storage::Error as CacheStorageError, CacheDecision}, + ClientRequest, + }, + net::{FromBytes, Message, Parameters, ToBytes}, +}; + +use tracing::{debug, warn}; + +use super::{policy, Cache}; + +static FORCE_CACHE_RE: Lazy = + Lazy::new(|| Regex::new(r#"pgdog_cache:\s*force_cache"#).unwrap()); + +pub struct CacheMiss { + pub cache_key_hash: u64, + pub ttl: u64, +} + +pub enum CacheCheckResult { + Hit { cached: Vec }, + Miss(CacheMiss), + Passthrough, +} + +const HEADER_CODE_LEN: usize = 1; +const HEADER_LEN_SIZE: usize = 4; +const HEADER_TOTAL: usize = HEADER_CODE_LEN + HEADER_LEN_SIZE; + +impl Cache { + pub(super) async fn cache_check( + &self, + in_transaction: bool, + client_request: &ClientRequest, + params: &Parameters, + ) -> Result { + if in_transaction { + return Ok(CacheCheckResult::Passthrough); + } + + let route = match client_request.route.as_ref() { + Some(r) => r, + None => return Ok(CacheCheckResult::Passthrough), + }; + + // Detect read-only status via the AST parser's route classification. + // When caching is enabled, the query parser is auto-enabled. 
+ let is_read = route.is_read(); + if !is_read { + return Ok(CacheCheckResult::Passthrough); + } + + let query = match client_request.query() { + Ok(Some(q)) => q, + _ => return Ok(CacheCheckResult::Passthrough), + }; + + let compute_cache_key_hash = || { + let user = params.get_required("user")?; + let database = params.get_default("database", user); + let mut hasher = xxhash_rust::xxh3::Xxh3Default::new(); + database.hash(&mut hasher); + let normalized_query = FORCE_CACHE_RE.replace(query.query(), "pgdog_cache: cache"); + normalized_query.hash(&mut hasher); + if let Some(bind) = client_request.parameters()? { + for param in bind.params_raw() { + param.len.hash(&mut hasher); + param.data.hash(&mut hasher); + } + }; + Ok::(hasher.finish()) + }; + + let decision = policy::resolve(client_request, params, is_read).await; + match decision { + CacheDecision::Skip => Ok(CacheCheckResult::Passthrough), + CacheDecision::ForceCache(ttl) => Ok(CacheCheckResult::Miss(CacheMiss { + cache_key_hash: compute_cache_key_hash()?, + ttl, + })), + CacheDecision::Cache(ttl) => { + let cache_key_hash = compute_cache_key_hash()?; + let guard = self.storage.read().await; + match guard.as_ref() { + None => Ok(CacheCheckResult::Passthrough), + Some(storage) => match storage.get(cache_key_hash).await { + Ok(cached) => Ok(CacheCheckResult::Hit { cached }), + Err(CacheStorageError::CacheMiss(_)) => { + Ok(CacheCheckResult::Miss(CacheMiss { + cache_key_hash, + ttl, + })) + } + Err(e) => { + warn!("{}", e); + Ok(CacheCheckResult::Passthrough) + } + }, + } + } + } + } + + /// Deserializes a flat byte blob (N concatenated PostgreSQL wire messages) into `Vec`. + /// + /// Redis stores cache responses as raw wire-format bytes concatenated together without framing. + /// We walk through the blob reading each message boundary, then slice out the individual message. 
+ /// + /// ### PostgreSQL wire protocol message layout: + /// + /// [Source](https://www.postgresql.org/docs/current/protocol-overview.html) + /// + /// ```text + /// +----------+--------------------------+-------------------+ + /// | 1 byte | 4 bytes (big-endian) | N bytes (payload) | + /// | code | length (incl. 4B itself) | data | + /// +----------+--------------------------+-------------------+ + /// ``` + /// + /// Constants for parsing: + /// - `HEADER_CODE_LEN` = 1 byte (message type code, e.g. 'T' = RowDescription) + /// - `HEADER_LEN_SIZE` = 4 bytes (message length, includes itself but NOT the code byte) + /// - `HEADER_TOTAL` = 5 bytes (minimum bytes needed to read the length field) + pub(super) fn deserialize_cached(cached: Vec) -> Vec { + let mut messages = Vec::new(); + let mut offset = 0; + let len = cached.len(); + + while offset < len { + // Need at least a full header (code + length) to proceed. + if offset + HEADER_TOTAL > len { + debug!( + "deserializing cached response: not enough bytes for message header (offset={}, len={})", + offset, len + ); + break; + } + + let _code = cached[offset] as char; + + // Read the message length field (4 bytes, big-endian). + // This length includes the 4-byte length field itself but NOT the code byte. + let msg_len = u32::from_be_bytes([ + cached[offset + 1], + cached[offset + 2], + cached[offset + 3], + cached[offset + 4], + ]) as usize; + + // Sanity checks: + // 1. Length must be at least 4 (the length field itself): if < 4 the data is corrupt. + // 2. Must not read past the end of the blob. 
+ if msg_len < 4 || offset + HEADER_CODE_LEN + msg_len > len { + debug!( + "deserializing cached response: invalid msg length {} (offset={}, len={})", + msg_len, offset, len + ); + break; + } + + // Full message spans: 1 byte (code) + msg_len (length field + payload) + let end = offset + HEADER_CODE_LEN + msg_len; + + let msg_bytes: bytes::Bytes = cached[offset..end].to_vec().into(); + if let Ok(msg) = Message::from_bytes(msg_bytes) { + messages.push(msg); + } + offset = end; + } + + messages + } + + pub(super) async fn cache_response( + &self, + cache_key_hash: u64, + messages: Vec, + ttl: u64, + ) { + let guard = self.storage.read().await; + let storage = match guard.as_ref() { + Some(s) if s.is_enabled() => s, + _ => return, + }; + + if messages.is_empty() { + return; + } + + let mut buffer = Vec::new(); + for msg in &messages { + match msg.to_bytes() { + Ok(bytes) => buffer.extend_from_slice(&bytes), + Err(e) => { + warn!("Failed to serialize message for caching: {}", e); + return; + } + } + } + + if buffer.is_empty() { + return; + } + + if let Err(e) = storage.set(cache_key_hash, &buffer, ttl).await { + warn!("{}", e); + } + } +} diff --git a/pgdog/src/frontend/cache/mod.rs b/pgdog/src/frontend/cache/mod.rs new file mode 100644 index 000000000..6c0023b2d --- /dev/null +++ b/pgdog/src/frontend/cache/mod.rs @@ -0,0 +1,143 @@ +pub mod context; +pub mod integration; +pub mod policy; +pub mod storage; + +pub use context::CacheContext; +pub use integration::CacheCheckResult; +pub use policy::CacheDecision; +pub use storage::{CacheStorage, RedisCacheStorage}; + +use once_cell::sync::Lazy; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::debug; + +use crate::{ + config::config, + frontend::{ + cache::{integration::CacheMiss, storage::build_storage}, + ClientRequest, + }, + net::{Message, Parameters}, +}; + +/// Wraps the active storage backend behind a tokio `RwLock` so it can be +/// hotswapped without restarting pgdog. 
+pub struct Cache { + storage: RwLock>>, +} + +impl std::fmt::Debug for Cache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Cache").field("storage", &"...").finish() + } +} + +static CACHE: Lazy> = Lazy::new(|| Arc::new(Cache::new())); + +pub fn cache() -> Arc { + CACHE.clone() +} + +impl Cache { + fn new() -> Self { + let storage = build_storage(); + Cache { + storage: RwLock::new(storage), + } + } + + /// Replace the storage backend if the config has changed (URL or backend type). + /// + /// Acquires the write lock only when a change is detected; otherwise the + /// read-lock path is zero-allocation and very fast. + async fn hotswap_if_needed(&self) { + let cfg = &config().config.general.cache; + + // Fast path: read-lock to check whether anything has changed. + { + let guard = self.storage.read().await; + let needs_swap = match guard.as_ref() { + Some(s) => s.has_config_changed(cfg), + None => cfg.enabled, + }; + if !needs_swap { + return; + } + } + + // Slow path: write-lock and rebuild. + let mut guard = self.storage.write().await; + // Re-check under the write lock (another task may have already swapped). + let needs_swap = match guard.as_ref() { + Some(s) => s.has_config_changed(cfg), + None => cfg.enabled, + }; + + if needs_swap { + debug!("Cache storage config changed — rebuilding backend"); + *guard = build_storage(); + } + } + + // ── public API ─────────────────────────────────────────────────────────── + + /// Check the cache for a query response. + /// + /// On HIT returns `Ok(Some(messages))` — the caller is responsible for + /// replaying these messages through the normal server-message pipeline. + /// + /// On MISS or PASSTHROUGH returns `Ok(None)` and updates `cache_context` + /// so that the response can later be captured and stored via + /// `save_response_in_cache`. 
+ pub async fn try_read_cache( + &self, + cache_context: &mut CacheContext, + in_transaction: bool, + client_request: &ClientRequest, + params: &Parameters, + ) -> Result>, crate::frontend::Error> { + self.hotswap_if_needed().await; + + let cache_result = self + .cache_check(in_transaction, client_request, params) + .await?; + + match cache_result { + CacheCheckResult::Hit { cached } => { + debug!("Cache hit, serving from cache"); + let messages = Self::deserialize_cached(cached); + cache_context.reset(); + Ok(Some(messages)) + } + CacheCheckResult::Miss(cache_miss) => { + debug!("Cache miss for key hash: {}", cache_miss.cache_key_hash); + cache_context.cache_miss = Some(cache_miss); + cache_context.response_buffer.clear(); + cache_context.had_error = false; + Ok(None) + } + CacheCheckResult::Passthrough => { + cache_context.reset(); + Ok(None) + } + } + } + + /// Finalize caching by storing the response in the active backend. + pub async fn save_response_in_cache(&self, cache_context: &mut CacheContext) { + self.hotswap_if_needed().await; + + if let Some(CacheMiss { + cache_key_hash, + ttl, + }) = cache_context.cache_miss.take() + { + if !cache_context.had_error && !cache_context.response_buffer.is_empty() { + let messages = std::mem::take(&mut cache_context.response_buffer); + self.cache_response(cache_key_hash, messages, ttl).await; + } + } + } +} diff --git a/pgdog/src/frontend/cache/policy.rs b/pgdog/src/frontend/cache/policy.rs new file mode 100644 index 000000000..60073dcc9 --- /dev/null +++ b/pgdog/src/frontend/cache/policy.rs @@ -0,0 +1,97 @@ +use crate::config::{config, CachePolicy}; +use crate::frontend::ClientRequest; +use crate::net::parameter::ParameterValue; +use crate::net::Parameters; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum CacheDirective { + Cache { + ttl_seconds: Option, + }, + ForceCache { + ttl_seconds: Option, + }, + #[default] + NoCache, +} + +pub enum CacheDecision { + Skip, + Cache(u64), + ForceCache(u64), +} + 
+const KEY: &str = "pgdog.cache"; + +pub async fn resolve( + client_request: &ClientRequest, + params: &Parameters, + is_read: bool, +) -> CacheDecision { + let cache_config = &config().config.general.cache; + + if !is_read { + return CacheDecision::Skip; + } + + let cache_directive = get_cache_directive(client_request, params); + match cache_directive { + Some(CacheDirective::NoCache) => return CacheDecision::Skip, + Some(CacheDirective::Cache { ttl_seconds }) => { + return CacheDecision::Cache(ttl_seconds.unwrap_or(cache_config.ttl)) + } + Some(CacheDirective::ForceCache { ttl_seconds }) => { + return CacheDecision::ForceCache(ttl_seconds.unwrap_or(cache_config.ttl)) + } + _ => (), + } + + match cache_config.policy { + CachePolicy::NoCache => CacheDecision::Skip, + CachePolicy::Cache => CacheDecision::Cache(cache_config.ttl), + } +} + +// Comment hint has priority over connection parameter +fn get_cache_directive( + client_request: &ClientRequest, + params: &Parameters, +) -> Option { + client_request + .ast + .as_ref() + .and_then(|ast| ast.comment_cache) + .or_else(|| extract_parameter_directive(params)) +} + +fn extract_parameter_directive(params: &Parameters) -> Option { + let value = params.get(KEY)?; + let s = match value { + ParameterValue::String(v) => v.as_str().trim(), + _ => return None, + }; + + match s { + "no_cache" => return Some(CacheDirective::NoCache), + "force_cache" => return Some(CacheDirective::ForceCache { ttl_seconds: None }), + "cache" => return Some(CacheDirective::Cache { ttl_seconds: None }), + _ => (), + } + + if let Some(ttl) = s + .strip_prefix("force_cache") + .or_else(|| s.strip_prefix("cache")) + .map(|s| s.trim_start()) + .and_then(|s| s.strip_prefix("ttl=")) + .and_then(|t| t.trim().parse::().ok()) + { + let ttl_seconds = Some(ttl); + if s.starts_with("force_cache") { + return Some(CacheDirective::ForceCache { ttl_seconds }); + } else { + return Some(CacheDirective::Cache { ttl_seconds }); + } + } + + None +} diff --git 
a/pgdog/src/frontend/cache/storage/mod.rs b/pgdog/src/frontend/cache/storage/mod.rs new file mode 100644 index 000000000..91a7b377d --- /dev/null +++ b/pgdog/src/frontend/cache/storage/mod.rs @@ -0,0 +1,58 @@ +pub mod redis; + +pub use redis::RedisCacheStorage; + +use async_trait::async_trait; + +use crate::config::{ + cache::{Cache as CacheConfig, CacheBackend}, + config, +}; + +/// Errors returned by cache storage backends. +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Redis {cmd} error for key {key}: {err}")] + RedisError { + cmd: &'static str, + key: u64, + err: fred::error::RedisError, + }, + #[error("Connection failed: {0}")] + ConnectionFailed(&'static str), + #[error("Cache miss for key {0}")] + CacheMiss(u64), +} + +/// Abstract cache storage backend. +/// +/// Implementations must be `Send + Sync` so they can be held behind +/// something like `Arc>` and shared across async tasks. +#[async_trait] +pub trait CacheStorage: Send + Sync { + /// Fetch cached bytes for `key`. Returns [`Error::CacheMiss`] when the + /// key is absent (not an error condition — used for control flow). + async fn get(&self, key: u64) -> Result, Error>; + + /// Store `value` under `key` with a `ttl` in seconds. + async fn set(&self, key: u64, value: &[u8], ttl: u64) -> Result<(), Error>; + + /// Returns `true` when the backend is configured and enabled. + fn is_enabled(&self) -> bool; + + /// Returns `true` if cache config has changed (used for hotswap detection). + fn has_config_changed(&self, new_config: &CacheConfig) -> bool; +} + +/// Construct the appropriate storage backend from the current config. 
+pub fn build_storage() -> Option> { + let cfg = &config().config.general.cache; + if !cfg.enabled { + return None; + } + match cfg.backend { + CacheBackend::Redis => { + RedisCacheStorage::new(cfg).map(|s| Box::new(s) as Box) + } + } +} diff --git a/pgdog/src/frontend/cache/storage/redis.rs b/pgdog/src/frontend/cache/storage/redis.rs new file mode 100644 index 000000000..e0aec87a6 --- /dev/null +++ b/pgdog/src/frontend/cache/storage/redis.rs @@ -0,0 +1,246 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use fred::prelude::*; +use pgdog_config::CacheBackend; +use tracing::{debug, error, info}; + +use crate::config::{cache::Cache as CacheConfig, config}; + +use super::{CacheStorage, Error}; + +/// Timeout for individual Redis operations (GET/SET/ping). +const REDIS_OPERATION_TIMEOUT: Duration = Duration::from_secs(2); +/// Max time between reconnection attempts +const MAX_REDIS_RECONNECTION_PERIOD: Duration = Duration::from_secs(5); + +/// Redis implementation of [`CacheStorage`]. +/// +/// Connection is established in a background task spawned from [`RedisCacheStorage::new`]. +/// All operations return immediately if the connection is not yet ready — `get` returns +/// [`Error::ConnectionFailed`] (triggering a cache-miss path) and `set` is silently dropped. +/// +/// At most one reconnect task runs at any time, enforced by a CAS on `reconnecting`. +pub struct RedisCacheStorage { + client: RedisClient, + /// Cache config. + config: CacheConfig, + /// Guards against spawning multiple concurrent reconnect tasks. 
+ reconnecting: Arc<AtomicBool>, +} + +impl std::fmt::Debug for RedisCacheStorage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // NOTE(review): the `config` Debug output includes the Redis URL; confirm it is never logged with credentials. + f.debug_struct("RedisCacheStorage") + .field("config", &self.config) + .field("reconnecting", &self.reconnecting.load(Ordering::Relaxed)) + .finish() + } +} + +impl RedisCacheStorage { + /// Build a new storage instance from `config` and immediately start a background + /// connection task. Returns `None` when the Redis URL cannot be parsed or the + /// client cannot be built. + pub fn new(config: &CacheConfig) -> Option<Self> { + let client_config = match RedisConfig::from_url(&config.redis.url) { + Ok(c) => c, + Err(e) => { + // The URL may embed credentials (`redis://user:password@host`), so keep it out of the logs. + error!("Failed to parse Redis URL: {}", e); + return None; + } + }; + + let client = match Builder::from_config(client_config).build() { + Ok(c) => c, + Err(e) => { + error!("Failed to build Redis client: {}", e); + return None; + } + }; + + let reconnecting = Arc::new(AtomicBool::new(true)); // treat initial connect as "reconnecting" + + let storage = Self { + client, + config: config.clone(), + reconnecting, + }; + + // Fire-and-forget initial connection. + storage.spawn_connect_task(); + + Some(storage) + } + + // ── internal helpers ──────────────────────────────────────────────────── + + /// Spawn the (re)connect background loop. The loop itself takes no lock: callers keep it + /// to one running task via the `reconnecting` flag (set in `new`, CAS in `reconnect`). 
+ fn spawn_connect_task(&self) { + let client = self.client.clone(); + let reconnecting = self.reconnecting.clone(); + + tokio::spawn(async move { + info!("Redis connect task started"); + let mut attempt = 0u32; + + loop { + attempt += 1; + debug!("Redis connect attempt #{}", attempt); + + let init_ok = + match tokio::time::timeout(REDIS_OPERATION_TIMEOUT, client.init()).await { + Ok(Ok(_)) => true, + Ok(Err(e)) => { + debug!("Redis init error: {}", e); + false + } + Err(_) => { + debug!("Redis init timed out"); + false + } + }; + + if init_ok { + reconnecting.store(false, Ordering::Release); + info!("Redis connected (attempt #{})", attempt); + return; + } + + // Exponential backoff + tokio::time::sleep( + const { Duration::from_millis(5) } + .saturating_mul(1u32 << attempt.min(10)) + .min(MAX_REDIS_RECONNECTION_PERIOD), + ) + .await; + } + }); + } + + /// Mark the reconnecting as true and spawn a reconnect task if one is not + /// already running. + fn reconnect(&self) { + if self + .reconnecting + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + self.spawn_connect_task(); + } else { + debug!("Redis reconnect task already running"); + } + } +} + +#[async_trait] +impl CacheStorage for RedisCacheStorage { + async fn get(&self, key: u64) -> Result, Error> { + if self.reconnecting.load(Ordering::Acquire) { + return Err(Error::ConnectionFailed("Redis not connected")); + } + + let full_key = format!("{}{}", self.config.redis.cache_key_prefix, key); + + let redis_result = tokio::time::timeout( + REDIS_OPERATION_TIMEOUT, + self.client.get::(full_key), + ) + .await; + let val = match redis_result { + Ok(Ok(v)) => v, + Ok(Err(err)) => { + self.reconnect(); + return Err(Error::RedisError { + cmd: "GET", + key, + err, + }); + } + Err(_) => { + self.reconnect(); + return Err(Error::ConnectionFailed("Redis GET timed out")); + } + }; + + match val.into_bytes() { + Some(bytes) => { + debug!("Cache hit for key {}", key); + Ok(bytes.to_vec()) + 
} + None => Err(Error::CacheMiss(key)), + } + } + + async fn set(&self, key: u64, value: &[u8], ttl: u64) -> Result<(), Error> { + if self.reconnecting.load(Ordering::Acquire) { + return Err(Error::ConnectionFailed("Redis not connected")); + } + + let max_result_size = config().config.general.cache.max_result_size; + if max_result_size != 0 && value.len() > max_result_size { + debug!( + "Skipping cache for key {}: size {} exceeds max {}", + key, + value.len(), + max_result_size + ); + return Ok(()); + } + + let full_key = format!("{}{}", self.config.redis.cache_key_prefix, key); + let ttl_seconds = ttl as i64; + + match tokio::time::timeout( + REDIS_OPERATION_TIMEOUT, + self.client.set::<(), _, _>( + full_key, + value, + Some(Expiration::EX(ttl_seconds)), + None, + false, + ), + ) + .await + { + Ok(Ok(_)) => { + debug!("Cached key {} with TTL {}s", key, ttl_seconds); + Ok(()) + } + Ok(Err(err)) => { + self.reconnect(); + Err(Error::RedisError { + cmd: "SET", + key, + err, + }) + } + Err(_) => { + self.reconnect(); + Err(Error::ConnectionFailed("Redis SET timed out")) + } + } + } + + fn is_enabled(&self) -> bool { + config().config.general.cache.enabled + } + + fn has_config_changed(&self, new_config: &CacheConfig) -> bool { + new_config.backend != CacheBackend::Redis + || self.config.redis.cmp(&new_config.redis).is_ne() + } +} + +// Avoid shallow copy +impl Clone for RedisCacheStorage { + fn clone(&self) -> Self { + Self { + client: self.client.clone_new(), + config: self.config.clone(), + reconnecting: Arc::new(AtomicBool::new(false)), + } + } +} diff --git a/pgdog/src/frontend/client/query_engine/context.rs b/pgdog/src/frontend/client/query_engine/context.rs index b54751a35..6a1fe3c38 100644 --- a/pgdog/src/frontend/client/query_engine/context.rs +++ b/pgdog/src/frontend/client/query_engine/context.rs @@ -1,6 +1,7 @@ use crate::{ backend::pool::{connection::mirror::Mirror, stats::MemoryStats}, frontend::{ + cache::context::CacheContext, 
client::{timeouts::Timeouts, Sticky, TransactionType}, router::parser::rewrite::statement::plan::RewriteResult, Client, ClientRequest, PreparedStatements, @@ -39,6 +40,8 @@ pub struct QueryEngineContext<'a> { pub(super) sticky: Sticky, /// Rewrite result. pub(super) rewrite_result: Option, + /// Cache context. + pub(super) cache_context: CacheContext, } impl<'a> QueryEngineContext<'a> { @@ -60,6 +63,7 @@ impl<'a> QueryEngineContext<'a> { rollback: false, sticky: client.sticky, rewrite_result: None, + cache_context: CacheContext::default(), } } @@ -86,6 +90,7 @@ impl<'a> QueryEngineContext<'a> { rollback: false, sticky: Sticky::new(), rewrite_result: None, + cache_context: CacheContext::default(), } } diff --git a/pgdog/src/frontend/client/query_engine/mod.rs b/pgdog/src/frontend/client/query_engine/mod.rs index f0dc8979b..9223f5b86 100644 --- a/pgdog/src/frontend/client/query_engine/mod.rs +++ b/pgdog/src/frontend/client/query_engine/mod.rs @@ -2,6 +2,7 @@ use crate::{ backend::pool::{Connection, Request}, config::config, frontend::{ + cache::cache, client::query_engine::{hooks::QueryEngineHooks, route_query::ClusterCheck}, router::{parser::Shard, Route}, BufferedQuery, Client, ClientComms, Command, Error, Router, RouterContext, Stats, @@ -129,6 +130,23 @@ impl QueryEngine { return Ok(()); } + let in_transaction = context.in_transaction(); + if let Some(cached_messages) = cache() + .try_read_cache( + &mut context.cache_context, + in_transaction, + context.client_request, + context.params, + ) + .await? + { + for msg in cached_messages { + self.process_server_message(context, msg).await?; + } + self.update_stats(context); + return Ok(()); + } + self.hooks.before_execution(context)?; // Queue up request to mirrors, if any. 
@@ -228,6 +246,10 @@ impl QueryEngine { command => self.unknown_command(context, command.clone()).await?, } + cache() + .save_response_in_cache(&mut context.cache_context) + .await; + self.hooks.after_execution(context)?; if context.in_error() { diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index 231d936cd..0775b4682 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -120,6 +120,8 @@ impl QueryEngine { context: &mut QueryEngineContext<'_>, mut message: Message, ) -> Result<(), Error> { + context.cache_context.capture_response(message.clone()); + self.streaming = message.streaming(); let code = message.code(); diff --git a/pgdog/src/frontend/mod.rs b/pgdog/src/frontend/mod.rs index 284b777b0..aa1bbe523 100644 --- a/pgdog/src/frontend/mod.rs +++ b/pgdog/src/frontend/mod.rs @@ -1,6 +1,7 @@ //! pgDog frontend manages connections to clients. pub mod buffered_query; +pub mod cache; pub mod client; pub mod client_request; pub mod comms; diff --git a/pgdog/src/frontend/router/parser/cache/ast.rs b/pgdog/src/frontend/router/parser/cache/ast.rs index c34d865dc..855792f96 100644 --- a/pgdog/src/frontend/router/parser/cache/ast.rs +++ b/pgdog/src/frontend/router/parser/cache/ast.rs @@ -12,6 +12,7 @@ use super::super::{ }; use super::{Fingerprint, Stats}; use crate::backend::schema::Schema; +use crate::frontend::cache::policy::CacheDirective; use crate::frontend::router::parser::rewrite::statement::RewritePlan; use crate::frontend::{BufferedQuery, PreparedStatements}; use crate::net::parameter::ParameterValue; @@ -37,6 +38,8 @@ pub struct AstInner { pub comment_shard: Option, /// Role. pub comment_role: Option, + /// Cache. + pub comment_cache: Option, /// Rewrite plan. pub rewrite_plan: RewritePlan, /// Fingerprint. 
@@ -51,6 +54,7 @@ impl AstInner { stats: Mutex::new(Stats::new()), comment_role: None, comment_shard: None, + comment_cache: None, rewrite_plan: RewritePlan::default(), fingerprint: Fingerprint::default(), } @@ -81,7 +85,7 @@ impl Ast { QueryParserEngine::PgQueryRaw => parse_raw(query), } .map_err(Error::PgQuery)?; - let (comment_shard, comment_role) = comment(query, schema)?; + let (comment_shard, comment_role, comment_cache) = comment(query, schema)?; let fingerprint = Fingerprint::new(query, schema.query_parser_engine).map_err(Error::PgQuery)?; @@ -113,6 +117,7 @@ impl Ast { stats: Mutex::new(stats), comment_shard, comment_role, + comment_cache, ast, rewrite_plan, fingerprint, diff --git a/pgdog/src/frontend/router/parser/comment.rs b/pgdog/src/frontend/router/parser/comment.rs index a87883adb..29494c287 100644 --- a/pgdog/src/frontend/router/parser/comment.rs +++ b/pgdog/src/frontend/router/parser/comment.rs @@ -6,6 +6,7 @@ use regex::Regex; use crate::backend::ShardingSchema; use crate::config::database::Role; +use crate::frontend::cache::policy::CacheDirective; use crate::frontend::router::sharding::ContextBuilder; use super::super::parser::Shard; @@ -16,6 +17,9 @@ static SHARDING_KEY: Lazy = Lazy::new(|| { Regex::new(r#"pgdog_sharding_key: *(?:"([^"]*)"|'([^']*)'|([0-9a-zA-Z-]+))"#).unwrap() }); static ROLE: Lazy = Lazy::new(|| Regex::new(r#"pgdog_role: *(primary|replica)"#).unwrap()); +static CACHE: Lazy = Lazy::new(|| { + Regex::new(r#"pgdog_cache: *(no_cache|force_cache(?:\s+ttl\s*=\s*([0-9]+))?|cache(?:\s+ttl\s*=\s*([0-9]+))?)?"#).unwrap() +}); fn get_matched_value<'a>(caps: &'a regex::Captures<'a>) -> Option<&'a str> { caps.get(1) @@ -24,23 +28,24 @@ fn get_matched_value<'a>(caps: &'a regex::Captures<'a>) -> Option<&'a str> { .map(|m| m.as_str()) } -/// Extract shard number from a comment. +/// Extract shard number, role and cache directive from a comment. /// /// Comment style uses the C-style comments (not SQL comments!) 
/// as to allow the comment to appear anywhere in the query. /// -/// See [`SHARD`] and [`SHARDING_KEY`] for the style of comment we expect. +/// See [`SHARD`], [`SHARDING_KEY`], [`ROLE`] and [`CACHE`] for the style of comment we expect. /// pub fn comment( query: &str, schema: &ShardingSchema, -) -> Result<(Option, Option), Error> { +) -> Result<(Option, Option, Option), Error> { let tokens = match schema.query_parser_engine { QueryParserEngine::PgQueryProtobuf => scan(query), QueryParserEngine::PgQueryRaw => scan_raw(query), } .map_err(Error::PgQuery)?; let mut role = None; + let mut cache = None; for token in tokens.tokens.iter() { if token.token == Token::CComment as i32 { @@ -54,15 +59,29 @@ pub fn comment( } } } + if let Some(cap) = CACHE.captures(comment) { + if let Some(action) = cap.get(1) { + let action = action.as_str(); + if action == "no_cache" { + cache = Some(CacheDirective::NoCache); + } else if action.starts_with("force_cache") { + let ttl = cap.get(2).and_then(|m| m.as_str().parse::().ok()); + cache = Some(CacheDirective::ForceCache { ttl_seconds: ttl }); + } else { + let ttl = cap.get(3).and_then(|m| m.as_str().parse::().ok()); + cache = Some(CacheDirective::Cache { ttl_seconds: ttl }); + } + } + } if let Some(cap) = SHARDING_KEY.captures(comment) { if let Some(sharding_key) = get_matched_value(&cap) { if let Some(schema) = schema.schemas.get(Some(sharding_key.into())) { - return Ok((Some(schema.shard().into()), role)); + return Ok((Some(schema.shard().into()), role, cache)); } let ctx = ContextBuilder::infer_from_from_and_config(sharding_key, schema)? 
.shards(schema.shards) .build()?; - return Ok((Some(ctx.apply()?), role)); + return Ok((Some(ctx.apply()?), role, cache)); } } if let Some(cap) = SHARD.captures(comment) { @@ -77,13 +96,14 @@ pub fn comment( .unwrap_or(Shard::All), ), role, + cache, )); } } } } - Ok((None, role)) + Ok((None, role, cache)) } #[cfg(test)] @@ -255,4 +275,132 @@ mod tests { let result = comment(query, &schema).unwrap(); assert_eq!(result.0, Some(Shard::Direct(1))); } + + #[test] + fn test_cache_hint_no_cache() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: no_cache */"; + let result = comment(query, &schema).unwrap(); + assert!(matches!(result.2, Some(CacheDirective::NoCache))); + } + + #[test] + fn test_cache_hint_cache_default_ttl() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: cache */"; + let result = comment(query, &schema).unwrap(); + assert!(matches!( + result.2, + Some(CacheDirective::Cache { ttl_seconds: None }) + )); + } + + #[test] + fn test_cache_hint_cache_with_ttl() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: cache ttl=60 */"; + let result = comment(query, &schema).unwrap(); + assert!(matches!( + result.2, + Some(CacheDirective::Cache { + ttl_seconds: Some(60) + }) + )); + } + + #[test] + fn test_cache_hint_no_directive() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], 
false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users"; + let result = comment(query, &schema).unwrap(); + assert!(matches!(result.2, None)); + } + + #[test] + fn test_combined_shard_and_cache_hints() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_role: replica pgdog_shard: 1 pgdog_cache: cache ttl=300 */"; + let result = comment(query, &schema).unwrap(); + assert_eq!(result.1, Some(Role::Replica)); + assert_eq!(result.0, Some(Shard::Direct(1))); + assert!(matches!( + result.2, + Some(CacheDirective::Cache { + ttl_seconds: Some(300) + }) + )); + } + + #[test] + fn test_cache_hint_force_cache() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: force_cache */"; + let result = comment(query, &schema).unwrap(); + assert!(matches!( + result.2, + Some(CacheDirective::ForceCache { ttl_seconds: None }) + )); + } + + #[test] + fn test_cache_hint_force_cache_with_ttl() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: force_cache ttl=60 */"; + let result = comment(query, &schema).unwrap(); + assert!(matches!( + result.2, + Some(CacheDirective::ForceCache { + ttl_seconds: Some(60) + }) + )); + } } diff --git a/pgdog/src/net/messages/hello.rs b/pgdog/src/net/messages/hello.rs index 84f901b06..5c989221b 100644 --- a/pgdog/src/net/messages/hello.rs +++ b/pgdog/src/net/messages/hello.rs @@ -60,7 +60,7 @@ impl Startup { } else 
if name == "options" { let kvs = value.split("-c"); for kv in kvs { - let mut nvs = kv.split("="); + let mut nvs = kv.splitn(2, "="); let name = nvs.next(); let value = nvs.next(); diff --git a/pgdog/src/net/parameter.rs b/pgdog/src/net/parameter.rs index 1502d0397..4dd0c6114 100644 --- a/pgdog/src/net/parameter.rs +++ b/pgdog/src/net/parameter.rs @@ -33,6 +33,7 @@ static UNTRACKED_PARAMS: Lazy> = Lazy::new(|| { String::from("pgdog.role"), String::from("pgdog.shard"), String::from("pgdog.sharding_key"), + String::from("pgdog.cache"), ]) });