Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions pgdog/src/backend/replication/logical/publisher/slot.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::super::status::ReplicationSlot as ReplicationSlotTracker;
use super::super::Error;
use crate::config::config;
use crate::{
backend::{self, pool::Address, ConnectReason, Server, ServerOptions},
frontend::client::query_engine::two_pc::TwoPcTransactions,
Expand All @@ -9,6 +10,8 @@ use crate::{
},
util::random_string,
};

use pgdog_config::CopyFormat;
use std::{fmt::Display, str::FromStr, time::Duration};
use tokio::time::timeout;
use tracing::{debug, info, trace, warn};
Expand Down Expand Up @@ -293,10 +296,11 @@ impl ReplicationSlot {

/// Start replication.
pub async fn start_replication(&mut self) -> Result<(), Error> {
let is_binary = config().config.general.resharding_copy_format == CopyFormat::Binary;
// TODO: This is definitely Postgres version-specific.
let query = Query::new(format!(
r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '4', origin 'any', "publication_names" '"{}"')"#,
self.name, self.lsn, self.publication
r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '4', origin 'any', "publication_names" '"{}"', "binary" '{}')"#,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we're not passing user input, but seeing format! instead of prepared statements still makes me wince 😅

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logical replication protocol doesn't support prepared statements.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RIP

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at it again I don't even know why my brain parsed this as a SQL query.
IgnoreMeTheVentureBrosGIF

self.name, self.lsn, self.publication, is_binary
));
self.server()?.send(&vec![query.into()].into()).await?;

Expand Down Expand Up @@ -528,7 +532,9 @@ mod test {
assert_eq!(relation.name, "test_slot_replication")
}
XLogPayload::Insert(insert) => {
assert_eq!(insert.column(0).unwrap().as_str().unwrap(), "1")
let col = insert.column(0).unwrap();
let id = i64::from_be_bytes(col.data[..].try_into().unwrap());
assert_eq!(id, 1);
}
XLogPayload::Begin(_) => (),
XLogPayload::Commit(_) => got_row = true,
Expand Down
24 changes: 24 additions & 0 deletions pgdog/src/backend/replication/logical/subscriber/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,30 @@ mod test {
assert!(matches!(shard.shard(), Shard::Direct(_)));
}

#[test]
fn test_stream_context_binary_shard_key() {
// Binary-format shard key must flow through to_bind() and reach the router
// with Format::Binary preserved so sharding hashes the binary bigint correctly.
let cluster = Cluster::new_test(&config());
let id: i64 = 1;
let tuple = TupleData {
columns: vec![Column {
identifier: Identifier::Format(Format::Binary),
len: 8,
data: Bytes::copy_from_slice(&id.to_be_bytes()),
}],
};
let parse = Parse::new_anonymous("INSERT INTO sharded (id) VALUES ($1)");

let ctx = StreamContext::new(&cluster, &tuple, &parse).unwrap();
assert_eq!(ctx.bind().parameter_format(0).unwrap(), Format::Binary);
assert_eq!(
ctx.bind().parameter(0).unwrap().unwrap().data(),
&id.to_be_bytes()
);
assert!(matches!(ctx.shard(), Shard::Direct(_)));
}

// Verify that $N in the generated SQL matches the bind slot to_bind() places
// the corresponding value into, for each DML operation and tuple shape.

Expand Down
91 changes: 83 additions & 8 deletions pgdog/src/net/messages/replication/logical/tuple_data.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::str::from_utf8;

use bytes::BytesMut;
use tracing::warn;

use crate::net::bind::Parameter;
use crate::net::Bind;
Expand Down Expand Up @@ -96,18 +97,22 @@ impl TupleData {
/// Used by [`Table`](crate::backend::replication::logical::publisher::Table) DML methods
/// — the `$N` they emit must agree with the column ordering of the tuple passed here.
pub fn to_bind(&self, name: &str) -> Bind {
let params = self
let (params, codes): (Vec<_>, Vec<_>) = self
.columns
.iter()
.map(|c| {
if c.identifier == Identifier::Null {
Parameter::new_null()
} else {
Parameter::new(&c.data)
.map(|c| match &c.identifier {
Identifier::Null => (Parameter::new_null(), Format::Text),
Identifier::Toasted => {
warn!(
"to_bind: toasted column reached Bind construction; \
caller should strip or fill toasted columns first — sending NULL"
);
(Parameter::new_null(), Format::Text)
}
Identifier::Format(fmt) => (Parameter::new(&c.data), *fmt),
})
.collect::<Vec<_>>();
Bind::new_params(name, &params)
.unzip();
Bind::new_params_codes(name, &params, &codes)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely relevant to this PR since you didn't add this function, but this smells like we should use a builder

}

/// Does this tuple contain any unchanged-TOAST (`'u'`) column?
Expand Down Expand Up @@ -265,6 +270,15 @@ pub(crate) fn toasted_col() -> Column {
}
}

#[cfg(test)]
pub(crate) fn binary_col(data: &[u8]) -> Column {
Column {
identifier: Identifier::Format(Format::Binary),
len: data.len() as i32,
data: bytes::Bytes::copy_from_slice(data),
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -393,4 +407,65 @@ mod test {
"column count mismatch must return Err, not panic"
);
}

#[test]
fn to_bind_all_text_columns_produce_text_format_codes() {
let tuple = TupleData {
columns: vec![text_col("hello"), text_col("world")],
};
let bind = tuple.to_bind("__pgdog_1");
assert_eq!(bind.parameter_format(0).unwrap(), Format::Text);
assert_eq!(bind.parameter_format(1).unwrap(), Format::Text);
}

#[test]
fn to_bind_binary_columns_produce_binary_format_codes() {
// Simulate a bigint (8 bytes, big-endian) arriving as a binary column.
let val: i64 = 42;
let tuple = TupleData {
columns: vec![binary_col(&val.to_be_bytes())],
};
let bind = tuple.to_bind("__pgdog_1");
assert_eq!(bind.parameter_format(0).unwrap(), Format::Binary);
// Data must be forwarded verbatim — the destination decodes it as binary.
assert_eq!(
bind.parameter(0).unwrap().unwrap().data(),
&val.to_be_bytes()
);
}

#[test]
fn to_bind_null_and_toasted_use_text_format_code() {
let tuple = TupleData {
columns: vec![
Column {
identifier: Identifier::Null,
len: -1,
data: Bytes::new(),
},
toasted_col(),
],
};
let bind = tuple.to_bind("__pgdog_1");
// Format code is irrelevant for absent values, but must not be Binary
// to avoid confusing the destination server.
assert_eq!(bind.parameter_format(0).unwrap(), Format::Text);
assert_eq!(bind.parameter_format(1).unwrap(), Format::Text);
}

#[test]
fn to_bind_mixed_columns_produce_per_column_format_codes() {
let val: i32 = 99;
let tuple = TupleData {
columns: vec![
text_col("label"),
binary_col(&val.to_be_bytes()),
text_col("suffix"),
],
};
let bind = tuple.to_bind("__pgdog_1");
assert_eq!(bind.parameter_format(0).unwrap(), Format::Text);
assert_eq!(bind.parameter_format(1).unwrap(), Format::Binary);
assert_eq!(bind.parameter_format(2).unwrap(), Format::Text);
}
}
Loading