From 8b9ef479e7937acb3bb3b28a6b544f92c23470a0 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Thu, 28 May 2026 13:52:51 -0600 Subject: [PATCH] Remove useless Result Our `ToBytes` trait returns a `Result` instead of just `Bytes`. This is a reasonable forward looking decision, except not a single impl anywhere in the code base ever returns an error. And it's a good thing they don't, because the vast majority of the code which calls this method doesn't handle the error. It just unwraps. If it weren't for the dozens of unwraps everywhere, I would actually be in favor of leaving the Result there if we thought we might conceivably return an error in the future. But since we're not handling the errors, better to just get rid of it entirely. --- pgdog/src/admin/server.rs | 2 +- pgdog/src/admin/show_mirrors.rs | 4 +- pgdog/src/admin/show_query_cache.rs | 2 +- pgdog/src/auth/scram/server.rs | 4 +- pgdog/src/backend/pool/connection/buffer.rs | 18 +++--- .../pool/connection/multi_shard/mod.rs | 6 +- .../pool/connection/multi_shard/test.rs | 6 +- pgdog/src/backend/prepared_statements.rs | 6 +- pgdog/src/backend/pub_sub/listener.rs | 2 +- pgdog/src/backend/replication/buffer.rs | 2 +- .../replication/logical/publisher/copy.rs | 6 +- .../logical/publisher/publisher_impl.rs | 10 ++-- .../replication/logical/publisher/slot.rs | 6 +- .../replication/logical/subscriber/copy.rs | 10 ++-- .../replication/logical/subscriber/stream.rs | 8 +-- .../replication/logical/subscriber/tests.rs | 57 ++++++++----------- pgdog/src/backend/server.rs | 47 ++++++++------- pgdog/src/frontend/client/mod.rs | 8 +-- .../client/query_engine/multi_step/state.rs | 2 +- .../query_engine/multi_step/test/mod.rs | 4 +- .../query_engine/multi_step/test/prepared.rs | 22 +++---- .../query_engine/multi_step/test/simple.rs | 6 +- .../src/frontend/client/query_engine/query.rs | 2 +- .../client/query_engine/test/replicas.rs | 10 ++-- pgdog/src/frontend/client/test/mod.rs | 30 +++++----- pgdog/src/frontend/client/test/test_client.rs | 2 +- .../frontend/prepared_statements/rewrite.rs | 8 +-- .../frontend/router/parser/binary/header.rs | 4 +- .../frontend/router/parser/binary/tuple.rs | 4 +- pgdog/src/frontend/router/parser/copy.rs | 4 +- pgdog/src/net/messages/auth/mod.rs | 14 ++--- pgdog/src/net/messages/auth/password.rs | 4 +- pgdog/src/net/messages/backend_key.rs | 8 +-- pgdog/src/net/messages/bind.rs | 16 +++--- pgdog/src/net/messages/bind_complete.rs | 4 +- pgdog/src/net/messages/buffer.rs | 16 ++---- pgdog/src/net/messages/close.rs | 6 +- pgdog/src/net/messages/close_complete.rs | 4 +- pgdog/src/net/messages/command_complete.rs | 4 +- pgdog/src/net/messages/copy_data.rs | 4 +- pgdog/src/net/messages/copy_done.rs | 4 +- pgdog/src/net/messages/copy_fail.rs | 4 +- pgdog/src/net/messages/data_row.rs | 6 +- pgdog/src/net/messages/describe.rs | 12 ++-- .../src/net/messages/empty_query_response.rs | 4 +- pgdog/src/net/messages/error_response.rs | 4 +- pgdog/src/net/messages/execute.rs | 4 +- pgdog/src/net/messages/fastpath.rs | 6 +- pgdog/src/net/messages/flush.rs | 4 +- pgdog/src/net/messages/hello.rs | 26 ++++----- pgdog/src/net/messages/mod.rs | 10 ++-- .../messages/negotiate_protocol_version.rs | 6 +- pgdog/src/net/messages/no_data.rs | 4 +- pgdog/src/net/messages/notice_response.rs | 6 +- .../src/net/messages/notification_response.rs | 4 +- .../src/net/messages/parameter_description.rs | 8 +-- pgdog/src/net/messages/parameter_status.rs | 8 +-- pgdog/src/net/messages/parse.rs | 10 ++-- pgdog/src/net/messages/parse_complete.rs | 4 +- pgdog/src/net/messages/payload.rs | 6 +- pgdog/src/net/messages/query.rs | 6 +- .../replication/hot_standby_feedback.rs | 6 +- .../net/messages/replication/keep_alive.rs | 8 +-- .../net/messages/replication/logical/begin.rs | 4 +- .../messages/replication/logical/commit.rs | 4 +- .../messages/replication/logical/delete.rs | 8 +-- .../messages/replication/logical/insert.rs | 6 +- .../messages/replication/logical/relation.rs | 4 +- .../replication/logical/stream_start.rs | 4 +- .../replication/logical/tuple_data.rs | 4 +- .../messages/replication/logical/update.rs | 12 ++-- pgdog/src/net/messages/replication/mod.rs | 4 +- .../net/messages/replication/status_update.rs | 8 +-- .../src/net/messages/replication/xlog_data.rs | 8 +-- pgdog/src/net/messages/rfq.rs | 4 +- pgdog/src/net/messages/row_description.rs | 4 +- pgdog/src/net/messages/sync.rs | 6 +- pgdog/src/net/messages/terminate.rs | 4 +- pgdog/src/net/parameter.rs | 8 +-- pgdog/src/net/protocol_message.rs | 2 +- pgdog/src/net/stream.rs | 2 +- 81 files changed, 304 insertions(+), 334 deletions(-) diff --git a/pgdog/src/admin/server.rs b/pgdog/src/admin/server.rs index a6c994200..846e4cb88 100644 --- a/pgdog/src/admin/server.rs +++ b/pgdog/src/admin/server.rs @@ -46,7 +46,7 @@ impl AdminServer { return Err(Error::SimpleOnly); } - let query = Query::from_bytes(message.to_bytes()?)?; + let query = Query::from_bytes(message.to_bytes())?; let messages = match Parser::parse(&query.query().to_lowercase()) { Ok(command) => { diff --git a/pgdog/src/admin/show_mirrors.rs b/pgdog/src/admin/show_mirrors.rs index 896e62e61..d10fa63b7 100644 --- a/pgdog/src/admin/show_mirrors.rs +++ b/pgdog/src/admin/show_mirrors.rs @@ -85,7 +85,7 @@ mod tests { ); // Parse the RowDescription to check column names - let row_desc = RowDescription::from_bytes(messages[0].to_bytes().unwrap()).unwrap(); + let row_desc = RowDescription::from_bytes(messages[0].to_bytes()).unwrap(); let fields = &row_desc.fields; // Should have 7 columns for per-cluster stats @@ -119,7 +119,7 @@ mod tests { // If we have data rows, validate their format if !data_rows.is_empty() { // Parse the first data row to ensure it has valid format - let data_row = DataRow::from_bytes(data_rows[0].to_bytes().unwrap()).unwrap(); + let data_row = DataRow::from_bytes(data_rows[0].to_bytes()).unwrap(); // Skip validating database and user strings for now since DataRow doesn't have get_string // Just validate the counter values are integers (>= 0) diff --git a/pgdog/src/admin/show_query_cache.rs b/pgdog/src/admin/show_query_cache.rs index 1f0605863..c7b5ddf45 100644 --- a/pgdog/src/admin/show_query_cache.rs +++ b/pgdog/src/admin/show_query_cache.rs @@ -94,7 +94,7 @@ mod test { for message in show { if message.code() == 'D' { total += 1; - let data_row = DataRow::from_bytes(message.to_bytes().unwrap()).unwrap(); + let data_row = DataRow::from_bytes(message.to_bytes()).unwrap(); let hits = data_row.get_int(1, true).unwrap(); assert_eq!(hits, 1); } diff --git a/pgdog/src/auth/scram/server.rs b/pgdog/src/auth/scram/server.rs index c11d8d9d2..ad776c20b 100644 --- a/pgdog/src/auth/scram/server.rs +++ b/pgdog/src/auth/scram/server.rs @@ -144,9 +144,9 @@ impl Server { async fn read_password(stream: &mut Stream) -> Result, Error> { let message = stream.read().await?; match message.code() { - 'p' => Ok(Some(Password::from_bytes(message.to_bytes()?)?)), + 'p' => Ok(Some(Password::from_bytes(message.to_bytes())?)), 'E' => { - let err = ErrorResponse::from_bytes(message.to_bytes()?)?; + let err = ErrorResponse::from_bytes(message.to_bytes())?; error!("{}", err); Ok(None) } diff --git a/pgdog/src/backend/pool/connection/buffer.rs b/pgdog/src/backend/pool/connection/buffer.rs index c5f6c3527..da8853f93 100644 --- a/pgdog/src/backend/pool/connection/buffer.rs +++ b/pgdog/src/backend/pool/connection/buffer.rs @@ -29,7 +29,7 @@ pub(super) struct Buffer { impl Buffer { /// Add message to buffer. pub(super) fn add(&mut self, message: Message) -> Result<(), super::Error> { - let dr = DataRow::from_bytes(message.to_bytes()?)?; + let dr = DataRow::from_bytes(message.to_bytes())?; self.buffer.push_back(dr); @@ -264,7 +264,7 @@ mod test { let mut i = 1; while let Some(message) = buf.take() { - let dr = DataRow::from_bytes(message.to_bytes().unwrap()).unwrap(); + let dr = DataRow::from_bytes(message.to_bytes()).unwrap(); let one = dr.get::(0, Format::Text).unwrap(); let two = dr.get::(1, Format::Text).unwrap(); assert_eq!(one, i); @@ -293,7 +293,7 @@ mod test { assert_eq!(buf.len(), 1); let row = buf.take().unwrap(); - let dr = DataRow::from_bytes(row.to_bytes().unwrap()).unwrap(); + let dr = DataRow::from_bytes(row.to_bytes()).unwrap(); let count = dr.get::(0, Format::Text).unwrap(); assert_eq!(count, 15 * 6); } @@ -321,7 +321,7 @@ mod test { assert_eq!(buf.len(), 2); for _ in &emails { let row = buf.take().unwrap(); - let dr = DataRow::from_bytes(row.to_bytes().unwrap()).unwrap(); + let dr = DataRow::from_bytes(row.to_bytes()).unwrap(); let count = dr.get::(0, Format::Text).unwrap(); assert_eq!(count, 15 * 6); } @@ -364,7 +364,7 @@ mod test { for expected in expected_order { let message = buf.take().expect("Should have message"); - let dr = DataRow::from_bytes(message.to_bytes().unwrap()).unwrap(); + let dr = DataRow::from_bytes(message.to_bytes()).unwrap(); let ts = dr.get::(0, Format::Text).unwrap(); assert_eq!(ts, expected); } @@ -400,7 +400,7 @@ mod test { for expected in expected_order { let message = buf.take().expect("Should have message"); - let dr = DataRow::from_bytes(message.to_bytes().unwrap()).unwrap(); + let dr = DataRow::from_bytes(message.to_bytes()).unwrap(); let price = dr.get::(0, Format::Text).unwrap(); assert_eq!(price, expected); } @@ -454,7 +454,7 @@ mod test { for expected in expected_order { let message = buf.take().expect("Should have message"); - let dr = DataRow::from_bytes(message.to_bytes().unwrap()).unwrap(); + let dr = DataRow::from_bytes(message.to_bytes()).unwrap(); // Get the numeric value and convert to string for comparison let column = dr.get_column(0, &decoder).unwrap().unwrap(); if let Datum::Numeric(numeric) = column.value { @@ -506,7 +506,7 @@ mod test { for expected in expected_order { let message = buf.take().expect("Should have message"); - let dr = DataRow::from_bytes(message.to_bytes().unwrap()).unwrap(); + let dr = DataRow::from_bytes(message.to_bytes()).unwrap(); let value = dr.get::(0, Format::Text).unwrap(); assert_eq!(value, expected); } @@ -547,7 +547,7 @@ mod test { b.full(); assert_eq!(b.len(), 4); for expected in 2..6_i64 { - let dr = DataRow::from_bytes(b.take().unwrap().to_bytes().unwrap()).unwrap(); + let dr = DataRow::from_bytes(b.take().unwrap().to_bytes()).unwrap(); assert_eq!(dr.get::(0, Format::Text).unwrap(), expected); } diff --git a/pgdog/src/backend/pool/connection/multi_shard/mod.rs b/pgdog/src/backend/pool/connection/multi_shard/mod.rs index 31f9073fc..b97e847fc 100644 --- a/pgdog/src/backend/pool/connection/multi_shard/mod.rs +++ b/pgdog/src/backend/pool/connection/multi_shard/mod.rs @@ -140,7 +140,7 @@ impl MultiShard { // Once all shards finished executing the command, // we can start aggregating and sorting. 'C' => { - let cc = CommandComplete::from_bytes(message.to_bytes()?)?; + let cc = CommandComplete::from_bytes(message.to_bytes())?; let has_rows = if let Some(rows) = cc.rows()? { if self.route.is_omni() { // Only use the first shard's row count for consistency with DataRow. @@ -192,7 +192,7 @@ impl MultiShard { 'T' => { self.counters.row_description += 1; - let rd = RowDescription::from_bytes(message.to_bytes()?)?; + let rd = RowDescription::from_bytes(message.to_bytes())?; // Validate row description consistency let is_first = self.validator.validate_row_description(&rd)?; @@ -230,7 +230,7 @@ impl MultiShard { 'D' => { if self.shards > 1 { // Validate data row consistency. - let data_row = DataRow::from_bytes(message.to_bytes()?)?; + let data_row = DataRow::from_bytes(message.to_bytes())?; self.validator.validate_data_row(&data_row)?; } diff --git a/pgdog/src/backend/pool/connection/multi_shard/test.rs b/pgdog/src/backend/pool/connection/multi_shard/test.rs index 98b4532ff..b65792a2d 100644 --- a/pgdog/src/backend/pool/connection/multi_shard/test.rs +++ b/pgdog/src/backend/pool/connection/multi_shard/test.rs @@ -143,7 +143,7 @@ fn test_ready_for_query_error_preservation() { // Should return the error message, not the normal one assert!(result.is_some()); let returned_message = result.unwrap(); - let returned_rfq = ReadyForQuery::from_bytes(returned_message.to_bytes().unwrap()).unwrap(); + let returned_rfq = ReadyForQuery::from_bytes(returned_message.to_bytes()).unwrap(); assert!(returned_rfq.is_transaction_aborted()); } @@ -184,7 +184,7 @@ fn test_omni_command_complete_not_summed() { .unwrap(); let result = multi_shard.message(); - let cc = CommandComplete::from_bytes(result.unwrap().to_bytes().unwrap()).unwrap(); + let cc = CommandComplete::from_bytes(result.unwrap().to_bytes()).unwrap(); // Should be 5 (from one shard), not 15 (sum of all shards) assert_eq!(cc.rows().unwrap(), Some(5)); } @@ -219,7 +219,7 @@ fn test_omni_command_complete_uses_first_shard_row_count() { .unwrap(); let result = multi_shard.message(); - let cc = CommandComplete::from_bytes(result.unwrap().to_bytes().unwrap()).unwrap(); + let cc = CommandComplete::from_bytes(result.unwrap().to_bytes()).unwrap(); // Should be 7 (from FIRST shard), not 9 (from last) assert_eq!(cc.rows().unwrap(), Some(7)); } diff --git a/pgdog/src/backend/prepared_statements.rs b/pgdog/src/backend/prepared_statements.rs index feaf0311f..31015902e 100644 --- a/pgdog/src/backend/prepared_statements.rs +++ b/pgdog/src/backend/prepared_statements.rs @@ -290,7 +290,7 @@ impl PreparedStatements { if let Some(describe) = self.describes.pop_front() { self.add_row_description( &describe, - &RowDescription::from_bytes(message.to_bytes()?)?, + &RowDescription::from_bytes(message.to_bytes())?, ); }; } @@ -709,7 +709,7 @@ mod test { // Simulate a ReadyForQuery message. // First we need to add a 'Z' to the state so we can action it. ps.state.add('Z'); - let rfq = Message::new(ReadyForQuery::idle().to_bytes().unwrap()); + let rfq = Message::new(ReadyForQuery::idle().to_bytes()); ps.forward(&rfq).unwrap(); // In extended_anonymous mode, cache should be cleared after done. @@ -724,7 +724,7 @@ mod test { assert_eq!(ps.len(), 2); ps.state.add('Z'); - let rfq = Message::new(ReadyForQuery::idle().to_bytes().unwrap()); + let rfq = Message::new(ReadyForQuery::idle().to_bytes()); ps.forward(&rfq).unwrap(); // In extended mode, cache should be preserved. diff --git a/pgdog/src/backend/pub_sub/listener.rs b/pgdog/src/backend/pub_sub/listener.rs index 7bb5de35b..1e2559a32 100644 --- a/pgdog/src/backend/pub_sub/listener.rs +++ b/pgdog/src/backend/pub_sub/listener.rs @@ -192,7 +192,7 @@ impl PubSubListener { // NotificationResponse (B) if message.code() == 'A' { - let notification = NotificationResponse::from_bytes(message.to_bytes()?)?; + let notification = NotificationResponse::from_bytes(message.to_bytes())?; let mut unsub = None; if let Some(channel) = channels.lock().get(notification.channel()) { match channel.send(notification) { diff --git a/pgdog/src/backend/replication/buffer.rs b/pgdog/src/backend/replication/buffer.rs index e525df76c..b9c3c4841 100644 --- a/pgdog/src/backend/replication/buffer.rs +++ b/pgdog/src/backend/replication/buffer.rs @@ -57,7 +57,7 @@ impl Buffer { /// like Insert/Update/Delete that don't belong to the shard. pub fn handle(&mut self, message: Message) -> Result<(), Error> { let data = match message.code() { - 'd' => CopyData::from_bytes(message.to_bytes()?)?, + 'd' => CopyData::from_bytes(message.to_bytes())?, _ => { self.buffer.push_back(message); return Ok(()); diff --git a/pgdog/src/backend/replication/logical/publisher/copy.rs b/pgdog/src/backend/replication/logical/publisher/copy.rs index 4277eb303..ac4222e3f 100644 --- a/pgdog/src/backend/replication/logical/publisher/copy.rs +++ b/pgdog/src/backend/replication/logical/publisher/copy.rs @@ -41,7 +41,7 @@ impl Copy { server.send(&vec![query.into()].into()).await?; let result = server.read().await?; match result.code() { - 'E' => return Err(ErrorResponse::from_bytes(result.to_bytes()?)?.into()), + 'E' => return Err(ErrorResponse::from_bytes(result.to_bytes())?.into()), 'H' => (), c => return Err(Error::OutOfSync(c)), } @@ -54,14 +54,14 @@ impl Copy { match msg.code() { 'd' => { - let data = CopyData::from_bytes(msg.to_bytes()?)?; + let data = CopyData::from_bytes(msg.to_bytes())?; trace!("[{}] --> {:?}", server.addr().addr().await?, data); return Ok(Some(data)); } 'C' => (), 'c' => (), // CopyDone. 'Z' => return Ok(None), - 'E' => return Err(ErrorResponse::from_bytes(msg.to_bytes()?)?.into()), + 'E' => return Err(ErrorResponse::from_bytes(msg.to_bytes())?.into()), c => return Err(Error::OutOfSync(c)), } } diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index 56f43a6c0..8130fac2c 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -698,10 +698,9 @@ mod test { commit_timestamp: 0, xid: 1, } - .to_bytes() - .unwrap(), + .to_bytes(), }; - CopyData::bytes(xlog.to_bytes().unwrap()) + CopyData::bytes(xlog.to_bytes()) } fn commit_copy_data(lsn: i64) -> CopyData { let xlog = XLogData { @@ -714,10 +713,9 @@ mod test { end_lsn: lsn, commit_timestamp: 0, } - .to_bytes() - .unwrap(), + .to_bytes(), }; - CopyData::bytes(xlog.to_bytes().unwrap()) + CopyData::bytes(xlog.to_bytes()) } // -- handle --------------------------------------------------------------- diff --git a/pgdog/src/backend/replication/logical/publisher/slot.rs b/pgdog/src/backend/replication/logical/publisher/slot.rs index dbc1997c1..f74e40e6a 100644 --- a/pgdog/src/backend/replication/logical/publisher/slot.rs +++ b/pgdog/src/backend/replication/logical/publisher/slot.rs @@ -303,7 +303,7 @@ impl ReplicationSlot { let copy_both = self.server()?.read().await?; match copy_both.code() { - 'E' => return Err(ErrorResponse::from_bytes(copy_both.to_bytes()?)?.into()), + 'E' => return Err(ErrorResponse::from_bytes(copy_both.to_bytes())?.into()), 'W' => (), c => return Err(Error::OutOfSync(c)), } @@ -329,7 +329,7 @@ impl ReplicationSlot { match message.code() { 'd' => { - let copy_data = CopyData::from_bytes(message.to_bytes()?)?; + let copy_data = CopyData::from_bytes(message.to_bytes())?; trace!("{:?} [{}]", copy_data, self.address); return Ok(Some(ReplicationData::CopyData(copy_data))); @@ -341,7 +341,7 @@ impl ReplicationSlot { return Ok(None); } 'E' => { - let error = ErrorResponse::from_bytes(message.to_bytes()?)?; + let error = ErrorResponse::from_bytes(message.to_bytes())?; if let Some(ref tracker) = self.tracker { tracker.error(&error); } diff --git a/pgdog/src/backend/replication/logical/subscriber/copy.rs b/pgdog/src/backend/replication/logical/subscriber/copy.rs index 262db9d58..67c0bfc7a 100644 --- a/pgdog/src/backend/replication/logical/subscriber/copy.rs +++ b/pgdog/src/backend/replication/logical/subscriber/copy.rs @@ -121,7 +121,7 @@ impl CopySubscriber { 'G' => (), 'E' => { return Err(Error::PgError(Box::new(ErrorResponse::from_bytes( - msg.to_bytes()?, + msg.to_bytes(), )?))) } c => return Err(Error::OutOfSync(c)), @@ -142,7 +142,7 @@ impl CopySubscriber { let command_complete = server.read().await?; match command_complete.code() { 'E' => { - let error = ErrorResponse::from_bytes(command_complete.to_bytes()?)?; + let error = ErrorResponse::from_bytes(command_complete.to_bytes())?; if error.code == "08P01" && error.message == "insufficient data left in message" { return Err(Error::BinaryFormatMismatch(Box::new(error))); @@ -254,7 +254,7 @@ mod test { .unwrap(); subscriber.start_copy().await.unwrap(); - let header = CopyData::new(&Header::new().to_bytes().unwrap()); + let header = CopyData::new(&Header::new().to_bytes()); subscriber.copy_data(header).await.unwrap(); for i in 0..25_i64 { @@ -262,13 +262,13 @@ mod test { let email = Data::Column(Bytes::copy_from_slice("test@test.com".as_bytes())); let tuple = Tuple::new(&[id, email]); subscriber - .copy_data(CopyData::new(&tuple.to_bytes().unwrap())) + .copy_data(CopyData::new(&tuple.to_bytes())) .await .unwrap(); } subscriber - .copy_data(CopyData::new(&Tuple::new_end().to_bytes().unwrap())) + .copy_data(CopyData::new(&Tuple::new_end().to_bytes())) .await .unwrap(); diff --git a/pgdog/src/backend/replication/logical/subscriber/stream.rs b/pgdog/src/backend/replication/logical/subscriber/stream.rs index b8f4dd1f4..6e1c6eabd 100644 --- a/pgdog/src/backend/replication/logical/subscriber/stream.rs +++ b/pgdog/src/backend/replication/logical/subscriber/stream.rs @@ -205,7 +205,7 @@ impl StreamSubscriber { '1' | 'C' | 'Z' => (), 'E' => { return Err(Error::PgError(Box::new(ErrorResponse::from_bytes( - msg.to_bytes()?, + msg.to_bytes(), )?))) } c => return Err(Error::OutOfSync(c)), @@ -284,7 +284,7 @@ impl StreamSubscriber { '2' => (), 'E' => { return Err(Error::PgError(Box::new(ErrorResponse::from_bytes( - msg.to_bytes()?, + msg.to_bytes(), )?))) } c => return Err(Error::SendOutOfSync(c)), @@ -481,7 +481,7 @@ impl StreamSubscriber { match msg.code() { 'E' => { return Err(Error::PgError(Box::new(ErrorResponse::from_bytes( - msg.to_bytes()?, + msg.to_bytes(), )?))) } 'Z' => break, @@ -697,7 +697,7 @@ impl StreamSubscriber { match msg.code() { 'E' => { return Err(Error::PgError(Box::new(ErrorResponse::from_bytes( - msg.to_bytes()?, + msg.to_bytes(), )?))) } 'Z' => (), diff --git a/pgdog/src/backend/replication/logical/subscriber/tests.rs b/pgdog/src/backend/replication/logical/subscriber/tests.rs index 836c64b0b..f19da11f1 100644 --- a/pgdog/src/backend/replication/logical/subscriber/tests.rs +++ b/pgdog/src/backend/replication/logical/subscriber/tests.rs @@ -46,7 +46,7 @@ fn xlog_copy_data(payload: Bytes) -> CopyData { system_clock: 0, bytes: payload, }; - CopyData::bytes(xlog.to_bytes().unwrap()) + CopyData::bytes(xlog.to_bytes()) } fn make_sharded_table() -> Table { @@ -178,8 +178,7 @@ fn begin_copy_data(lsn: i64) -> CopyData { commit_timestamp: 0, xid: 1, } - .to_bytes() - .unwrap(), + .to_bytes(), ) } @@ -191,17 +190,16 @@ fn commit_copy_data(end_lsn: i64) -> CopyData { end_lsn, commit_timestamp: 0, } - .to_bytes() - .unwrap(), + .to_bytes(), ) } fn relation_copy_data(oid: Oid) -> CopyData { - xlog_copy_data(sharded_relation(oid).to_bytes().unwrap()) + xlog_copy_data(sharded_relation(oid).to_bytes()) } fn sharded_test_b_relation_copy_data(oid: Oid) -> CopyData { - xlog_copy_data(sharded_test_b_relation(oid).to_bytes().unwrap()) + xlog_copy_data(sharded_test_b_relation(oid).to_bytes()) } fn insert_copy_data(oid: Oid, id: &str, value: &str) -> CopyData { @@ -213,8 +211,7 @@ fn insert_copy_data(oid: Oid, id: &str, value: &str) -> CopyData { columns: vec![text_column(id), text_column(value)], }, } - .to_bytes() - .unwrap(), + .to_bytes(), ) } @@ -227,8 +224,7 @@ fn delete_copy_data(oid: Oid, id: &str) -> CopyData { }), old: None, } - .to_bytes() - .unwrap(), + .to_bytes(), ) } @@ -249,7 +245,7 @@ fn null_column() -> TupleColumn { } fn x_update(u: XLogUpdate) -> CopyData { - xlog_copy_data(u.to_bytes().unwrap()) + xlog_copy_data(u.to_bytes()) } fn make_subscriber() -> StreamSubscriber { @@ -633,10 +629,10 @@ async fn partition_leaves_share_destination() { relation_b.name = "sharded_p2".to_string(); sub.handle(begin_copy_data(100)).await.unwrap(); - sub.handle(xlog_copy_data(relation_a.to_bytes().unwrap())) + sub.handle(xlog_copy_data(relation_a.to_bytes())) .await .unwrap(); - sub.handle(xlog_copy_data(relation_b.to_bytes().unwrap())) + sub.handle(xlog_copy_data(relation_b.to_bytes())) .await .unwrap(); @@ -975,7 +971,7 @@ fn posts_relation(oid: Oid) -> Relation { } fn posts_relation_copy_data(oid: Oid) -> CopyData { - xlog_copy_data(posts_relation(oid).to_bytes().unwrap()) + xlog_copy_data(posts_relation(oid).to_bytes()) } fn posts_insert_copy_data(oid: Oid, id: &str, title: &str, body: &str) -> CopyData { @@ -987,8 +983,7 @@ fn posts_insert_copy_data(oid: Oid, id: &str, title: &str, body: &str) -> CopyDa columns: vec![text_column(id), text_column(title), text_column(body)], }, } - .to_bytes() - .unwrap(), + .to_bytes(), ) } @@ -1004,8 +999,7 @@ fn posts_update_title_copy_data(oid: Oid, id: &str, new_title: &str) -> CopyData columns: vec![text_column(id), text_column(new_title), toasted_column()], }, } - .to_bytes() - .unwrap(), + .to_bytes(), ) } @@ -1163,8 +1157,7 @@ async fn toast_update_all_toasted_is_noop() { columns: vec![text_column(&id), toasted_column(), toasted_column()], }, } - .to_bytes() - .unwrap(), + .to_bytes(), )) .await .unwrap(); @@ -1392,7 +1385,7 @@ fn full_identity_relation(oid: Oid) -> Relation { } fn full_identity_relation_copy_data(oid: Oid) -> CopyData { - xlog_copy_data(full_identity_relation(oid).to_bytes().unwrap()) + xlog_copy_data(full_identity_relation(oid).to_bytes()) } /// Helper: build a FULL-identity UPDATE CopyData. @@ -1457,8 +1450,7 @@ fn full_delete_copy_data(oid: Oid, id: &str, value: &str) -> CopyData { columns: vec![text_column(id), text_column(value)], }), } - .to_bytes() - .unwrap(), + .to_bytes(), ) } @@ -1496,7 +1488,7 @@ fn full_dup_rows_relation(oid: Oid) -> Relation { } fn full_dup_rows_relation_copy_data(oid: Oid) -> CopyData { - xlog_copy_data(full_dup_rows_relation(oid).to_bytes().unwrap()) + xlog_copy_data(full_dup_rows_relation(oid).to_bytes()) } /// Omni FULL-identity table with `(a TEXT, b TEXT)` and a unique index on `(a, b)`. @@ -1531,7 +1523,7 @@ fn full_omni_dedup_relation(oid: Oid) -> Relation { } fn full_omni_dedup_relation_copy_data(oid: Oid) -> CopyData { - xlog_copy_data(full_omni_dedup_relation(oid).to_bytes().unwrap()) + xlog_copy_data(full_omni_dedup_relation(oid).to_bytes()) } /// Build an INSERT CopyData for the omni dedup table `(a, b)`. @@ -1544,8 +1536,7 @@ fn omni_insert_copy_data(oid: Oid, a: &str, b: &str) -> CopyData { columns: vec![text_column(a), text_column(b)], }, } - .to_bytes() - .unwrap(), + .to_bytes(), ) } @@ -1567,7 +1558,7 @@ async fn full_identity_nothing_rejected() { let mut rel = full_identity_relation(oid); rel.name = "sharded".to_string(); let err = sub - .handle(xlog_copy_data(rel.to_bytes().unwrap())) + .handle(xlog_copy_data(rel.to_bytes())) .await .expect_err("REPLICA IDENTITY NOTHING must be rejected"); assert!( @@ -2247,8 +2238,7 @@ async fn full_identity_delete_matches_null_column() { columns: vec![text_column(&id), null_column()], }), } - .to_bytes() - .unwrap(), + .to_bytes(), )) .await .unwrap(); @@ -2333,7 +2323,7 @@ fn settings_relation(oid: Oid) -> Relation { } fn settings_relation_copy_data(oid: Oid) -> CopyData { - xlog_copy_data(settings_relation(oid).to_bytes().unwrap()) + xlog_copy_data(settings_relation(oid).to_bytes()) } /// WAL UPDATE for settings(id, name, value) — full new tuple, no toasted columns. @@ -2346,8 +2336,7 @@ fn settings_update_copy_data(oid: Oid, id: &str, name: &str, value: &str) -> Cop columns: vec![text_column(id), text_column(name), text_column(value)], }, } - .to_bytes() - .unwrap(), + .to_bytes(), ) } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 306fcda7e..a903f8180 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -167,7 +167,7 @@ impl Server { ); // Request TLS. - stream.write_all(&Startup::tls().to_bytes()?).await?; + stream.write_all(&Startup::tls().to_bytes()).await?; stream.flush().await?; let mut ssl = BytesMut::new(); @@ -219,8 +219,7 @@ impl Server { stream .write_all( - &Startup::new(&addr.user, &addr.database_name, options.params.clone()) - .to_bytes()?, + &Startup::new(&addr.user, &addr.database_name, options.params.clone()).to_bytes(), ) .await?; stream.flush().await?; @@ -297,7 +296,7 @@ impl Server { // ErrorResponse (B) 'E' => { return Err(Error::ConnectionError(Box::new(ErrorResponse::from_bytes( - message.to_bytes()?, + message.to_bytes(), )?))); } // NoticeResponse (B) @@ -360,7 +359,7 @@ impl Server { pub async fn cancel(addr: &Address, id: &BackendKeyData) -> Result<(), Error> { let mut stream = TcpStream::connect(addr.addr().await?).await?; stream - .write_all(&Startup::Cancel { id: *id }.to_bytes()?) + .write_all(&Startup::Cancel { id: *id }.to_bytes()) .await?; stream.flush().await?; @@ -529,7 +528,7 @@ impl Server { self.statement_executed = false; } 'E' => { - let error = ErrorResponse::from_bytes(message.to_bytes()?)?; + let error = ErrorResponse::from_bytes(message.to_bytes())?; self.schema_changed = error.code == "0A000"; self.stats.error(); @@ -548,11 +547,11 @@ impl Server { self.streaming = true; } 'S' => { - let ps = ParameterStatus::from_bytes(message.to_bytes()?)?; + let ps = ParameterStatus::from_bytes(message.to_bytes())?; self.changed_params.insert(ps.name, ps.value); } 'C' => { - let cmd = CommandComplete::from_bytes(message.to_bytes()?)?; + let cmd = CommandComplete::from_bytes(message.to_bytes())?; match cmd.command() { "PREPARE" | "DEALLOCATE" => self.sync_prepared = true, "DEALLOCATE ALL" => self.prepared_statements.clear(), @@ -791,7 +790,7 @@ impl Server { } if message.code() == 'E' { - err = Some(ErrorResponse::from_bytes(message.to_bytes()?)?); + err = Some(ErrorResponse::from_bytes(message.to_bytes())?); } messages.push(message); } @@ -818,13 +817,13 @@ impl Server { let notices = messages.iter().filter(|m| m.code() == 'N'); for notice in notices { - let notice = NoticeResponse::from_bytes(notice.to_bytes()?)?; + let notice = NoticeResponse::from_bytes(notice.to_bytes())?; warn!("{} [{}]", notice.message.message, self.addr()); } let error = messages.iter().find(|m| m.code() == 'E'); if let Some(error) = error { - let error = ErrorResponse::from_bytes(error.to_bytes()?)?; + let error = ErrorResponse::from_bytes(error.to_bytes())?; Err(Error::ExecutionError(Box::new(error))) } else { Ok(messages) @@ -840,7 +839,7 @@ impl Server { Ok(messages .into_iter() .filter(|message| message.code() == 'D') - .map(|message| message.to_bytes().unwrap()) + .map(|message| message.to_bytes()) .map(DataRow::from_bytes) .collect::, crate::net::Error>>()? .into_iter() @@ -949,7 +948,7 @@ impl Server { '3' => self.prepared_statements.remove(close.name()), 'E' => { return Err(Error::PreparedStatementError(Box::new( - ErrorResponse::from_bytes(response.to_bytes()?)?, + ErrorResponse::from_bytes(response.to_bytes())?, ))); } c => { @@ -1165,7 +1164,7 @@ impl Drop for Server { ); spawn(async move { - stream.write_all(&Terminate.to_bytes()?).await?; + stream.write_all(&Terminate.to_bytes()).await?; stream.flush().await?; Ok::<(), Error>(()) }); @@ -1307,7 +1306,7 @@ pub mod test { assert!(matches!(startup, Startup::Startup { .. })); socket - .write_all(&Authentication::ClearTextPassword.to_bytes().unwrap()) + .write_all(&Authentication::ClearTextPassword.to_bytes()) .await .unwrap(); @@ -1315,15 +1314,15 @@ pub mod test { assert_eq!(password.password(), Some(expected_secret.as_str())); socket - .write_all(&Authentication::Ok.to_bytes().unwrap()) + .write_all(&Authentication::Ok.to_bytes()) .await .unwrap(); socket - .write_all(&BackendKeyData::new().to_bytes().unwrap()) + .write_all(&BackendKeyData::new().to_bytes()) .await .unwrap(); socket - .write_all(&ReadyForQuery::idle().to_bytes().unwrap()) + .write_all(&ReadyForQuery::idle().to_bytes()) .await .unwrap(); } @@ -1368,7 +1367,7 @@ pub mod test { assert!(matches!(startup, Startup::Startup { .. })); socket - .write_all(&Authentication::ClearTextPassword.to_bytes().unwrap()) + .write_all(&Authentication::ClearTextPassword.to_bytes()) .await .unwrap(); @@ -1376,15 +1375,15 @@ pub mod test { assert_eq!(password.password(), Some(expected_secret.as_str())); socket - .write_all(&Authentication::Ok.to_bytes().unwrap()) + .write_all(&Authentication::Ok.to_bytes()) .await .unwrap(); socket - .write_all(&BackendKeyData::new().to_bytes().unwrap()) + .write_all(&BackendKeyData::new().to_bytes()) .await .unwrap(); socket - .write_all(&ReadyForQuery::idle().to_bytes().unwrap()) + .write_all(&ReadyForQuery::idle().to_bytes()) .await .unwrap(); } @@ -2134,7 +2133,7 @@ pub mod test { let msg = server.read().await.unwrap(); assert_eq!(c, msg.code()); if c == 'D' { - let data_row = DataRow::from_bytes(msg.to_bytes().unwrap()).unwrap(); + let data_row = DataRow::from_bytes(msg.to_bytes()).unwrap(); let result: i64 = data_row.get(0, Format::Text).unwrap(); assert_eq!(result, 1); // We prepared SELECT 1, SELECT 2 is ignored. } @@ -3327,7 +3326,7 @@ pub mod test { .iter() .find(|m| m.code() == 'D') .expect("expected DataRow"); - let data_row = DataRow::from_bytes(data_row.to_bytes().unwrap()).unwrap(); + let data_row = DataRow::from_bytes(data_row.to_bytes()).unwrap(); let value: i32 = data_row.get(0, Format::Text).unwrap(); assert_eq!(value, expected); assert!(server.done()); diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index 47601c767..e2a6f6033 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -163,7 +163,7 @@ impl Client { &passwords.iter().map(|s| s.to_string()).collect::>(), ); stream.send_flush(&md5.challenge()).await?; - let password = Password::from_bytes(stream.read().await?.to_bytes()?)?; + let password = Password::from_bytes(stream.read().await?.to_bytes())?; if let Password::PasswordMessage { response } = password { if md5.check(&response) { AuthResult::Ok @@ -192,7 +192,7 @@ impl Client { .send_flush(&Authentication::ClearTextPassword) .await?; let response = stream.read().await?; - let response = Password::from_bytes(response.to_bytes()?)?; + let response = Password::from_bytes(response.to_bytes())?; let is_match = passwords .iter() .any(|p| Some(p.as_str()) == response.password()); @@ -245,7 +245,7 @@ impl Client { .send_flush(&Authentication::ClearTextPassword) .await?; let password = stream.read().await?; - let password = Password::from_bytes(password.to_bytes()?)?; + let password = Password::from_bytes(password.to_bytes())?; // Passthrough authentication assumes the client password is good // and lets Postgres perform the authentication instead. If Postgres // returns an error, the connection pool will be banned and the client @@ -607,7 +607,7 @@ impl Client { if message.code() == 'X' { return Ok(BufferEvent::DisconnectGraceful); } else { - let message = ProtocolMessage::from_bytes(message.to_bytes()?)?; + let message = ProtocolMessage::from_bytes(message.to_bytes())?; self.client_request.push(message); } } diff --git a/pgdog/src/frontend/client/query_engine/multi_step/state.rs b/pgdog/src/frontend/client/query_engine/multi_step/state.rs index cff6af113..ede62a358 100644 --- a/pgdog/src/frontend/client/query_engine/multi_step/state.rs +++ b/pgdog/src/frontend/client/query_engine/multi_step/state.rs @@ -36,7 +36,7 @@ impl MultiServerState { Ok(match code { 'T' | '1' | '2' | '3' | 't' => *count == 1, 'C' => { - let command_complete = CommandComplete::from_bytes(message.to_bytes()?)?; + let command_complete = CommandComplete::from_bytes(message.to_bytes())?; self.rows += command_complete.rows()?.unwrap_or(0); false } diff --git a/pgdog/src/frontend/client/query_engine/multi_step/test/mod.rs b/pgdog/src/frontend/client/query_engine/multi_step/test/mod.rs index ffd1c8085..8dd342ded 100644 --- a/pgdog/src/frontend/client/query_engine/multi_step/test/mod.rs +++ b/pgdog/src/frontend/client/query_engine/multi_step/test/mod.rs @@ -10,9 +10,7 @@ pub mod simple; pub mod update; async fn truncate_table(table: &str, stream: &mut TcpStream) { - let query = Query::new(format!("TRUNCATE {}", table)) - .to_bytes() - .unwrap(); + let query = Query::new(format!("TRUNCATE {}", table)).to_bytes(); stream.write_all(&query).await.unwrap(); stream.flush().await.unwrap(); diff --git a/pgdog/src/frontend/client/query_engine/multi_step/test/prepared.rs b/pgdog/src/frontend/client/query_engine/multi_step/test/prepared.rs index 947850250..4ec59426a 100644 --- a/pgdog/src/frontend/client/query_engine/multi_step/test/prepared.rs +++ b/pgdog/src/frontend/client/query_engine/multi_step/test/prepared.rs @@ -43,39 +43,39 @@ mod insert { let exec = Execute::new(); let sync = Sync; - stream.write_all(&stmt.to_bytes().unwrap()).await.unwrap(); - stream.write_all(&desc.to_bytes().unwrap()).await.unwrap(); - stream.write_all(&flush.to_bytes().unwrap()).await.unwrap(); + stream.write_all(&stmt.to_bytes()).await.unwrap(); + stream.write_all(&desc.to_bytes()).await.unwrap(); + stream.write_all(&flush.to_bytes()).await.unwrap(); stream.flush().await.unwrap(); let _ = read_messages(&mut stream, &['1', 't', 'T']).await; - stream.write_all(¶ms.to_bytes().unwrap()).await.unwrap(); - stream.write_all(&exec.to_bytes().unwrap()).await.unwrap(); - stream.write_all(&sync.to_bytes().unwrap()).await.unwrap(); + stream.write_all(¶ms.to_bytes()).await.unwrap(); + stream.write_all(&exec.to_bytes()).await.unwrap(); + stream.write_all(&sync.to_bytes()).await.unwrap(); stream.flush().await.unwrap(); let messages = read_messages(&mut stream, &['2', 'D', 'D', 'D', 'D', 'C', 'Z']).await; // Assert DataRow values (messages[1..5] are the 4 DataRow messages) - let row1 = DataRow::from_bytes(messages[1].to_bytes().unwrap()).unwrap(); + let row1 = DataRow::from_bytes(messages[1].to_bytes()).unwrap(); assert_eq!(row1.get::(0, Format::Text), Some(123423425245)); assert_eq!(row1.get_text(1), Some("test_value_1".to_string())); - let row2 = DataRow::from_bytes(messages[2].to_bytes().unwrap()).unwrap(); + let row2 = DataRow::from_bytes(messages[2].to_bytes()).unwrap(); assert_eq!(row2.get::(0, Format::Text), Some(123423425246)); assert_eq!(row2.get_text(1), Some("test_value_2".to_string())); - let row3 = DataRow::from_bytes(messages[3].to_bytes().unwrap()).unwrap(); + let row3 = DataRow::from_bytes(messages[3].to_bytes()).unwrap(); assert_eq!(row3.get::(0, Format::Text), Some(123423425247)); assert_eq!(row3.get_text(1), Some("test_value_3".to_string())); - let row4 = DataRow::from_bytes(messages[4].to_bytes().unwrap()).unwrap(); + let row4 = DataRow::from_bytes(messages[4].to_bytes()).unwrap(); assert_eq!(row4.get::(0, Format::Text), Some(12342342524823424)); assert_eq!(row4.get_text(1), Some("test_value_4".to_string())); // Assert CommandComplete returns 4 rows - let cc = CommandComplete::from_bytes(messages[5].to_bytes().unwrap()).unwrap(); + let cc = CommandComplete::from_bytes(messages[5].to_bytes()).unwrap(); assert_eq!(cc.rows().unwrap(), Some(4)); truncate_table("sharded", &mut stream).await; diff --git a/pgdog/src/frontend/client/query_engine/multi_step/test/simple.rs b/pgdog/src/frontend/client/query_engine/multi_step/test/simple.rs index ba90dc8e7..0e0b91297 100644 --- a/pgdog/src/frontend/client/query_engine/multi_step/test/simple.rs +++ b/pgdog/src/frontend/client/query_engine/multi_step/test/simple.rs @@ -32,8 +32,7 @@ mod insert { stream .write_all( &Query::new(format!("INSERT INTO sharded (id, value) VALUES {}", values)) - .to_bytes() - .unwrap(), + .to_bytes(), ) .await .unwrap(); @@ -42,8 +41,7 @@ mod insert { let messages = read_messages(&mut stream, &['C', 'Z']).await; assert_eq!(messages.len(), 2); - let command_complete = - CommandComplete::from_bytes(messages[0].to_bytes().unwrap()).unwrap(); + let command_complete = CommandComplete::from_bytes(messages[0].to_bytes()).unwrap(); assert_eq!(command_complete.rows().unwrap().unwrap(), 5); truncate_table("sharded", &mut stream).await; diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index 996786f5d..c4f875afa 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -163,7 +163,7 @@ impl QueryEngine { self.stats.query(); let mut two_pc_auto = false; - let state = ReadyForQuery::from_bytes(message.to_bytes()?)?.state()?; + let state = ReadyForQuery::from_bytes(message.to_bytes())?.state()?; match state { TransactionState::Error => { diff --git a/pgdog/src/frontend/client/query_engine/test/replicas.rs b/pgdog/src/frontend/client/query_engine/test/replicas.rs index 53ba6dba1..29dff7e03 100644 --- a/pgdog/src/frontend/client/query_engine/test/replicas.rs +++ b/pgdog/src/frontend/client/query_engine/test/replicas.rs @@ -17,7 +17,7 @@ async fn test_round_robin_with_replicas() { // Write goes to primary. let query = Query::new("CREATE TABLE IF NOT EXISTS test_round_robin_replicas (id BIGINT)"); - len_sent += query.to_bytes().unwrap().len(); + len_sent += query.to_bytes().len(); client.send_simple(query).await; for msg in client.read_until('Z').await.unwrap() { len_recv += msg.len(); @@ -29,10 +29,10 @@ async fn test_round_robin_with_replicas() { let bind = Bind::new_statement("test"); let execute = Execute::new(); let sync = Sync; - len_sent += parse.to_bytes().unwrap().len(); - len_sent += bind.to_bytes().unwrap().len(); - len_sent += execute.to_bytes().unwrap().len(); - len_sent += sync.to_bytes().unwrap().len(); + len_sent += parse.to_bytes().len(); + len_sent += bind.to_bytes().len(); + len_sent += execute.to_bytes().len(); + len_sent += sync.to_bytes().len(); client.send(parse).await; client.send(bind).await; diff --git a/pgdog/src/frontend/client/test/mod.rs b/pgdog/src/frontend/client/test/mod.rs index d47c6b216..f20beaf23 100644 --- a/pgdog/src/frontend/client/test/mod.rs +++ b/pgdog/src/frontend/client/test/mod.rs @@ -102,7 +102,7 @@ macro_rules! new_client { pub fn buffer(messages: &[impl ToBytes]) -> BytesMut { let mut buf = BytesMut::new(); for message in messages { - buf.put(message.to_bytes().unwrap()); + buf.put(message.to_bytes()); } buf } @@ -138,7 +138,7 @@ pub async fn read_messages(stream: &mut (impl AsyncRead + Unpin), codes: &[char] if error { panic!( "{:#}", - ErrorResponse::from_bytes(message.to_bytes().unwrap()).unwrap() + ErrorResponse::from_bytes(message.to_bytes()).unwrap() ); } else { result.push(message); @@ -153,7 +153,7 @@ macro_rules! buffer { let mut buf = BytesMut::new(); $( - buf.put($msg.to_bytes().unwrap()); + buf.put($msg.to_bytes()); )* buf @@ -198,7 +198,7 @@ async fn test_multiple_async() { let mut buf = vec![]; for i in 0..50 { let q = Query::new(format!("SELECT {}::bigint AS one", i)); - buf.extend(&q.to_bytes().unwrap()) + buf.extend(&q.to_bytes()) } conn.write_all(&buf).await.unwrap(); @@ -250,9 +250,7 @@ async fn test_multiple_async() { assert_eq!(codes, ['T', 'D', 'C', 'Z']); } - conn.write_all(&Terminate.to_bytes().unwrap()) - .await - .unwrap(); + conn.write_all(&Terminate.to_bytes()).await.unwrap(); handle.await.unwrap(); let dbs = databases(); @@ -313,27 +311,25 @@ async fn test_parse_describe_flush_bind_execute_close_sync() { let mut buf = BytesMut::new(); - buf.put(Parse::new_anonymous("SELECT 1").to_bytes().unwrap()); - buf.put(Describe::new_statement("").to_bytes().unwrap()); - buf.put(Flush.to_bytes().unwrap()); + buf.put(Parse::new_anonymous("SELECT 1").to_bytes()); + buf.put(Describe::new_statement("").to_bytes()); + buf.put(Flush.to_bytes()); conn.write_all(&buf).await.unwrap(); let _ = read!(conn, ['1', 't', 'T']); let mut buf = BytesMut::new(); - buf.put(Bind::new_statement("").to_bytes().unwrap()); - buf.put(Execute::new().to_bytes().unwrap()); - buf.put(Close::named("").to_bytes().unwrap()); - buf.put(Sync.to_bytes().unwrap()); + buf.put(Bind::new_statement("").to_bytes()); + buf.put(Execute::new().to_bytes()); + buf.put(Close::named("").to_bytes()); + buf.put(Sync.to_bytes()); conn.write_all(&buf).await.unwrap(); let _ = read!(conn, ['2', 'D', 'C', '3', 'Z']); - conn.write_all(&Terminate.to_bytes().unwrap()) - .await - .unwrap(); + conn.write_all(&Terminate.to_bytes()).await.unwrap(); handle.await.unwrap(); } diff --git a/pgdog/src/frontend/client/test/test_client.rs b/pgdog/src/frontend/client/test/test_client.rs index af686517a..f23c766e2 100644 --- a/pgdog/src/frontend/client/test/test_client.rs +++ b/pgdog/src/frontend/client/test/test_client.rs @@ -62,7 +62,7 @@ pub async fn read_message(conn: &mut TcpStream) -> Message { /// Send a protocol message to a TCP stream. pub async fn send_message(conn: &mut TcpStream, message: impl Protocol) { - let message = message.to_bytes().expect("message to convert to bytes"); + let message = message.to_bytes(); conn.write_all(&message).await.expect("write_all"); conn.flush().await.expect("flush"); } diff --git a/pgdog/src/frontend/prepared_statements/rewrite.rs b/pgdog/src/frontend/prepared_statements/rewrite.rs index 92dc18343..1d7c416e2 100644 --- a/pgdog/src/frontend/prepared_statements/rewrite.rs +++ b/pgdog/src/frontend/prepared_statements/rewrite.rs @@ -79,7 +79,7 @@ mod test { let parse = Parse::named("__sqlx_1", "SELECT * FROM users"); let mut parse = ProtocolMessage::from(parse); rewrite.rewrite(&mut parse).unwrap(); - let parse = Parse::from_bytes(parse.to_bytes().unwrap()).unwrap(); + let parse = Parse::from_bytes(parse.to_bytes()).unwrap(); assert!(!parse.anonymous()); assert_eq!(parse.name(), "__pgdog_1"); @@ -88,13 +88,13 @@ mod test { let bind = Bind::new_statement("__sqlx_1"); let mut bind_msg = ProtocolMessage::from(bind); rewrite.rewrite(&mut bind_msg).unwrap(); - let bind = Bind::from_bytes(bind_msg.to_bytes().unwrap()).unwrap(); + let bind = Bind::from_bytes(bind_msg.to_bytes()).unwrap(); assert_eq!(bind.statement(), "__pgdog_1"); let describe = Describe::new_statement("__sqlx_1"); let mut describe = ProtocolMessage::from(describe); rewrite.rewrite(&mut describe).unwrap(); - let describe = Describe::from_bytes(describe.to_bytes().unwrap()).unwrap(); + let describe = Describe::from_bytes(describe.to_bytes()).unwrap(); assert_eq!(describe.statement(), "__pgdog_1"); assert_eq!(describe.kind(), 'S'); @@ -110,7 +110,7 @@ mod test { let parse = Parse::new_anonymous("SELECT * FROM users"); let mut parse = ProtocolMessage::from(parse); rewrite.rewrite(&mut parse).unwrap(); - let parse = Parse::from_bytes(parse.to_bytes().unwrap()).unwrap(); + let parse = Parse::from_bytes(parse.to_bytes()).unwrap(); assert!(!parse.anonymous()); assert_eq!(parse.query(), "SELECT * FROM users"); diff --git a/pgdog/src/frontend/router/parser/binary/header.rs b/pgdog/src/frontend/router/parser/binary/header.rs index ff2d3c95c..70e5f5c70 100644 --- a/pgdog/src/frontend/router/parser/binary/header.rs +++ b/pgdog/src/frontend/router/parser/binary/header.rs @@ -74,12 +74,12 @@ impl Header { } impl ToBytes for Header { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> bytes::Bytes { let mut payload = BytesMut::new(); payload.extend(SIGNATURE.iter()); payload.put_i32(self.flags); payload.put_i32(self.header_extension); - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/frontend/router/parser/binary/tuple.rs b/pgdog/src/frontend/router/parser/binary/tuple.rs index ef59d59dc..87e6e37fe 100644 --- a/pgdog/src/frontend/router/parser/binary/tuple.rs +++ b/pgdog/src/frontend/router/parser/binary/tuple.rs @@ -170,7 +170,7 @@ impl Tuple { } impl ToBytes for Tuple { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut result = BytesMut::new(); if self.end { result.put_i16(-1); @@ -187,7 +187,7 @@ impl ToBytes for Tuple { } } - Ok(result.freeze()) + result.freeze() } } diff --git a/pgdog/src/frontend/router/parser/copy.rs b/pgdog/src/frontend/router/parser/copy.rs index 545220929..20d3e787e 100644 --- a/pgdog/src/frontend/router/parser/copy.rs +++ b/pgdog/src/frontend/router/parser/copy.rs @@ -262,7 +262,7 @@ impl CopyParser { if self.headers { if let Some(header) = stream.header()? { rows.push(CopyRow::new( - &header.to_bytes()?, + &header.to_bytes(), self.schema_shard.clone().unwrap_or(Shard::All), )); self.headers = false; @@ -299,7 +299,7 @@ impl CopyParser { Shard::All }; - rows.push(CopyRow::new(&tuple.to_bytes()?, shard)); + rows.push(CopyRow::new(&tuple.to_bytes(), shard)); } } } diff --git a/pgdog/src/net/messages/auth/mod.rs b/pgdog/src/net/messages/auth/mod.rs index 08c3599fd..ee4ab8252 100644 --- a/pgdog/src/net/messages/auth/mod.rs +++ b/pgdog/src/net/messages/auth/mod.rs @@ -80,26 +80,26 @@ impl Protocol for Authentication { } impl ToBytes for Authentication { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = Payload::named(self.code()); match self { Authentication::Ok => { payload.put_i32(0); - Ok(payload.freeze()) + payload.freeze() } Authentication::ClearTextPassword => { payload.put_i32(3); - Ok(payload.freeze()) + payload.freeze() } Authentication::Md5(salt) => { payload.put_i32(5); payload.put(salt.clone()); - Ok(payload.freeze()) + payload.freeze() } Authentication::Sasl(mechanism) => { @@ -107,21 +107,21 @@ impl ToBytes for Authentication { payload.put_string(mechanism); payload.put_u8(0); - Ok(payload.freeze()) + payload.freeze() } Authentication::SaslContinue(data) => { payload.put_i32(11); payload.put(Bytes::copy_from_slice(data.as_bytes())); - Ok(payload.freeze()) + payload.freeze() } Authentication::SaslFinal(data) => { payload.put_i32(12); payload.put(Bytes::copy_from_slice(data.as_bytes())); - Ok(payload.freeze()) + payload.freeze() } } } diff --git a/pgdog/src/net/messages/auth/password.rs b/pgdog/src/net/messages/auth/password.rs index 83bf85cef..56012b143 100644 --- a/pgdog/src/net/messages/auth/password.rs +++ b/pgdog/src/net/messages/auth/password.rs @@ -63,7 +63,7 @@ impl FromBytes for Password { } impl ToBytes for Password { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = Payload::named(self.code()); match self { Password::SASLInitialResponse { name, response } => { @@ -77,7 +77,7 @@ impl ToBytes for Password { } } - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/net/messages/backend_key.rs b/pgdog/src/net/messages/backend_key.rs index 9e9e4bc70..2c3b20350 100644 --- a/pgdog/src/net/messages/backend_key.rs +++ b/pgdog/src/net/messages/backend_key.rs @@ -151,13 +151,13 @@ impl BackendKeyData { } impl ToBytes for BackendKeyData { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> bytes::Bytes { let mut payload = Payload::named(self.code()); payload.put_i32(self.pid); payload.put_slice(self.secret.as_slice()); - Ok(payload.freeze()) + payload.freeze() } } @@ -197,7 +197,7 @@ mod tests { #[test] fn test_backend_key_roundtrip_legacy() { let key = BackendKeyData::legacy(42, 1234); - let roundtrip = BackendKeyData::from_bytes(key.to_bytes().unwrap()).unwrap(); + let roundtrip = BackendKeyData::from_bytes(key.to_bytes()).unwrap(); assert_eq!(roundtrip, key); assert_eq!(roundtrip.secret.len(), 4); } @@ -208,7 +208,7 @@ mod tests { pid: 7, secret: SecretKey::random(32), }; - let roundtrip = BackendKeyData::from_bytes(key.to_bytes().unwrap()).unwrap(); + let roundtrip = BackendKeyData::from_bytes(key.to_bytes()).unwrap(); assert_eq!(roundtrip, key); assert_eq!(roundtrip.secret.len(), 32); } diff --git a/pgdog/src/net/messages/bind.rs b/pgdog/src/net/messages/bind.rs index 70d868031..231f97935 100644 --- a/pgdog/src/net/messages/bind.rs +++ b/pgdog/src/net/messages/bind.rs @@ -407,10 +407,10 @@ impl FromBytes for Bind { } impl ToBytes for Bind { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { // Fast path. if let Some(ref original) = self.original { - return Ok(original.clone()); + return original.clone(); } let mut payload = Payload::named(self.code()); @@ -432,7 +432,7 @@ impl ToBytes for Bind { } payload.put_i16((self.results.len() / 2) as i16); payload.put(self.results.clone()); - Ok(payload.freeze()) + payload.freeze() } } @@ -478,7 +478,7 @@ mod test { buf.freeze() }, }; - let bytes = bind.to_bytes().unwrap(); + let bytes = bind.to_bytes(); let mut original = Bind::from_bytes(bytes.clone()).unwrap(); original.original = None; assert_eq!(original, bind); @@ -493,7 +493,7 @@ mod test { .await .unwrap(); let res = conn.read().await.unwrap(); - let err = ErrorResponse::from_bytes(res.to_bytes().unwrap()).unwrap(); + let err = ErrorResponse::from_bytes(res.to_bytes()).unwrap(); assert_eq!(err.code, "26000"); let anon = Bind::default(); @@ -533,12 +533,12 @@ mod test { for c in ['1', '2', 'D', 'C', 'Z'] { let msg = server.read().await.unwrap(); if msg.code() == 'E' { - let err = ErrorResponse::from_bytes(msg.to_bytes().unwrap()).unwrap(); + let err = ErrorResponse::from_bytes(msg.to_bytes()).unwrap(); panic!("{:?}", err); } if msg.code() == 'D' { - let dr = DataRow::from_bytes(msg.to_bytes().unwrap()).unwrap(); + let dr = DataRow::from_bytes(msg.to_bytes()).unwrap(); let r = dr.get::(0, Format::Binary).unwrap(); assert_eq!(r, json); } @@ -566,7 +566,7 @@ mod test { let params: Vec = (0..count).map(|_| Parameter::new_null()).collect(); let bind = Bind::new_params("__pgdog_large", ¶ms); - let bytes = bind.to_bytes().unwrap(); + let bytes = bind.to_bytes(); let decoded = Bind::from_bytes(bytes.clone()).unwrap(); assert_eq!(decoded.params_raw().len(), count); diff --git a/pgdog/src/net/messages/bind_complete.rs b/pgdog/src/net/messages/bind_complete.rs index 4d907a59c..2f96f1587 100644 --- a/pgdog/src/net/messages/bind_complete.rs +++ b/pgdog/src/net/messages/bind_complete.rs @@ -14,9 +14,9 @@ impl FromBytes for BindComplete { } impl ToBytes for BindComplete { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let payload = Payload::named(self.code()); - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/net/messages/buffer.rs b/pgdog/src/net/messages/buffer.rs index 3052cc3e8..228ab813a 100644 --- a/pgdog/src/net/messages/buffer.rs +++ b/pgdog/src/net/messages/buffer.rs @@ -162,7 +162,7 @@ mod test { let mut rng = StdRng::from_os_rng(); for i in 0..5000 { - let msg = Sync.to_bytes().unwrap(); + let msg = Sync.to_bytes(); conn.write_all(&msg).await.unwrap(); let query_len = rng.random_range(10..=1000); @@ -170,9 +170,7 @@ mod test { .map(|_| rng.sample(rand::distr::Alphanumeric) as char) .collect(); - let msg = Parse::named(format!("test_{}", i), &query) - .to_bytes() - .unwrap(); + let msg = Parse::named(format!("test_{}", i), &query).to_bytes(); conn.write_all(&msg).await.unwrap(); conn.flush().await.unwrap(); } @@ -202,7 +200,7 @@ mod test { assert_eq!(msg.code(), 'S'); } else { assert_eq!(msg.code(), 'P'); - let parse = Parse::from_bytes(msg.to_bytes().unwrap()).unwrap(); + let parse = Parse::from_bytes(msg.to_bytes()).unwrap(); assert_eq!(parse.name(), format!("test_{}", counter / 2)); } @@ -252,11 +250,11 @@ mod test { // Create a large message (10KB query) let large_query = "SELECT * FROM ".to_string() + &"x".repeat(10_000); - let large_msg = Parse::named("large", &large_query).to_bytes().unwrap(); + let large_msg = Parse::named("large", &large_query).to_bytes(); stream_data.extend_from_slice(&large_msg); // Create a small message - let small_msg = Sync.to_bytes().unwrap(); + let small_msg = Sync.to_bytes(); stream_data.extend_from_slice(&small_msg); let mut cursor = Cursor::new(stream_data); @@ -316,9 +314,7 @@ mod test { // Create several small messages that won't exceed BUFFER_SIZE for i in 0..10 { let query = format!("SELECT {}", i); - let msg = Parse::named(format!("stmt_{}", i), &query) - .to_bytes() - .unwrap(); + let msg = Parse::named(format!("stmt_{}", i), &query).to_bytes(); stream_data.extend_from_slice(&msg); } diff --git a/pgdog/src/net/messages/close.rs b/pgdog/src/net/messages/close.rs index c7632b0db..4058d5165 100644 --- a/pgdog/src/net/messages/close.rs +++ b/pgdog/src/net/messages/close.rs @@ -78,8 +78,8 @@ impl FromBytes for Close { } impl ToBytes for Close { - fn to_bytes(&self) -> Result { - Ok(self.payload.clone()) + fn to_bytes(&self) -> Bytes { + self.payload.clone() } } @@ -96,6 +96,6 @@ mod test { #[test] fn test_close() { let close = Close::named("test"); - assert_eq!(close.len(), close.to_bytes().unwrap().len()); + assert_eq!(close.len(), close.to_bytes().len()); } } diff --git a/pgdog/src/net/messages/close_complete.rs b/pgdog/src/net/messages/close_complete.rs index a9cfda465..7cda29291 100644 --- a/pgdog/src/net/messages/close_complete.rs +++ b/pgdog/src/net/messages/close_complete.rs @@ -18,7 +18,7 @@ impl FromBytes for CloseComplete { } impl ToBytes for CloseComplete { - fn to_bytes(&self) -> Result { - Ok(Payload::named('3').freeze()) + fn to_bytes(&self) -> Bytes { + Payload::named('3').freeze() } } diff --git a/pgdog/src/net/messages/command_complete.rs b/pgdog/src/net/messages/command_complete.rs index e36168470..4e2349487 100644 --- a/pgdog/src/net/messages/command_complete.rs +++ b/pgdog/src/net/messages/command_complete.rs @@ -85,8 +85,8 @@ impl CommandComplete { } impl ToBytes for CommandComplete { - fn to_bytes(&self) -> Result { - Ok(self.payload.clone()) + fn to_bytes(&self) -> Bytes { + self.payload.clone() } } diff --git a/pgdog/src/net/messages/copy_data.rs b/pgdog/src/net/messages/copy_data.rs index 91000f07e..13bd623b9 100644 --- a/pgdog/src/net/messages/copy_data.rs +++ b/pgdog/src/net/messages/copy_data.rs @@ -76,11 +76,11 @@ impl FromBytes for CopyData { } impl ToBytes for CopyData { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = Payload::named(self.code()); payload.put(self.data.clone()); - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/net/messages/copy_done.rs b/pgdog/src/net/messages/copy_done.rs index 1aa3b188b..a0fe83c8f 100644 --- a/pgdog/src/net/messages/copy_done.rs +++ b/pgdog/src/net/messages/copy_done.rs @@ -13,8 +13,8 @@ impl FromBytes for CopyDone { } impl ToBytes for CopyDone { - fn to_bytes(&self) -> Result { - Ok(Payload::named(self.code()).freeze()) + fn to_bytes(&self) -> Bytes { + Payload::named(self.code()).freeze() } } diff --git a/pgdog/src/net/messages/copy_fail.rs b/pgdog/src/net/messages/copy_fail.rs index 9b74cde9e..ffcdc7cc2 100644 --- a/pgdog/src/net/messages/copy_fail.rs +++ b/pgdog/src/net/messages/copy_fail.rs @@ -24,10 +24,10 @@ impl FromBytes for CopyFail { } impl ToBytes for CopyFail { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = Payload::named(self.code()); payload.put(self.error.clone()); - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/net/messages/data_row.rs b/pgdog/src/net/messages/data_row.rs index c11900c03..39f4dbaef 100644 --- a/pgdog/src/net/messages/data_row.rs +++ b/pgdog/src/net/messages/data_row.rs @@ -214,7 +214,7 @@ impl FromBytes for DataRow { } impl ToBytes for DataRow { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = Payload::named(self.code()); payload.put_i16(self.columns.len() as i16); @@ -227,7 +227,7 @@ impl ToBytes for DataRow { } } - Ok(payload.freeze()) + payload.freeze() } } @@ -267,7 +267,7 @@ mod test { dr.add(Data::null()); dr.add("world"); - let serialized = dr.to_bytes().unwrap(); + let serialized = dr.to_bytes(); let deserialized = DataRow::from_bytes(serialized).unwrap(); assert_eq!(deserialized.len(), 5); diff --git a/pgdog/src/net/messages/describe.rs b/pgdog/src/net/messages/describe.rs index 79d5142cc..60ed440ae 100644 --- a/pgdog/src/net/messages/describe.rs +++ b/pgdog/src/net/messages/describe.rs @@ -42,12 +42,8 @@ impl FromBytes for Describe { } impl ToBytes for Describe { - fn to_bytes(&self) -> Result { - if let Some(ref original) = self.original { - return Ok(original.clone()); - } - - Ok(self.payload.clone()) + fn to_bytes(&self) -> Bytes { + self.original.as_ref().unwrap_or(&self.payload).clone() } } @@ -136,10 +132,10 @@ mod test { .await .unwrap(); let res = conn.read().await.unwrap(); - let err = ErrorResponse::from_bytes(res.to_bytes().unwrap()).unwrap(); + let err = ErrorResponse::from_bytes(res.to_bytes()).unwrap(); assert_eq!(err.code, "34000"); let describe = Describe::new_statement("test"); - assert_eq!(describe.len(), describe.to_bytes().unwrap().len()); + assert_eq!(describe.len(), describe.to_bytes().len()); } } diff --git a/pgdog/src/net/messages/empty_query_response.rs b/pgdog/src/net/messages/empty_query_response.rs index b7d6962d0..8e6cbfda1 100644 --- a/pgdog/src/net/messages/empty_query_response.rs +++ b/pgdog/src/net/messages/empty_query_response.rs @@ -13,8 +13,8 @@ impl FromBytes for EmptyQueryResponse { } impl ToBytes for EmptyQueryResponse { - fn to_bytes(&self) -> Result { - Ok(Payload::named(self.code()).freeze()) + fn to_bytes(&self) -> Bytes { + Payload::named(self.code()).freeze() } } diff --git a/pgdog/src/net/messages/error_response.rs b/pgdog/src/net/messages/error_response.rs index 3ec1e8bfb..5a7ed6a14 100644 --- a/pgdog/src/net/messages/error_response.rs +++ b/pgdog/src/net/messages/error_response.rs @@ -296,7 +296,7 @@ impl FromBytes for ErrorResponse { } impl ToBytes for ErrorResponse { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = Payload::named(self.code()); payload.put_u8(b'S'); @@ -333,7 +333,7 @@ impl ToBytes for ErrorResponse { payload.put_u8(0); - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/net/messages/execute.rs b/pgdog/src/net/messages/execute.rs index 41696d007..eca73cf48 100644 --- a/pgdog/src/net/messages/execute.rs +++ b/pgdog/src/net/messages/execute.rs @@ -94,8 +94,8 @@ impl FromBytes for Execute { } impl ToBytes for Execute { - fn to_bytes(&self) -> Result { - Ok(self.payload.clone()) + fn to_bytes(&self) -> Bytes { + self.payload.clone() } } diff --git a/pgdog/src/net/messages/fastpath.rs b/pgdog/src/net/messages/fastpath.rs index bb073df4c..dc730f6d3 100644 --- a/pgdog/src/net/messages/fastpath.rs +++ b/pgdog/src/net/messages/fastpath.rs @@ -25,11 +25,11 @@ impl FromBytes for Fastpath { } impl ToBytes for Fastpath { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = Payload::named(self.code()); payload.put(self.body.clone()); - Ok(payload.freeze()) + payload.freeze() } } @@ -58,7 +58,7 @@ mod test { assert_eq!(fp.code(), 'F'); // to_bytes must reproduce the exact wire frame. - let serialized = fp.to_bytes().unwrap(); + let serialized = fp.to_bytes(); assert_eq!(serialized, original); } } diff --git a/pgdog/src/net/messages/flush.rs b/pgdog/src/net/messages/flush.rs index 9b018223f..0ed8fd4f9 100644 --- a/pgdog/src/net/messages/flush.rs +++ b/pgdog/src/net/messages/flush.rs @@ -17,9 +17,9 @@ impl FromBytes for Flush { } impl ToBytes for Flush { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let payload = Payload::named(self.code()); - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/net/messages/hello.rs b/pgdog/src/net/messages/hello.rs index 34b322ab0..1b1cd0f91 100644 --- a/pgdog/src/net/messages/hello.rs +++ b/pgdog/src/net/messages/hello.rs @@ -180,7 +180,7 @@ impl Startup { } impl super::ToBytes for Startup { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> bytes::Bytes { match self { Startup::Ssl => { let mut buf = BytesMut::new(); @@ -188,7 +188,7 @@ impl super::ToBytes for Startup { buf.put_i32(8); buf.put_i32(80877103); - Ok(buf.freeze()) + buf.freeze() } Startup::GssEnc => { @@ -197,7 +197,7 @@ impl super::ToBytes for Startup { buf.put_i32(8); buf.put_i32(80877104); - Ok(buf.freeze()) + buf.freeze() } Startup::Cancel { id } => { @@ -207,7 +207,7 @@ impl super::ToBytes for Startup { payload.put_i32(id.pid); payload.put_slice(id.secret.as_slice()); - Ok(payload.freeze()) + payload.freeze() } Startup::Startup { @@ -233,7 +233,7 @@ impl super::ToBytes for Startup { payload.put(params_buf); payload.put_u8(0); // Terminating null character. - Ok(payload.freeze()) + payload.freeze() } } } @@ -247,11 +247,11 @@ pub enum SslReply { } impl ToBytes for SslReply { - fn to_bytes(&self) -> Result { - Ok(match self { + fn to_bytes(&self) -> bytes::Bytes { + match self { SslReply::Yes => Bytes::from("S"), SslReply::No => Bytes::from("N"), - }) + } } } @@ -307,7 +307,7 @@ mod test { #[test] fn test_ssl() { let ssl = Startup::Ssl; - let mut bytes = ssl.to_bytes().unwrap(); + let mut bytes = ssl.to_bytes(); assert_eq!(bytes.get_i32(), 8); // len assert_eq!(bytes.get_i32(), 80877103); // request code @@ -316,7 +316,7 @@ mod test { #[test] fn test_gssenc() { let gss = Startup::gss_enc(); - let mut bytes = gss.to_bytes().unwrap(); + let mut bytes = gss.to_bytes(); assert_eq!(bytes.get_i32(), 8); // len assert_eq!(bytes.get_i32(), 80877104); // request code @@ -340,7 +340,7 @@ mod test { unrecognized_options: vec![], }; - let bytes = startup.to_bytes().unwrap(); + let bytes = startup.to_bytes(); assert_eq!(bytes.clone().get_i32(), 41); } @@ -369,7 +369,7 @@ mod test { "postgres", vec![], ); - write.write_all(&startup.to_bytes().unwrap()).await.unwrap(); + write.write_all(&startup.to_bytes()).await.unwrap(); }); let startup = Startup::from_stream(&mut read).await.unwrap(); @@ -468,7 +468,7 @@ mod test { let cancel = Startup::Cancel { id: BackendKeyData::new_client(ProtocolVersion::V3_2), }; - let bytes = cancel.to_bytes().unwrap(); + let bytes = cancel.to_bytes(); let (mut write, mut read) = tokio::io::duplex(512); tokio::spawn(async move { diff --git a/pgdog/src/net/messages/mod.rs b/pgdog/src/net/messages/mod.rs index 1f726bca9..a125e7ab7 100644 --- a/pgdog/src/net/messages/mod.rs +++ b/pgdog/src/net/messages/mod.rs @@ -82,7 +82,7 @@ pub trait ToBytes { /// Create the protocol message as an array of bytes. /// The message must conform to the spec. No additional manipulation /// of the data will take place. - fn to_bytes(&self) -> Result; + fn to_bytes(&self) -> Bytes; } /// Convert a PostgreSQL wire protocol message to a Rust struct. @@ -98,7 +98,7 @@ pub trait Protocol: ToBytes + FromBytes + std::fmt::Debug { /// Convert to message. fn message(&self) -> Result { - Ok(Message::new(self.to_bytes()?)) + Ok(Message::new(self.to_bytes())) } /// Message is part of a stream and should not be buffered. @@ -185,8 +185,8 @@ impl std::fmt::Debug for Message { } impl ToBytes for Message { - fn to_bytes(&self) -> Result { - Ok(self.payload.clone()) + fn to_bytes(&self) -> Bytes { + self.payload.clone() } } @@ -287,7 +287,7 @@ macro_rules! from_message { type Error = crate::net::Error; fn try_from(message: Message) -> Result<$ty, Self::Error> { - <$ty as FromBytes>::from_bytes(message.to_bytes()?) + <$ty as FromBytes>::from_bytes(message.to_bytes()) } } }; diff --git a/pgdog/src/net/messages/negotiate_protocol_version.rs b/pgdog/src/net/messages/negotiate_protocol_version.rs index 74b1badeb..0aeebad68 100644 --- a/pgdog/src/net/messages/negotiate_protocol_version.rs +++ b/pgdog/src/net/messages/negotiate_protocol_version.rs @@ -26,7 +26,7 @@ impl NegotiateProtocolVersion { } impl ToBytes for NegotiateProtocolVersion { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> bytes::Bytes { let mut payload = Payload::named(self.code()); payload.put_i32(self.version.as_i32()); payload.put_i32(self.unrecognized_options.len() as i32); @@ -34,7 +34,7 @@ impl ToBytes for NegotiateProtocolVersion { payload.put_string(option); } - Ok(payload.freeze()) + payload.freeze() } } @@ -79,7 +79,7 @@ mod tests { vec!["_pq_.command_tag".into(), "_pq_.other".into()], ); - let roundtrip = NegotiateProtocolVersion::from_bytes(message.to_bytes().unwrap()).unwrap(); + let roundtrip = NegotiateProtocolVersion::from_bytes(message.to_bytes()).unwrap(); assert_eq!(roundtrip, message); } } diff --git a/pgdog/src/net/messages/no_data.rs b/pgdog/src/net/messages/no_data.rs index 61bc6424d..83fcbc4df 100644 --- a/pgdog/src/net/messages/no_data.rs +++ b/pgdog/src/net/messages/no_data.rs @@ -11,9 +11,9 @@ impl FromBytes for NoData { } impl ToBytes for NoData { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let payload = Payload::named(self.code()); - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/net/messages/notice_response.rs b/pgdog/src/net/messages/notice_response.rs index 5101b8a25..b2425f5cc 100644 --- a/pgdog/src/net/messages/notice_response.rs +++ b/pgdog/src/net/messages/notice_response.rs @@ -16,11 +16,11 @@ impl FromBytes for NoticeResponse { } impl ToBytes for NoticeResponse { - fn to_bytes(&self) -> Result { - let mut message = BytesMut::from(self.message.to_bytes()?); + fn to_bytes(&self) -> Bytes { + let mut message = BytesMut::from(self.message.to_bytes()); message[0] = self.code() as u8; - Ok(message.freeze()) + message.freeze() } } diff --git a/pgdog/src/net/messages/notification_response.rs b/pgdog/src/net/messages/notification_response.rs index d08a25de5..1b4ac72ef 100644 --- a/pgdog/src/net/messages/notification_response.rs +++ b/pgdog/src/net/messages/notification_response.rs @@ -57,8 +57,8 @@ impl FromBytes for NotificationResponse { } impl ToBytes for NotificationResponse { - fn to_bytes(&self) -> Result { - Ok(self.payload.clone()) + fn to_bytes(&self) -> Bytes { + self.payload.clone() } } diff --git a/pgdog/src/net/messages/parameter_description.rs b/pgdog/src/net/messages/parameter_description.rs index 729abf7ea..cedc4ebc2 100644 --- a/pgdog/src/net/messages/parameter_description.rs +++ b/pgdog/src/net/messages/parameter_description.rs @@ -20,14 +20,14 @@ impl FromBytes for ParameterDescription { } impl ToBytes for ParameterDescription { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = Payload::named(self.code()); payload.put_u16(self.params.len() as u16); for param in &self.params { payload.put_i32(*param); } - Ok(payload.freeze()) + payload.freeze() } } @@ -60,7 +60,7 @@ mod test { params: params.clone(), }; - let bytes = description.to_bytes().unwrap(); + let bytes = description.to_bytes(); let mut buf = bytes.clone(); assert_eq!(buf.get_u8(), b't'); let len = buf.get_i32(); @@ -79,7 +79,7 @@ mod test { params: params.clone(), }; - let bytes = description.to_bytes().unwrap(); + let bytes = description.to_bytes(); let mut buf = bytes.clone(); assert_eq!(buf.get_u8(), b't'); let len = buf.get_i32(); diff --git a/pgdog/src/net/messages/parameter_status.rs b/pgdog/src/net/messages/parameter_status.rs index 15c573989..4decff06c 100644 --- a/pgdog/src/net/messages/parameter_status.rs +++ b/pgdog/src/net/messages/parameter_status.rs @@ -73,13 +73,13 @@ impl ParameterStatus { } impl ToBytes for ParameterStatus { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> bytes::Bytes { let mut payload = Payload::named(self.code()); payload.put_string(&self.name); - payload.put(self.value.to_bytes()?); + payload.put(self.value.to_bytes()); - Ok(payload.freeze()) + payload.freeze() } } @@ -168,7 +168,7 @@ mod test { value: ParameterValue::String("UTF8".into()), }; - let bytes = original.to_bytes().unwrap(); + let bytes = original.to_bytes(); let parsed = ParameterStatus::from_bytes(bytes).unwrap(); assert_eq!(parsed.name, original.name); diff --git a/pgdog/src/net/messages/parse.rs b/pgdog/src/net/messages/parse.rs index 08a753123..e02ea9789 100644 --- a/pgdog/src/net/messages/parse.rs +++ b/pgdog/src/net/messages/parse.rs @@ -185,10 +185,10 @@ impl FromBytes for Parse { } impl ToBytes for Parse { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { // Fast path when the contents haven't been changed. if let Some(ref original) = self.original { - return Ok(original.clone()); + return original.clone(); } let mut payload = Payload::named(self.code()); @@ -198,7 +198,7 @@ impl ToBytes for Parse { payload.put(self.query.clone()); payload.put(self.data_types.clone()); - Ok(payload.freeze()) + payload.freeze() } } @@ -217,7 +217,7 @@ mod test { #[test] fn test_parse() { let parse = Parse::named("test", "SELECT $1"); - let b = parse.to_bytes().unwrap(); + let b = parse.to_bytes(); assert_eq!(parse.len(), b.len()); } @@ -241,7 +241,7 @@ mod test { assert_eq!(parse.query(), "SELECT * FROM users"); assert_eq!(&parse.query[..], b"SELECT * FROM users\0"); assert_eq!(&parse.name[..], b"__pgdog_1\0"); - assert_eq!(parse.to_bytes().unwrap().len(), parse.len()); + assert_eq!(parse.to_bytes().len(), parse.len()); let mut b = BytesMut::new(); b.put_u8(b'P'); diff --git a/pgdog/src/net/messages/parse_complete.rs b/pgdog/src/net/messages/parse_complete.rs index 08137bc49..3924f9816 100644 --- a/pgdog/src/net/messages/parse_complete.rs +++ b/pgdog/src/net/messages/parse_complete.rs @@ -14,9 +14,9 @@ impl FromBytes for ParseComplete { } impl ToBytes for ParseComplete { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let payload = Payload::named(self.code()); - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/net/messages/payload.rs b/pgdog/src/net/messages/payload.rs index bda4513da..014f6ed53 100644 --- a/pgdog/src/net/messages/payload.rs +++ b/pgdog/src/net/messages/payload.rs @@ -51,7 +51,7 @@ impl Payload { /// Finish assembly and return final bytes array. pub fn freeze(self) -> Bytes { use super::ToBytes; - self.to_bytes().unwrap() + self.to_bytes() } /// Add a C-style string to the payload. It will be NULL-terminated @@ -78,7 +78,7 @@ impl DerefMut for Payload { } impl super::ToBytes for Payload { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> bytes::Bytes { let len = if self.with_len { Some(self.bytes.len() as i32 + 4) // self } else { @@ -99,6 +99,6 @@ impl super::ToBytes for Payload { } buf.put_slice(&self.bytes); - Ok(buf.freeze()) + buf.freeze() } } diff --git a/pgdog/src/net/messages/query.rs b/pgdog/src/net/messages/query.rs index 95dd17731..4bfb21884 100644 --- a/pgdog/src/net/messages/query.rs +++ b/pgdog/src/net/messages/query.rs @@ -63,8 +63,8 @@ impl FromBytes for Query { } impl ToBytes for Query { - fn to_bytes(&self) -> Result { - Ok(self.payload.clone()) + fn to_bytes(&self) -> Bytes { + self.payload.clone() } } @@ -87,7 +87,7 @@ mod test { #[test] fn test_query() { let query = Query::new("SELECT 1, 2, 3"); - let query = Query::from_bytes(query.to_bytes().unwrap()).unwrap(); + let query = Query::from_bytes(query.to_bytes()).unwrap(); assert_eq!(query.query(), "SELECT 1, 2, 3"); } } diff --git a/pgdog/src/net/messages/replication/hot_standby_feedback.rs b/pgdog/src/net/messages/replication/hot_standby_feedback.rs index 3896c5f41..7b72f1550 100644 --- a/pgdog/src/net/messages/replication/hot_standby_feedback.rs +++ b/pgdog/src/net/messages/replication/hot_standby_feedback.rs @@ -27,7 +27,7 @@ impl FromBytes for HotStandbyFeedback { } impl ToBytes for HotStandbyFeedback { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = BytesMut::new(); payload.put_u8(b'h'); payload.put_i64(self.system_clock); @@ -36,7 +36,7 @@ impl ToBytes for HotStandbyFeedback { payload.put_i32(self.catalog_min); payload.put_i32(self.epoch_catalog_min); - Ok(payload.freeze()) + payload.freeze() } } @@ -54,7 +54,7 @@ mod tests { epoch_catalog_min: 13, }; - let bytes = feedback.to_bytes().expect("serialize hot standby feedback"); + let bytes = feedback.to_bytes(); let decoded = HotStandbyFeedback::from_bytes(bytes).expect("decode hot standby feedback"); assert_eq!(decoded.system_clock, 1234); diff --git a/pgdog/src/net/messages/replication/keep_alive.rs b/pgdog/src/net/messages/replication/keep_alive.rs index 81e191dc0..9edb75778 100644 --- a/pgdog/src/net/messages/replication/keep_alive.rs +++ b/pgdog/src/net/messages/replication/keep_alive.rs @@ -15,7 +15,7 @@ pub struct KeepAlive { impl KeepAlive { pub fn wrapped(self) -> Result { - Ok(CopyData::new(&ReplicationMeta::KeepAlive(self).to_bytes()?)) + Ok(CopyData::new(&ReplicationMeta::KeepAlive(self).to_bytes())) } /// Origin expects reply. @@ -36,14 +36,14 @@ impl FromBytes for KeepAlive { } impl ToBytes for KeepAlive { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = BytesMut::new(); payload.put_u8(b'k'); payload.put_i64(self.wal_end); payload.put_i64(self.system_clock); payload.put_u8(self.reply); - Ok(payload.freeze()) + payload.freeze() } } @@ -61,7 +61,7 @@ mod tests { assert!(ka.reply()); - let bytes = ka.to_bytes().expect("serialize keepalive"); + let bytes = ka.to_bytes(); let decoded = KeepAlive::from_bytes(bytes).expect("decode keepalive"); assert_eq!(decoded.wal_end, 9876); diff --git a/pgdog/src/net/messages/replication/logical/begin.rs b/pgdog/src/net/messages/replication/logical/begin.rs index 5878f8194..2a4338478 100644 --- a/pgdog/src/net/messages/replication/logical/begin.rs +++ b/pgdog/src/net/messages/replication/logical/begin.rs @@ -22,14 +22,14 @@ impl FromBytes for Begin { } impl ToBytes for Begin { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut bytes = BytesMut::new(); bytes.put_u8(self.code() as u8); bytes.put_i64(self.final_transaction_lsn); bytes.put_i64(self.commit_timestamp); bytes.put_i32(self.xid); - Ok(bytes.freeze()) + bytes.freeze() } } diff --git a/pgdog/src/net/messages/replication/logical/commit.rs b/pgdog/src/net/messages/replication/logical/commit.rs index e76ebf256..82b097b25 100644 --- a/pgdog/src/net/messages/replication/logical/commit.rs +++ b/pgdog/src/net/messages/replication/logical/commit.rs @@ -24,7 +24,7 @@ impl FromBytes for Commit { } impl ToBytes for Commit { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut bytes = BytesMut::new(); bytes.put_u8(self.code() as u8); bytes.put_i8(self.flags); @@ -32,7 +32,7 @@ impl ToBytes for Commit { bytes.put_i64(self.end_lsn); bytes.put_i64(self.commit_timestamp); - Ok(bytes.freeze()) + bytes.freeze() } } diff --git a/pgdog/src/net/messages/replication/logical/delete.rs b/pgdog/src/net/messages/replication/logical/delete.rs index 8a4e66fb2..a480f045a 100644 --- a/pgdog/src/net/messages/replication/logical/delete.rs +++ b/pgdog/src/net/messages/replication/logical/delete.rs @@ -34,18 +34,18 @@ impl Delete { } impl ToBytes for Delete { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut buf = BytesMut::new(); buf.put_u8(b'D'); buf.put_u32(self.oid.0); if let Some(ref key) = self.key { buf.put_u8(b'K'); - buf.put(key.to_bytes()?); + buf.put(key.to_bytes()); } else if let Some(ref old) = self.old { buf.put_u8(b'O'); - buf.put(old.to_bytes()?); + buf.put(old.to_bytes()); } - Ok(buf.freeze()) + buf.freeze() } } diff --git a/pgdog/src/net/messages/replication/logical/insert.rs b/pgdog/src/net/messages/replication/logical/insert.rs index c914ae10e..552a78c19 100644 --- a/pgdog/src/net/messages/replication/logical/insert.rs +++ b/pgdog/src/net/messages/replication/logical/insert.rs @@ -23,13 +23,13 @@ impl Insert { } impl ToBytes for Insert { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut buf = BytesMut::new(); buf.put_u8(b'I'); buf.put_u32(self.oid.0); buf.put_u8(b'N'); - buf.put(self.tuple_data.to_bytes()?); - Ok(buf.freeze()) + buf.put(self.tuple_data.to_bytes()); + buf.freeze() } } diff --git a/pgdog/src/net/messages/replication/logical/relation.rs b/pgdog/src/net/messages/replication/logical/relation.rs index e679afebe..f6bc82451 100644 --- a/pgdog/src/net/messages/replication/logical/relation.rs +++ b/pgdog/src/net/messages/replication/logical/relation.rs @@ -55,7 +55,7 @@ impl Column { } impl ToBytes for Relation { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = Payload::wrapped('R'); payload.put_u32(self.oid.0); payload.put_string(&self.namespace); @@ -70,7 +70,7 @@ impl ToBytes for Relation { payload.put_i32(column.type_modifier); } - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/net/messages/replication/logical/stream_start.rs b/pgdog/src/net/messages/replication/logical/stream_start.rs index dfbdecdc7..f6724b1f3 100644 --- a/pgdog/src/net/messages/replication/logical/stream_start.rs +++ b/pgdog/src/net/messages/replication/logical/stream_start.rs @@ -19,12 +19,12 @@ impl FromBytes for StreamStart { } impl ToBytes for StreamStart { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = BytesMut::new(); payload.put_u8(b'S'); payload.put_i32(self.xid); payload.put_i8(self.first); - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/net/messages/replication/logical/tuple_data.rs b/pgdog/src/net/messages/replication/logical/tuple_data.rs index 1cdfe2b15..7c0a33f45 100644 --- a/pgdog/src/net/messages/replication/logical/tuple_data.rs +++ b/pgdog/src/net/messages/replication/logical/tuple_data.rs @@ -224,7 +224,7 @@ impl FromBytes for TupleData { } impl ToBytes for TupleData { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut buf = BytesMut::new(); buf.put_i16(self.columns.len() as i16); for col in &self.columns { @@ -243,7 +243,7 @@ impl ToBytes for TupleData { } } } - Ok(buf.freeze()) + buf.freeze() } } diff --git a/pgdog/src/net/messages/replication/logical/update.rs b/pgdog/src/net/messages/replication/logical/update.rs index bdcb6ab25..161a3545d 100644 --- a/pgdog/src/net/messages/replication/logical/update.rs +++ b/pgdog/src/net/messages/replication/logical/update.rs @@ -80,7 +80,7 @@ impl FromBytes for Update { } impl ToBytes for Update { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { use bytes::BufMut; let mut buf = bytes::BytesMut::new(); buf.put_u8(b'U'); @@ -88,17 +88,17 @@ impl ToBytes for Update { match &self.identity { UpdateIdentity::Key(key) => { buf.put_u8(b'K'); - buf.put(key.to_bytes()?); + buf.put(key.to_bytes()); } UpdateIdentity::Old(old) => { buf.put_u8(b'O'); - buf.put(old.to_bytes()?); + buf.put(old.to_bytes()); } UpdateIdentity::Nothing => {} } buf.put_u8(b'N'); - buf.put(self.new.to_bytes()?); - Ok(buf.freeze()) + buf.put(self.new.to_bytes()); + buf.freeze() } } @@ -228,7 +228,7 @@ mod test { } fn assert_round_trip(u: &Update) { - let bytes = u.to_bytes().unwrap(); + let bytes = u.to_bytes(); let parsed = Update::from_bytes(bytes).unwrap(); assert_eq!(parsed.oid, u.oid); assert_columns_match(&parsed.new, &u.new, "new.columns"); diff --git a/pgdog/src/net/messages/replication/mod.rs b/pgdog/src/net/messages/replication/mod.rs index 28cb37c10..149f6668b 100644 --- a/pgdog/src/net/messages/replication/mod.rs +++ b/pgdog/src/net/messages/replication/mod.rs @@ -39,7 +39,7 @@ impl FromBytes for ReplicationMeta { } impl ToBytes for ReplicationMeta { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { match self { Self::HotStandbyFeedback(hot) => hot.to_bytes(), Self::StatusUpdate(status) => status.to_bytes(), @@ -79,7 +79,7 @@ mod tests { ReplicationMeta::KeepAlive(keepalive.clone()), ReplicationMeta::StatusUpdate(status.clone()), ] { - let bytes = meta.to_bytes().expect("serialize replication meta"); + let bytes = meta.to_bytes(); let decoded = ReplicationMeta::from_bytes(bytes).expect("decode replication meta"); match (meta, decoded) { ( diff --git a/pgdog/src/net/messages/replication/status_update.rs b/pgdog/src/net/messages/replication/status_update.rs index 9f5e0fb33..7759162e1 100644 --- a/pgdog/src/net/messages/replication/status_update.rs +++ b/pgdog/src/net/messages/replication/status_update.rs @@ -21,7 +21,7 @@ pub struct StatusUpdate { impl StatusUpdate { pub fn wrapped(self) -> Result { Ok(CopyData::new( - &ReplicationMeta::StatusUpdate(self).to_bytes()?, + &ReplicationMeta::StatusUpdate(self).to_bytes(), )) } @@ -65,7 +65,7 @@ impl FromBytes for StatusUpdate { } impl ToBytes for StatusUpdate { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = BytesMut::new(); payload.put_u8(b'r'); @@ -75,7 +75,7 @@ impl ToBytes for StatusUpdate { payload.put_i64(self.system_clock); payload.put_u8(self.reply); - Ok(payload.freeze()) + payload.freeze() } } @@ -92,7 +92,7 @@ mod test { system_clock: 4, reply: 5, }; - let su = StatusUpdate::from_bytes(su.to_bytes().unwrap()).unwrap(); + let su = StatusUpdate::from_bytes(su.to_bytes()).unwrap(); assert_eq!(su.last_applied, 1); assert_eq!(su.last_flushed, 2); assert_eq!(su.last_written, 3); diff --git a/pgdog/src/net/messages/replication/xlog_data.rs b/pgdog/src/net/messages/replication/xlog_data.rs index faf40a8f5..08e0c1fb7 100644 --- a/pgdog/src/net/messages/replication/xlog_data.rs +++ b/pgdog/src/net/messages/replication/xlog_data.rs @@ -51,13 +51,13 @@ impl XLogData { starting_point: 0, current_end: 0, system_clock: system_clock - 1, // simulates this to be an older message - bytes: relation.to_bytes()?, + bytes: relation.to_bytes(), }) } /// Convert to message. pub fn to_message(&self) -> Result { - Ok(Message::new(CopyData::bytes(self.to_bytes()?).to_bytes()?)) + Ok(Message::new(CopyData::bytes(self.to_bytes()).to_bytes())) } /// Extract payload. @@ -129,14 +129,14 @@ impl FromBytes for XLogData { } impl ToBytes for XLogData { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = BytesMut::new(); payload.put_u8(self.code() as u8); payload.put_i64(self.starting_point); payload.put_i64(self.current_end); payload.put_i64(self.system_clock); payload.put(self.bytes.clone()); - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/net/messages/rfq.rs b/pgdog/src/net/messages/rfq.rs index 9f292c406..b76a8e93b 100644 --- a/pgdog/src/net/messages/rfq.rs +++ b/pgdog/src/net/messages/rfq.rs @@ -50,11 +50,11 @@ impl ReadyForQuery { } impl ToBytes for ReadyForQuery { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> bytes::Bytes { let mut payload = Payload::named(self.code()); payload.put_u8(self.status as u8); - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/net/messages/row_description.rs b/pgdog/src/net/messages/row_description.rs index 3be01c50d..00d86402b 100644 --- a/pgdog/src/net/messages/row_description.rs +++ b/pgdog/src/net/messages/row_description.rs @@ -305,7 +305,7 @@ impl FromBytes for RowDescription { } impl ToBytes for RowDescription { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut payload = Payload::named(self.code()); payload.put_i16(self.fields.len() as i16); @@ -319,7 +319,7 @@ impl ToBytes for RowDescription { payload.put_i16(field.format); } - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/net/messages/sync.rs b/pgdog/src/net/messages/sync.rs index d537d1102..c55c7b4f0 100644 --- a/pgdog/src/net/messages/sync.rs +++ b/pgdog/src/net/messages/sync.rs @@ -28,8 +28,8 @@ impl FromBytes for Sync { } impl ToBytes for Sync { - fn to_bytes(&self) -> Result { - Ok(Payload::named('S').freeze()) + fn to_bytes(&self) -> Bytes { + Payload::named('S').freeze() } } @@ -45,6 +45,6 @@ mod test { #[test] fn test_sync() { - assert_eq!(Sync.len(), Sync.to_bytes().unwrap().len()); + assert_eq!(Sync.len(), Sync.to_bytes().len()); } } diff --git a/pgdog/src/net/messages/terminate.rs b/pgdog/src/net/messages/terminate.rs index 06320db76..a1c1dc55f 100644 --- a/pgdog/src/net/messages/terminate.rs +++ b/pgdog/src/net/messages/terminate.rs @@ -16,9 +16,9 @@ impl FromBytes for Terminate { } impl ToBytes for Terminate { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let payload = Payload::named(self.code()); - Ok(payload.freeze()) + payload.freeze() } } diff --git a/pgdog/src/net/parameter.rs b/pgdog/src/net/parameter.rs index dc85eee2f..c73daea25 100644 --- a/pgdog/src/net/parameter.rs +++ b/pgdog/src/net/parameter.rs @@ -68,7 +68,7 @@ pub enum ParameterValue { } impl ToBytes for ParameterValue { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> Bytes { let mut bytes = BytesMut::new(); match self { Self::String(string) => bytes.put_slice(string.as_bytes()), @@ -84,7 +84,7 @@ impl ToBytes for ParameterValue { } bytes.put_u8(0); - Ok(bytes.freeze()) + bytes.freeze() } } @@ -649,7 +649,7 @@ mod test { #[test] fn test_parameter_value_to_bytes_string() { let value = ParameterValue::String("test".into()); - let bytes = value.to_bytes().unwrap(); + let bytes = value.to_bytes(); assert_eq!(&bytes[..], b"test\0"); } @@ -657,7 +657,7 @@ mod test { #[test] fn test_parameter_value_to_bytes_tuple() { let value = ParameterValue::Tuple(vec!["a".into(), "b".into()]); - let bytes = value.to_bytes().unwrap(); + let bytes = value.to_bytes(); assert_eq!(&bytes[..], b"a, b\0"); } diff --git a/pgdog/src/net/protocol_message.rs b/pgdog/src/net/protocol_message.rs index 694800145..955d5d12f 100644 --- a/pgdog/src/net/protocol_message.rs +++ b/pgdog/src/net/protocol_message.rs @@ -114,7 +114,7 @@ impl FromBytes for ProtocolMessage { } impl ToBytes for ProtocolMessage { - fn to_bytes(&self) -> Result { + fn to_bytes(&self) -> bytes::Bytes { match self { Self::Bind(bind) => bind.to_bytes(), Self::Parse(parse) => parse.to_bytes(), diff --git a/pgdog/src/net/stream.rs b/pgdog/src/net/stream.rs index bd8644b07..08f305aa4 100644 --- a/pgdog/src/net/stream.rs +++ b/pgdog/src/net/stream.rs @@ -176,7 +176,7 @@ impl Stream { pub async fn send(&mut self, message: &impl Protocol) -> Result { self.io_in_progress = true; let result = async { - let bytes = message.to_bytes()?; + let bytes = message.to_bytes(); match &mut self.inner { StreamInner::Plain(ref mut stream) => eof(stream.write_all(&bytes).await)?,