Skip to content

Commit 6261e0e

Browse files
committed
io-uring: Create op_read helper function
1 parent 72f84b7 commit 6261e0e

File tree

1 file changed

+52
-65
lines changed

1 file changed

+52
-65
lines changed

tokio/src/fs/read_uring.rs

Lines changed: 52 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -39,20 +39,18 @@ async fn read_to_end_uring(
3939
mut buf: Vec<u8>,
4040
) -> io::Result<Vec<u8>> {
4141
let mut offset = 0;
42-
4342
let start_cap = buf.capacity();
4443

4544
// if buffer has no room and no size_hint, start with a small probe_read from 0 offset
4645
if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE {
47-
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?;
46+
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, &mut offset).await?;
4847

4948
if size_read == 0 {
5049
return Ok(r_buf);
5150
}
5251

53-
buf = r_buf;
5452
fd = r_fd;
55-
offset += size_read as u64;
53+
buf = r_buf;
5654
}
5755

5856
loop {
@@ -61,7 +59,7 @@ async fn read_to_end_uring(
6159
// and see if it returns `Ok(0)`. If so, we've avoided an
6260
// unnecessary increasing of the capacity. But if not, append the
6361
// probe buffer to the primary buffer and let its capacity grow.
64-
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?;
62+
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, &mut offset).await?;
6563

6664
if size_read == 0 {
6765
return Ok(r_buf);
@@ -77,91 +75,80 @@ async fn read_to_end_uring(
7775
buf.try_reserve(PROBE_SIZE)?;
7876
}
7977

80-
// doesn't matter if we have a valid size_hint or not, if we do more
81-
// than 2 consecutive_short_reads, gradually increase the buffer
82-
// capacity to read more data at a time
83-
8478
// prepare the spare capacity to be read into
8579
let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE);
8680

87-
// SAFETY: buf_len cannot be greater than u32::MAX because max_read_size
81+
// buf_len cannot be greater than u32::MAX because MAX_READ_SIZE
8882
// is u32::MAX
8983
let mut read_len = buf_len as u32;
9084

91-
loop {
92-
// read into spare capacity
93-
let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, offset).await;
94-
95-
match res {
96-
Ok(0) => return Ok(r_buf),
97-
Ok(size_read) => {
98-
fd = r_fd;
99-
buf = r_buf;
100-
offset += size_read as u64;
101-
read_len -= size_read;
102-
103-
// keep reading if there's something left to be read
104-
if read_len > 0 {
105-
continue;
106-
} else {
107-
break;
108-
}
109-
}
110-
Err(e) if e.kind() == ErrorKind::Interrupted => {
111-
buf = r_buf;
112-
fd = r_fd;
85+
// read into spare capacity
86+
let (size_read, r_fd, r_buf) = op_read(fd, buf, &mut offset, &mut read_len).await?;
11387

114-
continue;
115-
}
116-
Err(e) => return Err(e),
117-
}
88+
if size_read == 0 {
89+
return Ok(r_buf);
11890
}
91+
92+
fd = r_fd;
93+
buf = r_buf;
11994
}
12095
}
12196

12297
async fn small_probe_read(
123-
mut fd: OwnedFd,
98+
fd: OwnedFd,
12499
mut buf: Vec<u8>,
125-
offset: u64,
100+
offset: &mut u64,
126101
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
127-
let mut temp_arr = [0; PROBE_SIZE];
128-
let has_enough = buf.len() > PROBE_SIZE;
129-
130-
if has_enough {
131-
// if we have more than PROBE_SIZE bytes in the buffer already then
132-
// don't call reserve as we might potentially read 0 bytes
133-
let back_bytes_len = buf.len() - PROBE_SIZE;
134-
temp_arr.copy_from_slice(&buf[back_bytes_len..]);
135-
// We're decreasing the length of the buffer and len is greater
136-
// than PROBE_SIZE. So we can read into the discarded length
137-
buf.truncate(back_bytes_len);
138-
} else {
139-
// we don't even have PROBE_SIZE length in the buffer, we need this
140-
// reservation
141-
buf.reserve_exact(PROBE_SIZE);
102+
let mut read_len = PROBE_SIZE_U32;
103+
104+
if buf.len() < PROBE_SIZE {
105+
buf.try_reserve(PROBE_SIZE)?;
106+
107+
return op_read(fd, buf, offset, &mut read_len).await;
142108
}
143109

110+
let mut temp_arr = [0; PROBE_SIZE];
111+
let back_bytes_len = buf.len() - PROBE_SIZE;
112+
113+
temp_arr.copy_from_slice(&buf[back_bytes_len..]);
114+
115+
// We're decreasing the length of the buffer and len is greater
116+
// than PROBE_SIZE. So we can read into the discarded length
117+
buf.truncate(back_bytes_len);
118+
119+
let (size_read, r_fd, mut r_buf) = op_read(fd, buf, offset, &mut read_len).await?;
120+
121+
r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
122+
123+
Ok((size_read, r_fd, r_buf))
124+
}
125+
126+
async fn op_read(
127+
mut fd: OwnedFd,
128+
mut buf: Vec<u8>,
129+
offset: &mut u64,
130+
read_len: &mut u32,
131+
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
144132
loop {
145-
let (res, r_fd, mut r_buf) = Op::read(fd, buf, PROBE_SIZE_U32, offset).await;
133+
let (res, r_fd, r_buf) = Op::read(fd, buf, *read_len, *offset).await;
146134

147135
match res {
148-
// return early if we inserted into reserved PROBE_SIZE
149-
// bytes
150-
Ok(size_read) if !has_enough => return Ok((size_read, r_fd, r_buf)),
136+
Err(e) if e.kind() == ErrorKind::Interrupted => {
137+
buf = r_buf;
138+
fd = r_fd;
139+
}
140+
Err(e) => return Err(e),
151141
Ok(size_read) => {
152-
let old_len = r_buf.len() - (size_read as usize);
142+
*offset += size_read as u64;
143+
*read_len -= size_read;
153144

154-
r_buf.splice(old_len..old_len, temp_arr);
145+
if *read_len == 0 || size_read == 0 {
146+
return Ok((size_read, r_fd, r_buf));
147+
}
155148

156-
return Ok((size_read, r_fd, r_buf));
157-
}
158-
Err(e) if e.kind() == ErrorKind::Interrupted => {
159149
buf = r_buf;
160150
fd = r_fd;
161-
162-
continue;
163151
}
164-
Err(e) => return Err(e),
165152
}
166153
}
167154
}

0 commit comments

Comments
 (0)