Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pgdog/src/admin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl AdminServer {
return Err(Error::SimpleOnly);
}

let query = Query::from_bytes(message.to_bytes()?)?;
let query = Query::from_bytes(message.to_bytes())?;

let messages = match Parser::parse(&query.query().to_lowercase()) {
Ok(command) => {
Expand Down
4 changes: 2 additions & 2 deletions pgdog/src/admin/show_mirrors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ mod tests {
);

// Parse the RowDescription to check column names
let row_desc = RowDescription::from_bytes(messages[0].to_bytes().unwrap()).unwrap();
let row_desc = RowDescription::from_bytes(messages[0].to_bytes()).unwrap();
let fields = &row_desc.fields;

// Should have 7 columns for per-cluster stats
Expand Down Expand Up @@ -119,7 +119,7 @@ mod tests {
// If we have data rows, validate their format
if !data_rows.is_empty() {
// Parse the first data row to ensure it has valid format
let data_row = DataRow::from_bytes(data_rows[0].to_bytes().unwrap()).unwrap();
let data_row = DataRow::from_bytes(data_rows[0].to_bytes()).unwrap();

// Skip validating database and user strings for now since DataRow doesn't have get_string
// Just validate the counter values are integers (>= 0)
Expand Down
2 changes: 1 addition & 1 deletion pgdog/src/admin/show_query_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ mod test {
for message in show {
if message.code() == 'D' {
total += 1;
let data_row = DataRow::from_bytes(message.to_bytes().unwrap()).unwrap();
let data_row = DataRow::from_bytes(message.to_bytes()).unwrap();
let hits = data_row.get_int(1, true).unwrap();
assert_eq!(hits, 1);
}
Expand Down
4 changes: 2 additions & 2 deletions pgdog/src/auth/scram/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ impl Server {
async fn read_password(stream: &mut Stream) -> Result<Option<Password>, Error> {
let message = stream.read().await?;
match message.code() {
'p' => Ok(Some(Password::from_bytes(message.to_bytes()?)?)),
'p' => Ok(Some(Password::from_bytes(message.to_bytes())?)),
'E' => {
let err = ErrorResponse::from_bytes(message.to_bytes()?)?;
let err = ErrorResponse::from_bytes(message.to_bytes())?;
error!("{}", err);
Ok(None)
}
Expand Down
18 changes: 9 additions & 9 deletions pgdog/src/backend/pool/connection/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub(super) struct Buffer {
impl Buffer {
/// Add message to buffer.
pub(super) fn add(&mut self, message: Message) -> Result<(), super::Error> {
let dr = DataRow::from_bytes(message.to_bytes()?)?;
let dr = DataRow::from_bytes(message.to_bytes())?;

self.buffer.push_back(dr);

Expand Down Expand Up @@ -264,7 +264,7 @@ mod test {

let mut i = 1;
while let Some(message) = buf.take() {
let dr = DataRow::from_bytes(message.to_bytes().unwrap()).unwrap();
let dr = DataRow::from_bytes(message.to_bytes()).unwrap();
let one = dr.get::<i64>(0, Format::Text).unwrap();
let two = dr.get::<String>(1, Format::Text).unwrap();
assert_eq!(one, i);
Expand Down Expand Up @@ -293,7 +293,7 @@ mod test {

assert_eq!(buf.len(), 1);
let row = buf.take().unwrap();
let dr = DataRow::from_bytes(row.to_bytes().unwrap()).unwrap();
let dr = DataRow::from_bytes(row.to_bytes()).unwrap();
let count = dr.get::<i64>(0, Format::Text).unwrap();
assert_eq!(count, 15 * 6);
}
Expand Down Expand Up @@ -321,7 +321,7 @@ mod test {
assert_eq!(buf.len(), 2);
for _ in &emails {
let row = buf.take().unwrap();
let dr = DataRow::from_bytes(row.to_bytes().unwrap()).unwrap();
let dr = DataRow::from_bytes(row.to_bytes()).unwrap();
let count = dr.get::<i64>(0, Format::Text).unwrap();
assert_eq!(count, 15 * 6);
}
Expand Down Expand Up @@ -364,7 +364,7 @@ mod test {

for expected in expected_order {
let message = buf.take().expect("Should have message");
let dr = DataRow::from_bytes(message.to_bytes().unwrap()).unwrap();
let dr = DataRow::from_bytes(message.to_bytes()).unwrap();
let ts = dr.get::<String>(0, Format::Text).unwrap();
assert_eq!(ts, expected);
}
Expand Down Expand Up @@ -400,7 +400,7 @@ mod test {

for expected in expected_order {
let message = buf.take().expect("Should have message");
let dr = DataRow::from_bytes(message.to_bytes().unwrap()).unwrap();
let dr = DataRow::from_bytes(message.to_bytes()).unwrap();
let price = dr.get::<String>(0, Format::Text).unwrap();
assert_eq!(price, expected);
}
Expand Down Expand Up @@ -454,7 +454,7 @@ mod test {

for expected in expected_order {
let message = buf.take().expect("Should have message");
let dr = DataRow::from_bytes(message.to_bytes().unwrap()).unwrap();
let dr = DataRow::from_bytes(message.to_bytes()).unwrap();
// Get the numeric value and convert to string for comparison
let column = dr.get_column(0, &decoder).unwrap().unwrap();
if let Datum::Numeric(numeric) = column.value {
Expand Down Expand Up @@ -506,7 +506,7 @@ mod test {

for expected in expected_order {
let message = buf.take().expect("Should have message");
let dr = DataRow::from_bytes(message.to_bytes().unwrap()).unwrap();
let dr = DataRow::from_bytes(message.to_bytes()).unwrap();
let value = dr.get::<String>(0, Format::Text).unwrap();
assert_eq!(value, expected);
}
Expand Down Expand Up @@ -547,7 +547,7 @@ mod test {
b.full();
assert_eq!(b.len(), 4);
for expected in 2..6_i64 {
let dr = DataRow::from_bytes(b.take().unwrap().to_bytes().unwrap()).unwrap();
let dr = DataRow::from_bytes(b.take().unwrap().to_bytes()).unwrap();
assert_eq!(dr.get::<i64>(0, Format::Text).unwrap(), expected);
}

Expand Down
6 changes: 3 additions & 3 deletions pgdog/src/backend/pool/connection/multi_shard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl MultiShard {
// Once all shards finished executing the command,
// we can start aggregating and sorting.
'C' => {
let cc = CommandComplete::from_bytes(message.to_bytes()?)?;
let cc = CommandComplete::from_bytes(message.to_bytes())?;
let has_rows = if let Some(rows) = cc.rows()? {
if self.route.is_omni() {
// Only use the first shard's row count for consistency with DataRow.
Expand Down Expand Up @@ -192,7 +192,7 @@ impl MultiShard {

'T' => {
self.counters.row_description += 1;
let rd = RowDescription::from_bytes(message.to_bytes()?)?;
let rd = RowDescription::from_bytes(message.to_bytes())?;

// Validate row description consistency
let is_first = self.validator.validate_row_description(&rd)?;
Expand Down Expand Up @@ -230,7 +230,7 @@ impl MultiShard {
'D' => {
if self.shards > 1 {
// Validate data row consistency.
let data_row = DataRow::from_bytes(message.to_bytes()?)?;
let data_row = DataRow::from_bytes(message.to_bytes())?;
self.validator.validate_data_row(&data_row)?;
}

Expand Down
6 changes: 3 additions & 3 deletions pgdog/src/backend/pool/connection/multi_shard/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ fn test_ready_for_query_error_preservation() {
// Should return the error message, not the normal one
assert!(result.is_some());
let returned_message = result.unwrap();
let returned_rfq = ReadyForQuery::from_bytes(returned_message.to_bytes().unwrap()).unwrap();
let returned_rfq = ReadyForQuery::from_bytes(returned_message.to_bytes()).unwrap();
assert!(returned_rfq.is_transaction_aborted());
}

Expand Down Expand Up @@ -184,7 +184,7 @@ fn test_omni_command_complete_not_summed() {
.unwrap();

let result = multi_shard.message();
let cc = CommandComplete::from_bytes(result.unwrap().to_bytes().unwrap()).unwrap();
let cc = CommandComplete::from_bytes(result.unwrap().to_bytes()).unwrap();
// Should be 5 (from one shard), not 15 (sum of all shards)
assert_eq!(cc.rows().unwrap(), Some(5));
}
Expand Down Expand Up @@ -219,7 +219,7 @@ fn test_omni_command_complete_uses_first_shard_row_count() {
.unwrap();

let result = multi_shard.message();
let cc = CommandComplete::from_bytes(result.unwrap().to_bytes().unwrap()).unwrap();
let cc = CommandComplete::from_bytes(result.unwrap().to_bytes()).unwrap();
// Should be 7 (from FIRST shard), not 9 (from last)
assert_eq!(cc.rows().unwrap(), Some(7));
}
Expand Down
6 changes: 3 additions & 3 deletions pgdog/src/backend/prepared_statements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl PreparedStatements {
if let Some(describe) = self.describes.pop_front() {
self.add_row_description(
&describe,
&RowDescription::from_bytes(message.to_bytes()?)?,
&RowDescription::from_bytes(message.to_bytes())?,
);
};
}
Expand Down Expand Up @@ -709,7 +709,7 @@ mod test {
// Simulate a ReadyForQuery message.
// First we need to add a 'Z' to the state so we can action it.
ps.state.add('Z');
let rfq = Message::new(ReadyForQuery::idle().to_bytes().unwrap());
let rfq = Message::new(ReadyForQuery::idle().to_bytes());
ps.forward(&rfq).unwrap();

// In extended_anonymous mode, cache should be cleared after done.
Expand All @@ -724,7 +724,7 @@ mod test {
assert_eq!(ps.len(), 2);

ps.state.add('Z');
let rfq = Message::new(ReadyForQuery::idle().to_bytes().unwrap());
let rfq = Message::new(ReadyForQuery::idle().to_bytes());
ps.forward(&rfq).unwrap();

// In extended mode, cache should be preserved.
Expand Down
2 changes: 1 addition & 1 deletion pgdog/src/backend/pub_sub/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl PubSubListener {

// NotificationResponse (B)
if message.code() == 'A' {
let notification = NotificationResponse::from_bytes(message.to_bytes()?)?;
let notification = NotificationResponse::from_bytes(message.to_bytes())?;
let mut unsub = None;
if let Some(channel) = channels.lock().get(notification.channel()) {
match channel.send(notification) {
Expand Down
2 changes: 1 addition & 1 deletion pgdog/src/backend/replication/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Buffer {
/// like Insert/Update/Delete that don't belong to the shard.
pub fn handle(&mut self, message: Message) -> Result<(), Error> {
let data = match message.code() {
'd' => CopyData::from_bytes(message.to_bytes()?)?,
'd' => CopyData::from_bytes(message.to_bytes())?,
_ => {
self.buffer.push_back(message);
return Ok(());
Expand Down
6 changes: 3 additions & 3 deletions pgdog/src/backend/replication/logical/publisher/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Copy {
server.send(&vec![query.into()].into()).await?;
let result = server.read().await?;
match result.code() {
'E' => return Err(ErrorResponse::from_bytes(result.to_bytes()?)?.into()),
'E' => return Err(ErrorResponse::from_bytes(result.to_bytes())?.into()),
'H' => (),
c => return Err(Error::OutOfSync(c)),
}
Expand All @@ -54,14 +54,14 @@ impl Copy {

match msg.code() {
'd' => {
let data = CopyData::from_bytes(msg.to_bytes()?)?;
let data = CopyData::from_bytes(msg.to_bytes())?;
trace!("[{}] --> {:?}", server.addr().addr().await?, data);
return Ok(Some(data));
}
'C' => (),
'c' => (), // CopyDone.
'Z' => return Ok(None),
'E' => return Err(ErrorResponse::from_bytes(msg.to_bytes()?)?.into()),
'E' => return Err(ErrorResponse::from_bytes(msg.to_bytes())?.into()),
c => return Err(Error::OutOfSync(c)),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,10 +698,9 @@ mod test {
commit_timestamp: 0,
xid: 1,
}
.to_bytes()
.unwrap(),
.to_bytes(),
};
CopyData::bytes(xlog.to_bytes().unwrap())
CopyData::bytes(xlog.to_bytes())
}
fn commit_copy_data(lsn: i64) -> CopyData {
let xlog = XLogData {
Expand All @@ -714,10 +713,9 @@ mod test {
end_lsn: lsn,
commit_timestamp: 0,
}
.to_bytes()
.unwrap(),
.to_bytes(),
};
CopyData::bytes(xlog.to_bytes().unwrap())
CopyData::bytes(xlog.to_bytes())
}

// -- handle ---------------------------------------------------------------
Expand Down
6 changes: 3 additions & 3 deletions pgdog/src/backend/replication/logical/publisher/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ impl ReplicationSlot {
let copy_both = self.server()?.read().await?;

match copy_both.code() {
'E' => return Err(ErrorResponse::from_bytes(copy_both.to_bytes()?)?.into()),
'E' => return Err(ErrorResponse::from_bytes(copy_both.to_bytes())?.into()),
'W' => (),
c => return Err(Error::OutOfSync(c)),
}
Expand All @@ -329,7 +329,7 @@ impl ReplicationSlot {

match message.code() {
'd' => {
let copy_data = CopyData::from_bytes(message.to_bytes()?)?;
let copy_data = CopyData::from_bytes(message.to_bytes())?;
trace!("{:?} [{}]", copy_data, self.address);

return Ok(Some(ReplicationData::CopyData(copy_data)));
Expand All @@ -341,7 +341,7 @@ impl ReplicationSlot {
return Ok(None);
}
'E' => {
let error = ErrorResponse::from_bytes(message.to_bytes()?)?;
let error = ErrorResponse::from_bytes(message.to_bytes())?;
if let Some(ref tracker) = self.tracker {
tracker.error(&error);
}
Expand Down
10 changes: 5 additions & 5 deletions pgdog/src/backend/replication/logical/subscriber/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl CopySubscriber {
'G' => (),
'E' => {
return Err(Error::PgError(Box::new(ErrorResponse::from_bytes(
msg.to_bytes()?,
msg.to_bytes(),
)?)))
}
c => return Err(Error::OutOfSync(c)),
Expand All @@ -142,7 +142,7 @@ impl CopySubscriber {
let command_complete = server.read().await?;
match command_complete.code() {
'E' => {
let error = ErrorResponse::from_bytes(command_complete.to_bytes()?)?;
let error = ErrorResponse::from_bytes(command_complete.to_bytes())?;
if error.code == "08P01" && error.message == "insufficient data left in message"
{
return Err(Error::BinaryFormatMismatch(Box::new(error)));
Expand Down Expand Up @@ -254,21 +254,21 @@ mod test {
.unwrap();
subscriber.start_copy().await.unwrap();

let header = CopyData::new(&Header::new().to_bytes().unwrap());
let header = CopyData::new(&Header::new().to_bytes());
subscriber.copy_data(header).await.unwrap();

for i in 0..25_i64 {
let id = Data::Column(Bytes::copy_from_slice(&i.to_be_bytes()));
let email = Data::Column(Bytes::copy_from_slice("test@test.com".as_bytes()));
let tuple = Tuple::new(&[id, email]);
subscriber
.copy_data(CopyData::new(&tuple.to_bytes().unwrap()))
.copy_data(CopyData::new(&tuple.to_bytes()))
.await
.unwrap();
}

subscriber
.copy_data(CopyData::new(&Tuple::new_end().to_bytes().unwrap()))
.copy_data(CopyData::new(&Tuple::new_end().to_bytes()))
.await
.unwrap();

Expand Down
8 changes: 4 additions & 4 deletions pgdog/src/backend/replication/logical/subscriber/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl StreamSubscriber {
'1' | 'C' | 'Z' => (),
'E' => {
return Err(Error::PgError(Box::new(ErrorResponse::from_bytes(
msg.to_bytes()?,
msg.to_bytes(),
)?)))
}
c => return Err(Error::OutOfSync(c)),
Expand Down Expand Up @@ -284,7 +284,7 @@ impl StreamSubscriber {
'2' => (),
'E' => {
return Err(Error::PgError(Box::new(ErrorResponse::from_bytes(
msg.to_bytes()?,
msg.to_bytes(),
)?)))
}
c => return Err(Error::SendOutOfSync(c)),
Expand Down Expand Up @@ -481,7 +481,7 @@ impl StreamSubscriber {
match msg.code() {
'E' => {
return Err(Error::PgError(Box::new(ErrorResponse::from_bytes(
msg.to_bytes()?,
msg.to_bytes(),
)?)))
}
'Z' => break,
Expand Down Expand Up @@ -697,7 +697,7 @@ impl StreamSubscriber {
match msg.code() {
'E' => {
return Err(Error::PgError(Box::new(ErrorResponse::from_bytes(
msg.to_bytes()?,
msg.to_bytes(),
)?)))
}
'Z' => (),
Expand Down
Loading
Loading