Skip to content

Commit 936aef2

Browse files
authored
perf(expr): support writer-style #[function] for bytea type (#23727)
Signed-off-by: Mingzhuo Yin <[email protected]>
1 parent 58daf88 commit 936aef2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+449
-325
lines changed

Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/src/array/bytes_array.rs

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::io::Write;
1516
use std::iter;
16-
use std::mem::size_of;
17+
use std::mem::{ManuallyDrop, size_of};
1718

1819
use risingwave_common_estimate_size::EstimateSize;
1920
use risingwave_pb::common::Buffer;
@@ -250,33 +251,13 @@ impl BytesArrayBuilder {
250251
}
251252
}
252253

254+
/// Note: dropping an unfinished `BytesWriter` will roll back the partial data.
253255
pub struct BytesWriter<'a> {
254256
builder: &'a mut BytesArrayBuilder,
255257
}
256258

257-
impl<'a> BytesWriter<'a> {
258-
/// `write_ref` will consume `BytesWriter` and pass the ownership of `builder` to `BytesGuard`.
259-
pub fn write_ref(self, value: &[u8]) {
260-
self.builder.append(Some(value));
261-
}
262-
263-
/// `begin` will create a `PartialBytesWriter`, which allow multiple appendings to create a new
264-
/// record.
265-
pub fn begin(self) -> PartialBytesWriter<'a> {
266-
PartialBytesWriter {
267-
builder: self.builder,
268-
}
269-
}
270-
}
271-
272-
pub struct PartialBytesWriter<'a> {
273-
builder: &'a mut BytesArrayBuilder,
274-
}
275-
276-
impl PartialBytesWriter<'_> {
259+
impl BytesWriter<'_> {
277260
/// `write_ref` will append partial dirty data to `builder`.
278-
/// `PartialBytesWriter::write_ref` is different from `BytesWriter::write_ref`
279-
/// in that it allows us to call it multiple times.
280261
pub fn write_ref(&mut self, value: &[u8]) {
281262
// SAFETY: We'll clean the dirty `builder` in the `drop`.
282263
unsafe { self.builder.append_partial(value) }
@@ -286,10 +267,27 @@ impl PartialBytesWriter<'_> {
286267
/// Exactly one new record was appended and the `builder` can be safely used.
287268
pub fn finish(self) {
288269
self.builder.finish_partial();
270+
let _ = ManuallyDrop::new(self); // Prevent drop
271+
}
272+
}
273+
274+
impl Write for BytesWriter<'_> {
275+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
276+
self.write_ref(buf);
277+
Ok(buf.len())
278+
}
279+
280+
fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
281+
self.write_ref(buf);
282+
Ok(())
283+
}
284+
285+
fn flush(&mut self) -> std::io::Result<()> {
286+
Ok(())
289287
}
290288
}
291289

292-
impl Drop for PartialBytesWriter<'_> {
290+
impl Drop for BytesWriter<'_> {
293291
fn drop(&mut self) {
294292
// If `finish` is not called, we should roll back the data.
295293
self.builder.rollback_partial();

src/common/src/array/utf8_array.rs

Lines changed: 22 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::fmt::{Display, Write};
1717
use risingwave_common_estimate_size::EstimateSize;
1818
use risingwave_pb::data::{ArrayType, PbArray};
1919

20-
use super::bytes_array::{BytesWriter, PartialBytesWriter};
20+
use super::bytes_array::BytesWriter;
2121
use super::{Array, ArrayBuilder, BytesArray, BytesArrayBuilder, DataType};
2222
use crate::bitmap::Bitmap;
2323

@@ -105,7 +105,7 @@ impl Utf8Array {
105105
let mut builder = Utf8ArrayBuilder::new(iter.size_hint().0);
106106
for e in iter {
107107
if let Some(s) = e {
108-
let mut writer = builder.writer().begin();
108+
let mut writer = builder.writer();
109109
write!(writer, "{}", s).unwrap();
110110
writer.finish();
111111
} else {
@@ -177,7 +177,7 @@ impl Utf8ArrayBuilder {
177177
/// Append an element as the `Display` format to the array.
178178
pub fn append_display(&mut self, value: Option<impl Display>) {
179179
if let Some(s) = value {
180-
let mut writer = self.writer().begin();
180+
let mut writer = self.writer();
181181
write!(writer, "{}", s).unwrap();
182182
writer.finish();
183183
} else {
@@ -186,35 +186,20 @@ impl Utf8ArrayBuilder {
186186
}
187187
}
188188

189+
/// Note: dropping an unfinished `StringWriter` will roll back the partial data, which is the behavior of the inner `BytesWriter`.
189190
pub struct StringWriter<'a> {
190191
bytes: BytesWriter<'a>,
191192
}
192193

193-
impl<'a> StringWriter<'a> {
194-
/// `begin` will create a `PartialStringWriter`, which allow multiple appendings to create a new
195-
/// record.
196-
pub fn begin(self) -> PartialStringWriter<'a> {
197-
PartialStringWriter {
198-
bytes: self.bytes.begin(),
199-
}
200-
}
201-
}
202-
203-
// Note: dropping an unfinished `PartialStringWriter` will rollback the partial data, which is the
204-
// behavior of the inner `PartialBytesWriter`.
205-
pub struct PartialStringWriter<'a> {
206-
bytes: PartialBytesWriter<'a>,
207-
}
208-
209-
impl PartialStringWriter<'_> {
194+
impl StringWriter<'_> {
210195
/// `finish` should be called once the entire record has been written.
211196
/// Exactly one new record was appended and the `builder` can be safely used.
212197
pub fn finish(self) {
213198
self.bytes.finish()
214199
}
215200
}
216201

217-
impl Write for PartialStringWriter<'_> {
202+
impl Write for StringWriter<'_> {
218203
fn write_str(&mut self, s: &str) -> std::fmt::Result {
219204
self.bytes.write_ref(s.as_bytes());
220205
Ok(())
@@ -245,15 +230,14 @@ mod tests {
245230
}
246231

247232
#[test]
248-
fn test_utf8_partial_writer() {
233+
fn test_utf8_writer() {
249234
let mut builder = Utf8ArrayBuilder::new(0);
250235
{
251-
let writer = builder.writer();
252-
let mut partial_writer = writer.begin();
236+
let mut writer = builder.writer();
253237
for _ in 0..2 {
254-
partial_writer.write_str("ran").unwrap();
238+
writer.write_str("ran").unwrap();
255239
}
256-
partial_writer.finish()
240+
writer.finish()
257241
};
258242
let array = builder.finish();
259243
assert_eq!(array.len(), 1);
@@ -262,33 +246,30 @@ mod tests {
262246
}
263247

264248
#[test]
265-
fn test_utf8_partial_writer_failed() {
249+
fn test_utf8_writer_failed() {
266250
let mut builder = Utf8ArrayBuilder::new(0);
267251
// Write a record.
268252
{
269-
let writer = builder.writer();
270-
let mut partial_writer = writer.begin();
271-
partial_writer.write_str("Dia").unwrap();
272-
partial_writer.write_str("na").unwrap();
273-
partial_writer.finish()
253+
let mut writer = builder.writer();
254+
writer.write_str("Dia").unwrap();
255+
writer.write_str("na").unwrap();
256+
writer.finish()
274257
};
275258

276259
// Write a record that fails (the writer is dropped without calling `finish`).
277260
{
278-
let writer = builder.writer();
279-
let mut partial_writer = writer.begin();
280-
partial_writer.write_str("Ca").unwrap();
281-
partial_writer.write_str("rol").unwrap();
261+
let mut writer = builder.writer();
262+
writer.write_str("Ca").unwrap();
263+
writer.write_str("rol").unwrap();
282264
// We don't finish here.
283265
};
284266

285267
// Write a record.
286268
{
287-
let writer = builder.writer();
288-
let mut partial_writer = writer.begin();
289-
partial_writer.write_str("Ki").unwrap();
290-
partial_writer.write_str("ra").unwrap();
291-
partial_writer.finish()
269+
let mut writer = builder.writer();
270+
writer.write_str("Ki").unwrap();
271+
writer.write_str("ra").unwrap();
272+
writer.finish()
292273
};
293274

294275
// Verify only two valid records.

src/common/src/cast/mod.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use itertools::Itertools;
16+
use thiserror_ext::AsReport;
1617

1718
use crate::types::{Timestamp, Timestamptz};
1819

@@ -113,11 +114,11 @@ pub fn i64_to_timestamp(t: i64) -> Result<Timestamp> {
113114
}
114115

115116
/// Refer to PostgreSQL's implementation <https://github.com/postgres/postgres/blob/5cb54fc310fb84287cbdc74533f3420490a2f63a/src/backend/utils/adt/varlena.c#L276-L288>
116-
pub fn str_to_bytea(elem: &str) -> Result<Box<[u8]>> {
117+
pub fn str_to_bytea(elem: &str, writer: &mut impl std::io::Write) -> Result<()> {
117118
if let Some(remainder) = elem.strip_prefix(r"\x") {
118-
Ok(parse_bytes_hex(remainder)?.into())
119+
Ok(parse_bytes_hex(remainder, writer)?)
119120
} else {
120-
Ok(parse_bytes_traditional(elem)?.into())
121+
Ok(parse_bytes_traditional(elem, writer)?)
121122
}
122123
}
123124

@@ -132,9 +133,7 @@ fn get_hex(c: u8) -> Result<u8> {
132133
}
133134

134135
/// Refer to <https://www.postgresql.org/docs/current/datatype-binary.html#id-1.5.7.12.10> for specification.
135-
pub fn parse_bytes_hex(s: &str) -> Result<Vec<u8>> {
136-
let mut res = Vec::with_capacity(s.len() / 2);
137-
136+
pub fn parse_bytes_hex(s: &str, writer: &mut impl std::io::Write) -> Result<()> {
138137
let mut bytes = s.bytes();
139138
while let Some(c) = bytes.next() {
140139
// white spaces are tolerated
@@ -146,31 +145,35 @@ pub fn parse_bytes_hex(s: &str) -> Result<Vec<u8>> {
146145
match bytes.next() {
147146
Some(c) => {
148147
let v2 = get_hex(c)?;
149-
res.push((v1 << 4) | v2);
148+
writer
149+
.write_all(&[(v1 << 4) | v2])
150+
.map_err(|e| e.to_report_string())?;
150151
}
151152
None => return Err("invalid hexadecimal data: odd number of digits".to_owned()),
152153
}
153154
}
154155

155-
Ok(res)
156+
Ok(())
156157
}
157158

158159
/// Refer to <https://www.postgresql.org/docs/current/datatype-binary.html#id-1.5.7.12.10> for specification.
159-
pub fn parse_bytes_traditional(s: &str) -> Result<Vec<u8>> {
160+
pub fn parse_bytes_traditional(s: &str, writer: &mut impl std::io::Write) -> Result<()> {
160161
let mut bytes = s.bytes();
161162

162-
let mut res = Vec::new();
163163
while let Some(b) = bytes.next() {
164164
if b != b'\\' {
165-
res.push(b);
165+
writer.write_all(&[b]).map_err(|e| e.to_report_string())?;
166166
} else {
167167
match bytes.next() {
168168
Some(b'\\') => {
169-
res.push(b'\\');
169+
writer.write_all(b"\\").map_err(|e| e.to_report_string())?;
170170
}
171171
Some(b1 @ b'0'..=b'3') => match bytes.next_tuple() {
172172
Some((b2 @ b'0'..=b'7', b3 @ b'0'..=b'7')) => {
173-
res.push(((b1 - b'0') << 6) + ((b2 - b'0') << 3) + (b3 - b'0'));
173+
let byte = (b1 - b'0') << 6 | (b2 - b'0') << 3 | (b3 - b'0');
174+
writer
175+
.write_all(&[byte])
176+
.map_err(|e| e.to_report_string())?;
174177
}
175178
_ => {
176179
// one backslash, not followed by another or ### valid octal
@@ -185,7 +188,7 @@ pub fn parse_bytes_traditional(s: &str) -> Result<Vec<u8>> {
185188
}
186189
}
187190

188-
Ok(res)
191+
Ok(())
189192
}
190193

191194
#[cfg(test)]
@@ -206,6 +209,13 @@ mod tests {
206209
#[test]
207210
fn test_bytea() {
208211
use crate::types::ToText;
212+
213+
fn str_to_bytea(s: &str) -> Result<Box<[u8]>> {
214+
let mut buf = Vec::new();
215+
super::str_to_bytea(s, &mut buf)?;
216+
Ok(buf.into())
217+
}
218+
209219
assert_eq!(str_to_bytea("fgo").unwrap().as_ref().to_text(), r"\x66676f");
210220
assert_eq!(
211221
str_to_bytea(r"\xDeadBeef").unwrap().as_ref().to_text(),

src/common/src/types/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1091,7 +1091,11 @@ impl ScalarImpl {
10911091
DataType::List(_) => ListValue::from_str(s, data_type)?.into(),
10921092
DataType::Struct(st) => StructValue::from_str(s, st)?.into(),
10931093
DataType::Jsonb => JsonbVal::from_str(s)?.into(),
1094-
DataType::Bytea => str_to_bytea(s)?.into(),
1094+
DataType::Bytea => {
1095+
let mut buf = Vec::new();
1096+
str_to_bytea(s, &mut buf)?;
1097+
buf.into()
1098+
}
10951099
DataType::Vector(size) => VectorVal::from_text(s, *size)?.into(),
10961100
DataType::Map(_m) => return Err("map from text is not supported".into()),
10971101
})

src/connector/src/parser/unified/json.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,9 @@ impl JsonParseOptions {
643643

644644
match self.bytea_handling {
645645
ByteaHandling::Standard => {
646-
str_to_bytea(value_str).map_err(|_| create_error())?.into()
646+
let mut buf = Vec::new();
647+
str_to_bytea(value_str, &mut buf).map_err(|_| create_error())?;
648+
buf.into()
647649
}
648650
ByteaHandling::Base64 => base64::engine::general_purpose::STANDARD
649651
.decode(value_str)

src/expr/impl/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ workspace-hack = { path = "../../workspace-hack" }
7474
[dev-dependencies]
7575
criterion = { workspace = true }
7676
expect-test = "1"
77+
hex-literal = "1.1.0"
7778
tokio = { version = "0.2", package = "madsim-tokio", features = [
7879
"rt",
7980
"macros",

src/expr/impl/src/scalar/array_length.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::fmt::Write;
16-
1715
use risingwave_common::array::ListRef;
1816
use risingwave_expr::{ExprError, Result, function};
1917

@@ -185,6 +183,6 @@ fn array_length_of_dim(array: ListRef<'_>, d: i32) -> Result<Option<i32>> {
185183
/// select array_dims(array[array[]::int[]]); -- would be `[1:1][1:0]` after multidimensional support
186184
/// ```
187185
#[function("array_dims(anyarray) -> varchar")]
188-
fn array_dims(array: ListRef<'_>, writer: &mut impl Write) {
186+
fn array_dims(array: ListRef<'_>, writer: &mut impl std::fmt::Write) {
189187
write!(writer, "[1:{}]", array.len()).unwrap();
190188
}

0 commit comments

Comments
 (0)