Skip to content

Commit 4e69595

Browse files
committed
Fix tests fail on allocation failure
Doc fix and fix double subtraction of offset Fix shutdown op test Fix typos and use assert the stopped task's first poll
1 parent 32e26ce commit 4e69595

File tree

4 files changed

+61
-56
lines changed

4 files changed

+61
-56
lines changed

tokio/src/fs/read.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ use std::{io, path::Path};
3030
///
3131
/// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
3232
///
33+
/// # io_uring support
34+
///
35+
/// On Linux, you can also use io_uring for executing system calls. To enable
36+
/// io_uring, you need to specify the `--cfg tokio_unstable` flag at compile time,
37+
/// enable the io-uring cargo feature, and set the `Builder::enable_io_uring`
38+
/// runtime option.
39+
///
40+
/// Support for io_uring is currently experimental, so its behavior may change
41+
/// or it may be removed in future versions.
42+
///
3343
/// # Examples
3444
///
3545
/// ```no_run

tokio/src/fs/read_uring.rs

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -27,32 +27,19 @@ pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
2727
.expect("unexpected in-flight operation detected")
2828
.into();
2929

30-
// extra single capacity for the whole size to fit without any reallocation
31-
let buf = Vec::with_capacity(size_hint.unwrap_or(0));
30+
let mut buf = Vec::new();
3231

33-
read_to_end_uring(size_hint, fd, buf).await
32+
if let Some(size_hint) = size_hint {
33+
buf.try_reserve(size_hint)?;
34+
}
35+
36+
read_to_end_uring(fd, buf).await
3437
}
3538

36-
async fn read_to_end_uring(
37-
size_hint: Option<usize>,
38-
mut fd: OwnedFd,
39-
mut buf: Vec<u8>,
40-
) -> io::Result<Vec<u8>> {
39+
async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec<u8>) -> io::Result<Vec<u8>> {
4140
let mut offset = 0;
4241
let start_cap = buf.capacity();
4342

44-
// if buffer has no room and no size_hint, start with a small probe_read from 0 offset
45-
if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE {
46-
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, &mut offset).await?;
47-
48-
if size_read == 0 {
49-
return Ok(r_buf);
50-
}
51-
52-
fd = r_fd;
53-
buf = r_buf;
54-
}
55-
5643
loop {
5744
if buf.len() == buf.capacity() && buf.capacity() == start_cap {
5845
// The buffer might be an exact fit. Let's read into a probe buffer
@@ -67,7 +54,6 @@ async fn read_to_end_uring(
6754

6855
buf = r_buf;
6956
fd = r_fd;
70-
offset += size_read as u64;
7157
}
7258

7359
// buf is full, need more capacity
@@ -79,11 +65,11 @@ async fn read_to_end_uring(
7965
let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE);
8066

8167
// buf_len cannot be greater than u32::MAX because MAX_READ_SIZE
82-
// is u32::MAX
83-
let mut read_len = buf_len as u32;
68+
// is less than u32::MAX
69+
let read_len = buf_len as u32;
8470

8571
// read into spare capacity
86-
let (size_read, r_fd, r_buf) = op_read(fd, buf, &mut offset, &mut read_len).await?;
72+
let (size_read, r_fd, r_buf) = op_read(fd, buf, &mut offset, read_len).await?;
8773

8874
if size_read == 0 {
8975
return Ok(r_buf);
@@ -99,12 +85,12 @@ async fn small_probe_read(
9985
mut buf: Vec<u8>,
10086
offset: &mut u64,
10187
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
102-
let mut read_len = PROBE_SIZE_U32;
88+
let read_len = PROBE_SIZE_U32;
10389

10490
if buf.len() < PROBE_SIZE {
10591
buf.try_reserve(PROBE_SIZE)?;
10692

107-
return op_read(fd, buf, offset, &mut read_len).await;
93+
return op_read(fd, buf, offset, read_len).await;
10894
}
10995

11096
let mut temp_arr = [0; PROBE_SIZE];
@@ -116,8 +102,9 @@ async fn small_probe_read(
116102
// than PROBE_SIZE. So we can read into the discarded length
117103
buf.truncate(back_bytes_len);
118104

119-
let (size_read, r_fd, mut r_buf) = op_read(fd, buf, offset, &mut read_len).await?;
105+
let (size_read, r_fd, mut r_buf) = op_read(fd, buf, offset, read_len).await?;
120106

107+
r_buf.try_reserve(PROBE_SIZE)?;
121108
r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
122109

123110
Ok((size_read, r_fd, r_buf))
@@ -127,10 +114,10 @@ async fn op_read(
127114
mut fd: OwnedFd,
128115
mut buf: Vec<u8>,
129116
offset: &mut u64,
130-
read_len: &mut u32,
117+
mut read_len: u32,
131118
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
132119
loop {
133-
let (res, r_fd, r_buf) = Op::read(fd, buf, *read_len, *offset).await;
120+
let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, *offset).await;
134121

135122
match res {
136123
Err(e) if e.kind() == ErrorKind::Interrupted => {
@@ -140,9 +127,9 @@ async fn op_read(
140127
Err(e) => return Err(e),
141128
Ok(size_read) => {
142129
*offset += size_read as u64;
143-
*read_len -= size_read;
130+
read_len -= size_read;
144131

145-
if *read_len == 0 || size_read == 0 {
132+
if read_len == 0 || size_read == 0 {
146133
return Ok((size_read, r_fd, r_buf));
147134
}
148135

tokio/src/io/uring/read.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ impl Cancellable for Read {
3838

3939
impl Op<Read> {
4040
// Submit a request to read a FD at given length and offset into a
41-
// dynamic buffer with uinitialized memory. The read happens on unitialized
42-
// buffer and no overwiting happens.
41+
// dynamic buffer with uninitialized memory. The read happens on unitialized
42+
// buffer and no overwriting happens.
4343

4444
// SAFETY: The `len` of the amount to be read and the buffer that is passed
4545
// should have capacity > len.

tokio/tests/fs_uring_read.rs

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@
88
target_os = "linux"
99
))]
1010

11-
use futures::future::FutureExt;
11+
use futures::future::Future;
12+
use std::future::poll_fn;
1213
use std::io::Write;
14+
use std::path::PathBuf;
1315
use std::sync::mpsc;
1416
use std::task::Poll;
1517
use std::time::Duration;
16-
use std::{future::poll_fn, path::PathBuf};
1718
use tempfile::NamedTempFile;
18-
use tokio::{
19-
fs::read,
20-
runtime::{Builder, Runtime},
21-
};
19+
use tokio::fs::read;
20+
use tokio::runtime::{Builder, Runtime};
21+
use tokio_test::assert_pending;
2222
use tokio_util::task::TaskTracker;
2323

2424
fn multi_rt(n: usize) -> Box<dyn Fn() -> Runtime> {
@@ -49,34 +49,38 @@ fn rt_combinations() -> Vec<Box<dyn Fn() -> Runtime>> {
4949
#[test]
5050
fn shutdown_runtime_while_performing_io_uring_ops() {
5151
fn run(rt: Runtime) {
52-
let (tx, rx) = mpsc::channel();
5352
let (done_tx, done_rx) = mpsc::channel();
54-
5553
let (_tmp, path) = create_tmp_files(1);
54+
// keep 100 permits
55+
const N: i32 = 100;
5656
rt.spawn(async move {
5757
let path = path[0].clone();
5858

5959
// spawning a bunch of uring operations.
60-
loop {
60+
let mut futs = vec![];
61+
62+
// spawning a bunch of uring operations.
63+
for _ in 0..N {
6164
let path = path.clone();
62-
tokio::spawn(async move {
63-
let bytes = read(path).await.unwrap();
65+
let mut fut = Box::pin(read(path));
6466

65-
assert_eq!(bytes, vec![20; 1023]);
66-
});
67+
poll_fn(|cx| {
68+
assert_pending!(fut.as_mut().poll(cx));
69+
Poll::<()>::Pending
70+
})
71+
.await;
6772

68-
// Avoid busy looping.
69-
tokio::task::yield_now().await;
73+
futs.push(fut);
7074
}
75+
76+
tokio::task::yield_now().await;
7177
});
7278

7379
std::thread::spawn(move || {
74-
let rt: Runtime = rx.recv().unwrap();
7580
rt.shutdown_timeout(Duration::from_millis(300));
7681
done_tx.send(()).unwrap();
7782
});
7883

79-
tx.send(rt).unwrap();
8084
done_rx.recv().unwrap();
8185
}
8286

@@ -130,25 +134,29 @@ async fn read_small_large_files() {
130134
#[tokio::test]
131135
async fn cancel_op_future() {
132136
let (_tmp_file, path): (Vec<NamedTempFile>, Vec<PathBuf>) = create_tmp_files(1);
137+
let path = path[0].clone();
133138

134139
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
140+
135141
let handle = tokio::spawn(async move {
136-
poll_fn(|cx| {
137-
let fut = read(&path[0]);
142+
let fut = read(path.clone());
143+
tokio::pin!(fut);
138144

145+
poll_fn(move |cx| {
139146
// If io_uring is enabled (and not falling back to the thread pool),
140147
// the first poll should return Pending.
141-
let _pending = Box::pin(fut).poll_unpin(cx);
142-
143-
tx.send(()).unwrap();
148+
assert_pending!(fut.as_mut().poll(cx));
149+
tx.send(true).unwrap();
144150

145151
Poll::<()>::Pending
146152
})
147153
.await;
148154
});
149155

150156
// Wait for the first poll
151-
rx.recv().await.unwrap();
157+
158+
let val = rx.recv().await;
159+
assert!(val.unwrap());
152160

153161
handle.abort();
154162

0 commit comments

Comments
 (0)