From 769f309b9259dbf014c03c01c04218476a89b734 Mon Sep 17 00:00:00 2001 From: Daksh <41485688+Daksh14@users.noreply.github.com> Date: Fri, 17 Oct 2025 12:51:01 +0530 Subject: [PATCH 1/6] io-uring: Implement tokio::fs::read --- tokio/src/fs/read.rs | 53 +++++++++++ tokio/src/io/uring/mod.rs | 1 + tokio/src/io/uring/read.rs | 42 +++++++++ tokio/src/runtime/driver/op.rs | 2 + tokio/tests/fs_uring_read.rs | 155 +++++++++++++++++++++++++++++++++ 5 files changed, 253 insertions(+) create mode 100644 tokio/src/io/uring/read.rs create mode 100644 tokio/tests/fs_uring_read.rs diff --git a/tokio/src/fs/read.rs b/tokio/src/fs/read.rs index 36edf2a5b1e..371c1ead6a6 100644 --- a/tokio/src/fs/read.rs +++ b/tokio/src/fs/read.rs @@ -45,5 +45,58 @@ use std::{io, path::Path}; /// ``` pub async fn read(path: impl AsRef) -> io::Result> { let path = path.as_ref().to_owned(); + + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] + { + let handle = crate::runtime::Handle::current(); + let driver_handle = handle.inner.driver().io(); + if driver_handle.check_and_init()? { + return read_uring(&path).await; + } + } + asyncify(move || std::fs::read(path)).await } + +#[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" +))] +async fn read_uring(path: &Path) -> io::Result> { + use crate::{fs::OpenOptions, runtime::driver::op::Op}; + use std::os::fd::OwnedFd; + + let file = OpenOptions::new().read(true).open(path).await?; + + let size = file + .metadata() + .await + .map(|m| m.len() as usize) + .ok() + .unwrap_or(0); + + let buf = Vec::with_capacity(size); + + let fd: OwnedFd = file + .try_into_std() + .expect("unexpected in-flight operation detected") + .into(); + + let (read_size, mut buf) = Op::read(fd, buf)?.await?; + + // SAFETY: + // 1. The buffer is initialized with `size` capacity + // 2. the read_size is the number of bytes read from the file + unsafe { buf.set_len(read_size as _) }; + + Ok(buf) +} diff --git a/tokio/src/io/uring/mod.rs b/tokio/src/io/uring/mod.rs index 4899d0a4a0b..facad596f63 100644 --- a/tokio/src/io/uring/mod.rs +++ b/tokio/src/io/uring/mod.rs @@ -1,3 +1,4 @@ pub(crate) mod open; +pub(crate) mod read; pub(crate) mod utils; pub(crate) mod write; diff --git a/tokio/src/io/uring/read.rs b/tokio/src/io/uring/read.rs new file mode 100644 index 00000000000..c7232f93cb5 --- /dev/null +++ b/tokio/src/io/uring/read.rs @@ -0,0 +1,42 @@ +use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; +use io_uring::{opcode, types}; +use std::{ + io, + os::fd::{AsRawFd, OwnedFd}, +}; + +#[derive(Debug)] +pub(crate) struct Read { + buf: Vec, + fd: OwnedFd, +} + +impl Completable for Read { + type Output = (u32, Vec); + fn complete(self, cqe: CqeResult) -> io::Result { + let res = cqe.result?; + + Ok((res, self.buf)) + } +} + +impl Cancellable for Read { + fn cancel(self) -> CancelData { + CancelData::Read(self) + } +} + +impl Op { + /// Submit a request to open a file. + pub(crate) fn read(fd: OwnedFd, mut buf: Vec) -> io::Result { + let buf_mut_ptr = buf.as_mut_ptr(); + let cap = buf.capacity() as _; + + let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, cap).build(); + + // SAFETY: Parameters are valid for the entire duration of the operation + let op = unsafe { Op::new(read_op, Read { fd, buf }) }; + + Ok(op) + } +} diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index 37945cf5a0d..d2b9289ceee 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -1,4 +1,5 @@ use crate::io::uring::open::Open; +use crate::io::uring::read::Read; use crate::io::uring::write::Write; use crate::runtime::Handle; @@ -17,6 +18,7 @@ use std::task::{Context, Poll, Waker}; pub(crate) enum CancelData { Open(Open), Write(Write), + Read(Read), } #[derive(Debug)] diff --git a/tokio/tests/fs_uring_read.rs b/tokio/tests/fs_uring_read.rs new file mode 100644 index 00000000000..6f83c321249 --- /dev/null +++ b/tokio/tests/fs_uring_read.rs @@ -0,0 +1,155 @@ +//! Uring file operations tests. + +#![cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" +))] + +use futures::future::FutureExt; +use std::io::Write; +use std::sync::mpsc; +use std::task::Poll; +use std::time::Duration; +use std::{future::poll_fn, path::PathBuf}; +use tempfile::NamedTempFile; +use tokio::{ + fs::read, + runtime::{Builder, Runtime}, +}; +use tokio_util::task::TaskTracker; + +fn multi_rt(n: usize) -> Box Runtime> { + Box::new(move || { + Builder::new_multi_thread() + .worker_threads(n) + .enable_all() + .build() + .unwrap() + }) +} + +fn current_rt() -> Box Runtime> { + Box::new(|| Builder::new_current_thread().enable_all().build().unwrap()) +} + +fn rt_combinations() -> Vec Runtime>> { + vec![ + current_rt(), + multi_rt(1), + multi_rt(2), + multi_rt(8), + multi_rt(64), + multi_rt(256), + ] +} + +#[test] +fn shutdown_runtime_while_performing_io_uring_ops() { + fn run(rt: Runtime) { + let (tx, rx) = mpsc::channel(); + let (done_tx, done_rx) = mpsc::channel(); + + let (_tmp, path) = create_tmp_files(1); + rt.spawn(async move { + let path = path[0].clone(); + + // spawning a bunch of uring operations. + loop { + let path = path.clone(); + tokio::spawn(async move { + let bytes = read(path).await.unwrap(); + + assert_eq!(bytes, vec![20; 2]); + }); + + // Avoid busy looping. + tokio::task::yield_now().await; + } + }); + + std::thread::spawn(move || { + let rt: Runtime = rx.recv().unwrap(); + rt.shutdown_timeout(Duration::from_millis(300)); + done_tx.send(()).unwrap(); + }); + + tx.send(rt).unwrap(); + done_rx.recv().unwrap(); + } + + for rt in rt_combinations() { + run(rt()); + } +} + +#[test] +fn read_many_files() { + fn run(rt: Runtime) { + const NUM_FILES: usize = 512; + + let (_tmp_files, paths): (Vec, Vec) = create_tmp_files(NUM_FILES); + + rt.block_on(async move { + let tracker = TaskTracker::new(); + + for i in 0..10_000 { + let path = paths.get(i % NUM_FILES).unwrap().clone(); + tracker.spawn(async move { + let bytes = read(path).await.unwrap(); + assert_eq!(bytes, vec![20; 2]); + }); + } + tracker.close(); + tracker.wait().await; + }); + } + + for rt in rt_combinations() { + run(rt()); + } +} + +#[tokio::test] +async fn cancel_op_future() { + let (_tmp_file, path): (Vec, Vec) = create_tmp_files(1); + + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let handle = tokio::spawn(async move { + poll_fn(|cx| { + let fut = read(&path[0]); + + // If io_uring is enabled (and not falling back to the thread pool), + // the first poll should return Pending. + let _pending = Box::pin(fut).poll_unpin(cx); + + tx.send(()).unwrap(); + + Poll::<()>::Pending + }) + .await; + }); + + // Wait for the first poll + rx.recv().await.unwrap(); + + handle.abort(); + + let res = handle.await.unwrap_err(); + assert!(res.is_cancelled()); +} + +fn create_tmp_files(num_files: usize) -> (Vec, Vec) { + let mut files = Vec::with_capacity(num_files); + for _ in 0..num_files { + let mut tmp = NamedTempFile::new().unwrap(); + let buf = vec![20; 2]; + tmp.write_all(&buf).unwrap(); + let path = tmp.path().to_path_buf(); + files.push((tmp, path)); + } + + files.into_iter().unzip() +} From ea77283962078f18089629a84c3cdf575925a37a Mon Sep 17 00:00:00 2001 From: Daksh <41485688+Daksh14@users.noreply.github.com> Date: Sun, 19 Oct 2025 15:52:40 +0530 Subject: [PATCH 2/6] io-uring: re-think fs read with std lib's implementation --- tokio/src/fs/mod.rs | 10 +- tokio/src/fs/read.rs | 39 +------- tokio/src/fs/read_uring.rs | 179 +++++++++++++++++++++++++++++++++++ tokio/src/io/uring/read.rs | 48 ++++++---- tokio/tests/fs_uring_read.rs | 57 ++++++++++- 5 files changed, 272 insertions(+), 61 deletions(-) create mode 100644 tokio/src/fs/read_uring.rs diff --git a/tokio/src/fs/mod.rs b/tokio/src/fs/mod.rs index d677645eccb..8701d3a1083 100644 --- a/tokio/src/fs/mod.rs +++ b/tokio/src/fs/mod.rs @@ -237,9 +237,6 @@ pub use self::metadata::metadata; mod open_options; pub use self::open_options::OpenOptions; -cfg_io_uring! { - pub(crate) use self::open_options::UringOpenOptions; -} mod read; pub use self::read::read; @@ -298,6 +295,13 @@ cfg_windows! { pub use self::symlink_file::symlink_file; } +cfg_io_uring! { + pub(crate) mod read_uring; + pub(crate) use self::read_uring::read_uring; + + pub(crate) use self::open_options::UringOpenOptions; +} + use std::io; #[cfg(not(test))] diff --git a/tokio/src/fs/read.rs b/tokio/src/fs/read.rs index 371c1ead6a6..7ea94a577cd 100644 --- a/tokio/src/fs/read.rs +++ b/tokio/src/fs/read.rs @@ -54,6 +54,8 @@ pub async fn read(path: impl AsRef) -> io::Result> { target_os = "linux" ))] { + use crate::fs::read_uring; + let handle = crate::runtime::Handle::current(); let driver_handle = handle.inner.driver().io(); if driver_handle.check_and_init()? { @@ -63,40 +65,3 @@ pub async fn read(path: impl AsRef) -> io::Result> { asyncify(move || std::fs::read(path)).await } - -#[cfg(all( - tokio_unstable, - feature = "io-uring", - feature = "rt", - feature = "fs", - target_os = "linux" -))] -async fn read_uring(path: &Path) -> io::Result> { - use crate::{fs::OpenOptions, runtime::driver::op::Op}; - use std::os::fd::OwnedFd; - - let file = OpenOptions::new().read(true).open(path).await?; - - let size = file - .metadata() - .await - .map(|m| m.len() as usize) - .ok() - .unwrap_or(0); - - let buf = Vec::with_capacity(size); - - let fd: OwnedFd = file - .try_into_std() - .expect("unexpected in-flight operation detected") - .into(); - - let (read_size, mut buf) = Op::read(fd, buf)?.await?; - - // SAFETY: - // 1. The buffer is initialized with `size` capacity - // 2. the read_size is the number of bytes read from the file - unsafe { buf.set_len(read_size as _) }; - - Ok(buf) -} diff --git a/tokio/src/fs/read_uring.rs b/tokio/src/fs/read_uring.rs new file mode 100644 index 00000000000..edb1926e73a --- /dev/null +++ b/tokio/src/fs/read_uring.rs @@ -0,0 +1,179 @@ +use crate::fs::OpenOptions; +use crate::runtime::driver::op::Op; + +use std::io::ErrorKind; +use std::os::fd::OwnedFd; +use std::path::Path; +use std::{cmp, io}; + +// this algorithm is inspired from rust std lib version 1.90.0 +// https://doc.rust-lang.org/1.90.0/src/std/io/mod.rs.html#409 +const PROBE_SIZE: usize = 32; +const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32; + +// Max bytes we can read using io uring submission at a time +// SAFETY: cannot be higher than u32::MAX for safe cast +const MAX_READ_SIZE: usize = u32::MAX as usize; + +pub(crate) async fn read_uring(path: &Path) -> io::Result> { + let file = OpenOptions::new().read(true).open(path).await?; + + // TODO: use io uring in the future to obtain metadata + let size_hint: Option = file.metadata().await.map(|m| m.len() as usize).ok(); + + let fd: OwnedFd = file + .try_into_std() + .expect("unexpected in-flight operation detected") + .into(); + + // extra single capacity for the whole size to fit without any reallocation + let buf = Vec::with_capacity(size_hint.unwrap_or(0).saturating_add(1)); + + read_to_end_uring(size_hint, fd, buf).await +} + +async fn read_to_end_uring( + size_hint: Option, + mut fd: OwnedFd, + mut buf: Vec, +) -> io::Result> { + let mut offset = 0; + let mut consecutive_short_reads = 0; + + let start_cap = buf.capacity(); + + // if buffer has no room and no size_hint, start with a small probe_read from 0 offset + if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < (PROBE_SIZE) { + let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?; + + if size_read == 0 { + return Ok(r_buf); + } + + buf = r_buf; + fd = r_fd; + offset += size_read as u64; + } + + loop { + if buf.len() == buf.capacity() && buf.capacity() == start_cap { + // The buffer might be an exact fit. Let's read into a probe buffer + // and see if it returns `Ok(0)`. If so, we've avoided an + // unnecessary increasing of the capacity. But if not, append the + // probe buffer to the primary buffer and let its capacity grow. + let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?; + + if size_read == 0 { + return Ok(r_buf); + } + + buf = r_buf; + fd = r_fd; + offset += size_read as u64; + } + + // buf is full, need more capacity + if buf.len() == buf.capacity() { + if consecutive_short_reads > 1 { + buf.try_reserve(PROBE_SIZE.saturating_mul(consecutive_short_reads))?; + } else { + buf.try_reserve(PROBE_SIZE)?; + } + } + + // doesn't matter if we have a valid size_hint or not, if we do more + // than 2 consecutive_short_reads, gradually increase the buffer + // capacity to read more data at a time + + // prepare the spare capacity to be read into + let spare = buf.capacity() - buf.len(); + let buf_len = cmp::min(spare, MAX_READ_SIZE); + + // SAFETY: buf_len cannot be greater than u32::MAX because max_read_size + // is u32::MAX + let mut read_len = buf_len as u32; + + loop { + // read into spare capacity + let (res, r_fd, mut r_buf) = Op::read(fd, buf, read_len, offset).await; + + match res { + Ok(0) => return Ok(r_buf), + Ok(size_read) => { + let new_len = size_read as usize + r_buf.len(); + // SAFETY: We didn't read more than what as reserved + // as capacity, the _size_read was initialized by the kernel + // via a mutable pointer + unsafe { r_buf.set_len(new_len) } + + let requested = read_len; + + fd = r_fd; + buf = r_buf; + offset += size_read as u64; + read_len -= size_read; + + // 1. In case of no size_hint and a large file, if we keep reading + // PROBE_SIZE, we want to increment number of short reads in order to gradually + // increase read size per Op submission + // 2. In case of small reads by the kernel, also gradually increase + // read size per Op submission to read files in lesser cycles + if size_read <= requested { + consecutive_short_reads += 1; + } else { + consecutive_short_reads = 0; + } + + // keep reading if there's something left to be read + if read_len > 0 { + continue; + } else { + break; + } + } + Err(e) if e.kind() == ErrorKind::Interrupted => { + buf = r_buf; + fd = r_fd; + + continue; + } + Err(e) => return Err(e), + } + } + } +} + +async fn small_probe_read( + mut fd: OwnedFd, + mut buf: Vec, + offset: u64, +) -> io::Result<(u32, OwnedFd, Vec)> { + // don't reserve more than PROBE_SIZE or double the capacity using + // try_reserve beacuse we'll be reading only PROBE_SIZE length + buf.reserve_exact(PROBE_SIZE); + + loop { + let (res, r_fd, mut r_buf) = Op::read(fd, buf, PROBE_SIZE_U32, offset).await; + + match res { + Ok(size_read) => { + let size_read_usize = size_read as usize; + + let new_len = size_read_usize + r_buf.len(); + // SAFETY: We didn't read more than what as reserved + // as capacity, the _size_read was initialized by the kernel + // via a mutable pointer + unsafe { r_buf.set_len(new_len) } + + return Ok((size_read, r_fd, r_buf)); + } + Err(e) if e.kind() == ErrorKind::Interrupted => { + buf = r_buf; + fd = r_fd; + + continue; + } + Err(e) => return Err(e), + } + } +} diff --git a/tokio/src/io/uring/read.rs b/tokio/src/io/uring/read.rs index c7232f93cb5..cb7591b2137 100644 --- a/tokio/src/io/uring/read.rs +++ b/tokio/src/io/uring/read.rs @@ -1,22 +1,24 @@ use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; + use io_uring::{opcode, types}; -use std::{ - io, - os::fd::{AsRawFd, OwnedFd}, -}; +use std::io::{self, Error}; +use std::os::fd::{AsRawFd, OwnedFd}; #[derive(Debug)] pub(crate) struct Read { - buf: Vec, fd: OwnedFd, + buf: Vec, } impl Completable for Read { - type Output = (u32, Vec); - fn complete(self, cqe: CqeResult) -> io::Result { - let res = cqe.result?; + type Output = (io::Result, OwnedFd, Vec); - Ok((res, self.buf)) + fn complete(self, cqe: CqeResult) -> Self::Output { + (cqe.result, self.fd, self.buf) + } + + fn complete_with_error(self, err: Error) -> Self::Output { + (Err(err), self.fd, self.buf) } } @@ -27,16 +29,26 @@ impl Cancellable for Read { } impl Op { - /// Submit a request to open a file. - pub(crate) fn read(fd: OwnedFd, mut buf: Vec) -> io::Result { - let buf_mut_ptr = buf.as_mut_ptr(); - let cap = buf.capacity() as _; - - let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, cap).build(); + // Submit a request to read a FD at given length and offset into a + // dynamic buffer with uinitialized memory. The read happens on unitialized + // buffer and no overwiting happens. + + // SAFETY: The `len` of the amount to be read and the buffer that is passed + // should have capacity > len. + // + // If `len` read is higher than vector capacity then setting its length by + // the caller in terms of size_read can be unsound. + pub(crate) fn read(fd: OwnedFd, mut buf: Vec, len: u32, offset: u64) -> Self { + // don't overwrite on already written part + let written = buf.len(); + let slice: &mut [u8] = &mut buf[written..]; + let buf_mut_ptr = slice.as_mut_ptr(); + + let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len) + .offset(offset) + .build(); // SAFETY: Parameters are valid for the entire duration of the operation - let op = unsafe { Op::new(read_op, Read { fd, buf }) }; - - Ok(op) + unsafe { Op::new(read_op, Read { fd, buf }) } } } diff --git a/tokio/tests/fs_uring_read.rs b/tokio/tests/fs_uring_read.rs index 6f83c321249..0b6bbe81166 100644 --- a/tokio/tests/fs_uring_read.rs +++ b/tokio/tests/fs_uring_read.rs @@ -62,7 +62,7 @@ fn shutdown_runtime_while_performing_io_uring_ops() { tokio::spawn(async move { let bytes = read(path).await.unwrap(); - assert_eq!(bytes, vec![20; 2]); + assert_eq!(bytes, vec![20; 1023]); }); // Avoid busy looping. @@ -99,7 +99,7 @@ fn read_many_files() { let path = paths.get(i % NUM_FILES).unwrap().clone(); tracker.spawn(async move { let bytes = read(path).await.unwrap(); - assert_eq!(bytes, vec![20; 2]); + assert_eq!(bytes, vec![20; 1023]); }); } tracker.close(); @@ -112,6 +112,33 @@ fn read_many_files() { } } +#[test] +fn read_small_large_files() { + let rt = multi_rt(256); + rt().block_on(async move { + let tracker = TaskTracker::new(); + + tracker.spawn(async move { + let (_tmp, path) = create_large_temp_file(); + + let bytes = read(path).await.unwrap(); + + assert_eq!(bytes, create_buf(5000)); + }); + + tracker.spawn(async move { + let (_tmp, path) = create_small_temp_file(); + + let bytes = read(path).await.unwrap(); + + assert_eq!(bytes, create_buf(20)); + }); + + tracker.close(); + tracker.wait().await; + }); +} + #[tokio::test] async fn cancel_op_future() { let (_tmp_file, path): (Vec, Vec) = create_tmp_files(1); @@ -145,7 +172,7 @@ fn create_tmp_files(num_files: usize) -> (Vec, Vec) { let mut files = Vec::with_capacity(num_files); for _ in 0..num_files { let mut tmp = NamedTempFile::new().unwrap(); - let buf = vec![20; 2]; + let buf = vec![20; 1023]; tmp.write_all(&buf).unwrap(); let path = tmp.path().to_path_buf(); files.push((tmp, path)); @@ -153,3 +180,27 @@ fn create_tmp_files(num_files: usize) -> (Vec, Vec) { files.into_iter().unzip() } + +fn create_large_temp_file() -> (NamedTempFile, PathBuf) { + let mut tmp = NamedTempFile::new().unwrap(); + let buf = create_buf(5000); + + tmp.write_all(&buf).unwrap(); + let path = tmp.path().to_path_buf(); + + (tmp, path) +} + +fn create_small_temp_file() -> (NamedTempFile, PathBuf) { + let mut tmp = NamedTempFile::new().unwrap(); + let buf = create_buf(20); + + tmp.write_all(&buf).unwrap(); + let path = tmp.path().to_path_buf(); + + (tmp, path) +} + +fn create_buf(length: usize) -> Vec { + (0..length).map(|i| i as u8).collect() +} From 71a687ae65981e95d3d07bf5be5a729731c3bb6d Mon Sep 17 00:00:00 2001 From: Daksh <41485688+Daksh14@users.noreply.github.com> Date: Wed, 5 Nov 2025 21:27:40 +0530 Subject: [PATCH 3/6] io-uring: Use stack method to prevent realloc in case of 0 size read --- tokio/src/fs/read_uring.rs | 67 +++++++++++++++--------------------- tokio/src/io/uring/read.rs | 15 +++++--- tokio/tests/fs_uring_read.rs | 28 +++++---------- 3 files changed, 46 insertions(+), 64 deletions(-) diff --git a/tokio/src/fs/read_uring.rs b/tokio/src/fs/read_uring.rs index edb1926e73a..037ca3eda85 100644 --- a/tokio/src/fs/read_uring.rs +++ b/tokio/src/fs/read_uring.rs @@ -1,10 +1,10 @@ use crate::fs::OpenOptions; use crate::runtime::driver::op::Op; +use std::io; use std::io::ErrorKind; use std::os::fd::OwnedFd; use std::path::Path; -use std::{cmp, io}; // this algorithm is inspired from rust std lib version 1.90.0 // https://doc.rust-lang.org/1.90.0/src/std/io/mod.rs.html#409 @@ -27,7 +27,7 @@ pub(crate) async fn read_uring(path: &Path) -> io::Result> { .into(); // extra single capacity for the whole size to fit without any reallocation - let buf = Vec::with_capacity(size_hint.unwrap_or(0).saturating_add(1)); + let buf = Vec::with_capacity(size_hint.unwrap_or(0)); read_to_end_uring(size_hint, fd, buf).await } @@ -38,12 +38,11 @@ async fn read_to_end_uring( mut buf: Vec, ) -> io::Result> { let mut offset = 0; - let mut consecutive_short_reads = 0; let start_cap = buf.capacity(); // if buffer has no room and no size_hint, start with a small probe_read from 0 offset - if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < (PROBE_SIZE) { + if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE { let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?; if size_read == 0 { @@ -74,11 +73,7 @@ async fn read_to_end_uring( // buf is full, need more capacity if buf.len() == buf.capacity() { - if consecutive_short_reads > 1 { - buf.try_reserve(PROBE_SIZE.saturating_mul(consecutive_short_reads))?; - } else { - buf.try_reserve(PROBE_SIZE)?; - } + buf.try_reserve(PROBE_SIZE)?; } // doesn't matter if we have a valid size_hint or not, if we do more @@ -86,8 +81,7 @@ async fn read_to_end_uring( // capacity to read more data at a time // prepare the spare capacity to be read into - let spare = buf.capacity() - buf.len(); - let buf_len = cmp::min(spare, MAX_READ_SIZE); + let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE); // SAFETY: buf_len cannot be greater than u32::MAX because max_read_size // is u32::MAX @@ -95,35 +89,16 @@ async fn read_to_end_uring( loop { // read into spare capacity - let (res, r_fd, mut r_buf) = Op::read(fd, buf, read_len, offset).await; + let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, offset).await; match res { Ok(0) => return Ok(r_buf), Ok(size_read) => { - let new_len = size_read as usize + r_buf.len(); - // SAFETY: We didn't read more than what as reserved - // as capacity, the _size_read was initialized by the kernel - // via a mutable pointer - unsafe { r_buf.set_len(new_len) } - - let requested = read_len; - fd = r_fd; buf = r_buf; offset += size_read as u64; read_len -= size_read; - // 1. In case of no size_hint and a large file, if we keep reading - // PROBE_SIZE, we want to increment number of short reads in order to gradually - // increase read size per Op submission - // 2. In case of small reads by the kernel, also gradually increase - // read size per Op submission to read files in lesser cycles - if size_read <= requested { - consecutive_short_reads += 1; - } else { - consecutive_short_reads = 0; - } - // keep reading if there's something left to be read if read_len > 0 { continue; @@ -148,22 +123,34 @@ async fn small_probe_read( mut buf: Vec, offset: u64, ) -> io::Result<(u32, OwnedFd, Vec)> { - // don't reserve more than PROBE_SIZE or double the capacity using - // try_reserve beacuse we'll be reading only PROBE_SIZE length - buf.reserve_exact(PROBE_SIZE); + let mut temp_arr = [0; PROBE_SIZE]; + let has_enough = buf.len() > PROBE_SIZE; + + if has_enough { + // if we have more than PROBE_SIZE bytes in the buffer already then + // don't call reserve as we might potentially read 0 bytes + let back_bytes_len = buf.len() - PROBE_SIZE; + temp_arr.copy_from_slice(&buf[back_bytes_len..]); + // We're decreasing the length of the buffer and len is greater + // than PROBE_SIZE. So we can read into the discarded length + buf.truncate(back_bytes_len); + } else { + // we don't even have PROBE_SIZE length in the buffer, we need this + // reservation + buf.reserve_exact(PROBE_SIZE); + } loop { let (res, r_fd, mut r_buf) = Op::read(fd, buf, PROBE_SIZE_U32, offset).await; match res { + // return early if we inserted into reserved PROBE_SIZE + // bytes + Ok(size_read) if !has_enough => return Ok((size_read, r_fd, r_buf)), Ok(size_read) => { - let size_read_usize = size_read as usize; + let old_len = r_buf.len() - (size_read as usize); - let new_len = size_read_usize + r_buf.len(); - // SAFETY: We didn't read more than what as reserved - // as capacity, the _size_read was initialized by the kernel - // via a mutable pointer - unsafe { r_buf.set_len(new_len) } + r_buf.splice(old_len..old_len, temp_arr); return Ok((size_read, r_fd, r_buf)); } diff --git a/tokio/src/io/uring/read.rs b/tokio/src/io/uring/read.rs index cb7591b2137..1c9f6429305 100644 --- a/tokio/src/io/uring/read.rs +++ b/tokio/src/io/uring/read.rs @@ -14,7 +14,15 @@ impl Completable for Read { type Output = (io::Result, OwnedFd, Vec); fn complete(self, cqe: CqeResult) -> Self::Output { - (cqe.result, self.fd, self.buf) + let mut buf = self.buf; + + if let Ok(len) = cqe.result { + let new_len = buf.len() + len as usize; + // SAFETY: Kernel read len bytes + unsafe { buf.set_len(new_len) }; + } + + (cqe.result, self.fd, buf) } fn complete_with_error(self, err: Error) -> Self::Output { @@ -40,9 +48,8 @@ impl Op { // the caller in terms of size_read can be unsound. pub(crate) fn read(fd: OwnedFd, mut buf: Vec, len: u32, offset: u64) -> Self { // don't overwrite on already written part - let written = buf.len(); - let slice: &mut [u8] = &mut buf[written..]; - let buf_mut_ptr = slice.as_mut_ptr(); + assert!(buf.spare_capacity_mut().len() <= len as usize); + let buf_mut_ptr = buf.spare_capacity_mut().as_mut_ptr().cast(); let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len) .offset(offset) diff --git a/tokio/tests/fs_uring_read.rs b/tokio/tests/fs_uring_read.rs index 0b6bbe81166..aee3f8a6c94 100644 --- a/tokio/tests/fs_uring_read.rs +++ b/tokio/tests/fs_uring_read.rs @@ -112,31 +112,19 @@ fn read_many_files() { } } -#[test] -fn read_small_large_files() { - let rt = multi_rt(256); - rt().block_on(async move { - let tracker = TaskTracker::new(); - - tracker.spawn(async move { - let (_tmp, path) = create_large_temp_file(); +#[tokio::test] +async fn read_small_large_files() { + let (_tmp, path) = create_large_temp_file(); - let bytes = read(path).await.unwrap(); + let bytes = read(path).await.unwrap(); - assert_eq!(bytes, create_buf(5000)); - }); + assert_eq!(bytes, create_buf(5000)); - tracker.spawn(async move { - let (_tmp, path) = create_small_temp_file(); + let (_tmp, path) = create_small_temp_file(); - let bytes = read(path).await.unwrap(); + let bytes = read(path).await.unwrap(); - assert_eq!(bytes, create_buf(20)); - }); - - tracker.close(); - tracker.wait().await; - }); + assert_eq!(bytes, create_buf(20)); } #[tokio::test] From e686c4bb61eacad343d7fbc4e03b73a63a8fa675 Mon Sep 17 00:00:00 2001 From: Daksh <41485688+Daksh14@users.noreply.github.com> Date: Tue, 11 Nov 2025 12:25:04 +0530 Subject: [PATCH 4/6] io-uring: Fix check and reduce MAX_READ_SIZE to 64 blocks Reading 64 MB chunks at a time and keeping the kernel busy surpases std::fs::read time with unoptimized io_uring one being 1.12% fast --- tokio/src/fs/read_uring.rs | 3 ++- tokio/src/io/uring/read.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tokio/src/fs/read_uring.rs b/tokio/src/fs/read_uring.rs index 037ca3eda85..15f452ef0d8 100644 --- a/tokio/src/fs/read_uring.rs +++ b/tokio/src/fs/read_uring.rs @@ -13,7 +13,8 @@ const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32; // Max bytes we can read using io uring submission at a time // SAFETY: cannot be higher than u32::MAX for safe cast -const MAX_READ_SIZE: usize = u32::MAX as usize; +// Set to read max 64 blocks at time +const MAX_READ_SIZE: usize = 64 * 1024 * 1024; pub(crate) async fn read_uring(path: &Path) -> io::Result> { let file = OpenOptions::new().read(true).open(path).await?; diff --git a/tokio/src/io/uring/read.rs b/tokio/src/io/uring/read.rs index 1c9f6429305..8e37bc13d21 100644 --- a/tokio/src/io/uring/read.rs +++ b/tokio/src/io/uring/read.rs @@ -48,7 +48,7 @@ impl Op { // the caller in terms of size_read can be unsound. pub(crate) fn read(fd: OwnedFd, mut buf: Vec, len: u32, offset: u64) -> Self { // don't overwrite on already written part - assert!(buf.spare_capacity_mut().len() <= len as usize); + assert!(buf.spare_capacity_mut().len() >= len as usize); let buf_mut_ptr = buf.spare_capacity_mut().as_mut_ptr().cast(); let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len) From 4ad9b05a9699d6f77815fd9c559eddc5a4c97a4c Mon Sep 17 00:00:00 2001 From: Daksh <41485688+Daksh14@users.noreply.github.com> Date: Wed, 12 Nov 2025 17:03:21 +0530 Subject: [PATCH 5/6] io-uring: Create `op_read` helper function Fix tests fail on allocation failure Doc fix and fix double subtraction of offset Fix typos and use assert the stopped task's first poll Add comments and panic on failed u32 conversion --- tokio/src/fs/read.rs | 10 +++ tokio/src/fs/read_uring.rs | 148 ++++++++++++++--------------------- tokio/src/io/uring/read.rs | 4 +- tokio/tests/fs_uring_read.rs | 54 +++++++------ 4 files changed, 102 insertions(+), 114 deletions(-) diff --git a/tokio/src/fs/read.rs b/tokio/src/fs/read.rs index 7ea94a577cd..955c3592c85 100644 --- a/tokio/src/fs/read.rs +++ b/tokio/src/fs/read.rs @@ -30,6 +30,16 @@ use std::{io, path::Path}; /// /// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted /// +/// # io_uring support +/// +/// On Linux, you can also use io_uring for executing system calls. To enable +/// io_uring, you need to specify the `--cfg tokio_unstable` flag at compile time, +/// enable the io-uring cargo feature, and set the `Builder::enable_io_uring` +/// runtime option. +/// +/// Support for io_uring is currently experimental, so its behavior may change +/// or it may be removed in future versions. +/// /// # Examples /// /// ```no_run diff --git a/tokio/src/fs/read_uring.rs b/tokio/src/fs/read_uring.rs index 15f452ef0d8..ae3be4ad6ee 100644 --- a/tokio/src/fs/read_uring.rs +++ b/tokio/src/fs/read_uring.rs @@ -13,7 +13,7 @@ const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32; // Max bytes we can read using io uring submission at a time // SAFETY: cannot be higher than u32::MAX for safe cast -// Set to read max 64 blocks at time +// Set to read max 64 MiB at time const MAX_READ_SIZE: usize = 64 * 1024 * 1024; pub(crate) async fn read_uring(path: &Path) -> io::Result> { @@ -27,41 +27,26 @@ pub(crate) async fn read_uring(path: &Path) -> io::Result> { .expect("unexpected in-flight operation detected") .into(); - // extra single capacity for the whole size to fit without any reallocation - let buf = Vec::with_capacity(size_hint.unwrap_or(0)); + let mut buf = Vec::new(); - read_to_end_uring(size_hint, fd, buf).await + if let Some(size_hint) = size_hint { + buf.try_reserve(size_hint)?; + } + + read_to_end_uring(fd, buf).await } -async fn read_to_end_uring( - size_hint: Option, - mut fd: OwnedFd, - mut buf: Vec, -) -> io::Result> { +async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec) -> io::Result> { let mut offset = 0; - let start_cap = buf.capacity(); - // if buffer has no room and no size_hint, start with a small probe_read from 0 offset - if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE { - let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?; - - if size_read == 0 { - return Ok(r_buf); - } - - buf = r_buf; - fd = r_fd; - offset += size_read as u64; - } - loop { - if buf.len() == buf.capacity() && buf.capacity() == start_cap { + if buf.len() == buf.capacity() && buf.capacity() == start_cap && buf.len() > PROBE_SIZE { // The buffer might be an exact fit. Let's read into a probe buffer // and see if it returns `Ok(0)`. If so, we've avoided an // unnecessary increasing of the capacity. But if not, append the // probe buffer to the primary buffer and let its capacity grow. - let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?; + let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, &mut offset).await?; if size_read == 0 { return Ok(r_buf); @@ -69,7 +54,6 @@ async fn read_to_end_uring( buf = r_buf; fd = r_fd; - offset += size_read as u64; } // buf is full, need more capacity @@ -77,91 +61,77 @@ async fn read_to_end_uring( buf.try_reserve(PROBE_SIZE)?; } - // doesn't matter if we have a valid size_hint or not, if we do more - // than 2 consecutive_short_reads, gradually increase the buffer - // capacity to read more data at a time - // prepare the spare capacity to be read into let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE); - // SAFETY: buf_len cannot be greater than u32::MAX because max_read_size - // is u32::MAX - let mut read_len = buf_len as u32; - - loop { - // read into spare capacity - let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, offset).await; - - match res { - Ok(0) => return Ok(r_buf), - Ok(size_read) => { - fd = r_fd; - buf = r_buf; - offset += size_read as u64; - read_len -= size_read; - - // keep reading if there's something left to be read - if read_len > 0 { - continue; - } else { - break; - } - } - Err(e) if e.kind() == ErrorKind::Interrupted => { - buf = r_buf; - fd = r_fd; + // buf_len cannot be greater than u32::MAX because MAX_READ_SIZE + // is less than u32::MAX + let read_len = u32::try_from(buf_len).expect("buf_len must always fit in u32"); - continue; - } - Err(e) => return Err(e), - } + // read into spare capacity + let (size_read, r_fd, r_buf) = op_read(fd, buf, &mut offset, read_len).await?; + + if size_read == 0 { + return Ok(r_buf); } + + fd = r_fd; + buf = r_buf; } } async fn small_probe_read( - mut fd: OwnedFd, + fd: OwnedFd, mut buf: Vec, - offset: u64, + offset: &mut u64, ) -> io::Result<(u32, OwnedFd, Vec)> { + let read_len = PROBE_SIZE_U32; + let mut temp_arr = [0; PROBE_SIZE]; - let has_enough = buf.len() > PROBE_SIZE; - - if has_enough { - // if we have more than PROBE_SIZE bytes in the buffer already then - // don't call reserve as we might potentially read 0 bytes - let back_bytes_len = buf.len() - PROBE_SIZE; - temp_arr.copy_from_slice(&buf[back_bytes_len..]); - // We're decreasing the length of the buffer and len is greater - // than PROBE_SIZE. So we can read into the discarded length - buf.truncate(back_bytes_len); - } else { - // we don't even have PROBE_SIZE length in the buffer, we need this - // reservation - buf.reserve_exact(PROBE_SIZE); - } + // we don't call this function if buffer's length < PROBE_SIZE + let back_bytes_len = buf.len() - PROBE_SIZE; + + temp_arr.copy_from_slice(&buf[back_bytes_len..]); + + // We're decreasing the length of the buffer and len is greater + // than PROBE_SIZE. So we can read into the discarded length + buf.truncate(back_bytes_len); + let (size_read, r_fd, mut r_buf) = op_read(fd, buf, offset, read_len).await?; + // If `size_read` returns zero due to reasons such as buffer's exact fit, + // then this `try_reserve` does not perform allocation. + r_buf.try_reserve(PROBE_SIZE)?; + r_buf.splice(back_bytes_len..back_bytes_len, temp_arr); + + Ok((size_read, r_fd, r_buf)) +} + +async fn op_read( + mut fd: OwnedFd, + mut buf: Vec, + offset: &mut u64, + mut read_len: u32, +) -> io::Result<(u32, OwnedFd, Vec)> { loop { - let (res, r_fd, mut r_buf) = Op::read(fd, buf, PROBE_SIZE_U32, offset).await; + let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, *offset).await; match res { - // return early if we inserted into reserved PROBE_SIZE - // bytes - Ok(size_read) if !has_enough => return Ok((size_read, r_fd, r_buf)), + Err(e) if e.kind() == ErrorKind::Interrupted => { + buf = r_buf; + fd = r_fd; + } + Err(e) => return Err(e), Ok(size_read) => { - let old_len = r_buf.len() - (size_read as usize); + *offset += size_read as u64; + read_len -= size_read; - r_buf.splice(old_len..old_len, temp_arr); + if read_len == 0 || size_read == 0 { + return Ok((size_read, r_fd, r_buf)); + } - return Ok((size_read, r_fd, r_buf)); - } - Err(e) if e.kind() == ErrorKind::Interrupted => { buf = r_buf; fd = r_fd; - - continue; } - Err(e) => return Err(e), } } } diff --git a/tokio/src/io/uring/read.rs b/tokio/src/io/uring/read.rs index 8e37bc13d21..9d6cc541254 100644 --- a/tokio/src/io/uring/read.rs +++ b/tokio/src/io/uring/read.rs @@ -38,8 +38,8 @@ impl Cancellable for Read { impl Op { // Submit a request to read a FD at given length and offset into a - // dynamic buffer with uinitialized memory. The read happens on unitialized - // buffer and no overwiting happens. + // dynamic buffer with uninitialized memory. The read happens on unitialized + // buffer and no overwriting happens. // SAFETY: The `len` of the amount to be read and the buffer that is passed // should have capacity > len. diff --git a/tokio/tests/fs_uring_read.rs b/tokio/tests/fs_uring_read.rs index aee3f8a6c94..5e8335e70c2 100644 --- a/tokio/tests/fs_uring_read.rs +++ b/tokio/tests/fs_uring_read.rs @@ -8,17 +8,17 @@ target_os = "linux" ))] -use futures::future::FutureExt; +use futures::future::Future; +use std::future::poll_fn; use std::io::Write; +use std::path::PathBuf; use std::sync::mpsc; use std::task::Poll; use std::time::Duration; -use std::{future::poll_fn, path::PathBuf}; use tempfile::NamedTempFile; -use tokio::{ - fs::read, - runtime::{Builder, Runtime}, -}; +use tokio::fs::read; +use tokio::runtime::{Builder, Runtime}; +use tokio_test::assert_pending; use tokio_util::task::TaskTracker; fn multi_rt(n: usize) -> Box Runtime> { @@ -49,34 +49,38 @@ fn rt_combinations() -> Vec Runtime>> { #[test] fn shutdown_runtime_while_performing_io_uring_ops() { fn run(rt: Runtime) { - let (tx, rx) = mpsc::channel(); let (done_tx, done_rx) = mpsc::channel(); - let (_tmp, path) = create_tmp_files(1); + // keep 100 permits + const N: i32 = 100; rt.spawn(async move { let path = path[0].clone(); // spawning a bunch of uring operations. - loop { + let mut futs = vec![]; + + // spawning a bunch of uring operations. + for _ in 0..N { let path = path.clone(); - tokio::spawn(async move { - let bytes = read(path).await.unwrap(); + let mut fut = Box::pin(read(path)); - assert_eq!(bytes, vec![20; 1023]); - }); + poll_fn(|cx| { + assert_pending!(fut.as_mut().poll(cx)); + Poll::<()>::Pending + }) + .await; - // Avoid busy looping. - tokio::task::yield_now().await; + futs.push(fut); } + + tokio::task::yield_now().await; }); std::thread::spawn(move || { - let rt: Runtime = rx.recv().unwrap(); rt.shutdown_timeout(Duration::from_millis(300)); done_tx.send(()).unwrap(); }); - tx.send(rt).unwrap(); done_rx.recv().unwrap(); } @@ -130,17 +134,19 @@ async fn read_small_large_files() { #[tokio::test] async fn cancel_op_future() { let (_tmp_file, path): (Vec, Vec) = create_tmp_files(1); + let path = path[0].clone(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let handle = tokio::spawn(async move { - poll_fn(|cx| { - let fut = read(&path[0]); + let fut = read(path.clone()); + tokio::pin!(fut); + poll_fn(move |cx| { // If io_uring is enabled (and not falling back to the thread pool), // the first poll should return Pending. - let _pending = Box::pin(fut).poll_unpin(cx); - - tx.send(()).unwrap(); + assert_pending!(fut.as_mut().poll(cx)); + tx.send(true).unwrap(); Poll::<()>::Pending }) @@ -148,7 +154,9 @@ async fn cancel_op_future() { }); // Wait for the first poll - rx.recv().await.unwrap(); + + let val = rx.recv().await; + assert!(val.unwrap()); handle.abort(); From dfe83fef933cb059afafef30f026c8607350a8c1 Mon Sep 17 00:00:00 2001 From: Daksh <41485688+Daksh14@users.noreply.github.com> Date: Tue, 25 Nov 2025 23:35:02 +0530 Subject: [PATCH 6/6] io-uring: Don't return `size_read` but boolean for EOF check Check the EOF internally in the `op_read` function Fix test with noop waker --- tokio/src/fs/read_uring.rs | 28 +++++++++++++++++----------- tokio/tests/fs_uring_read.rs | 6 +++--- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/tokio/src/fs/read_uring.rs b/tokio/src/fs/read_uring.rs index ae3be4ad6ee..5cf0542fc8e 100644 --- a/tokio/src/fs/read_uring.rs +++ b/tokio/src/fs/read_uring.rs @@ -41,14 +41,14 @@ async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec) -> io::Result PROBE_SIZE { + if buf.len() == buf.capacity() && buf.capacity() == start_cap && buf.len() >= PROBE_SIZE { // The buffer might be an exact fit. Let's read into a probe buffer // and see if it returns `Ok(0)`. If so, we've avoided an // unnecessary increasing of the capacity. But if not, append the // probe buffer to the primary buffer and let its capacity grow. - let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, &mut offset).await?; + let (r_fd, r_buf, is_eof) = small_probe_read(fd, buf, &mut offset).await?; - if size_read == 0 { + if is_eof { return Ok(r_buf); } @@ -69,9 +69,9 @@ async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec) -> io::Result, offset: &mut u64, -) -> io::Result<(u32, OwnedFd, Vec)> { +) -> io::Result<(OwnedFd, Vec, bool)> { let read_len = PROBE_SIZE_U32; let mut temp_arr = [0; PROBE_SIZE]; @@ -97,21 +97,25 @@ async fn small_probe_read( // than PROBE_SIZE. So we can read into the discarded length buf.truncate(back_bytes_len); - let (size_read, r_fd, mut r_buf) = op_read(fd, buf, offset, read_len).await?; + let (r_fd, mut r_buf, is_eof) = op_read(fd, buf, offset, read_len).await?; // If `size_read` returns zero due to reasons such as buffer's exact fit, // then this `try_reserve` does not perform allocation. r_buf.try_reserve(PROBE_SIZE)?; r_buf.splice(back_bytes_len..back_bytes_len, temp_arr); - Ok((size_read, r_fd, r_buf)) + Ok((r_fd, r_buf, is_eof)) } +// Takes a amount of length to read and reads until we exhaust the given length +// to read or EOF reached. +// +// Returns size_read, the result and EOF reached or not async fn op_read( mut fd: OwnedFd, mut buf: Vec, offset: &mut u64, mut read_len: u32, -) -> io::Result<(u32, OwnedFd, Vec)> { +) -> io::Result<(OwnedFd, Vec, bool)> { loop { let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, *offset).await; @@ -125,8 +129,10 @@ async fn op_read( *offset += size_read as u64; read_len -= size_read; - if read_len == 0 || size_read == 0 { - return Ok((size_read, r_fd, r_buf)); + let is_eof = size_read == 0; + + if read_len == 0 || is_eof { + return Ok((r_fd, r_buf, is_eof)); } buf = r_buf; diff --git a/tokio/tests/fs_uring_read.rs b/tokio/tests/fs_uring_read.rs index 5e8335e70c2..4ce6d454ab6 100644 --- a/tokio/tests/fs_uring_read.rs +++ b/tokio/tests/fs_uring_read.rs @@ -13,7 +13,7 @@ use std::future::poll_fn; use std::io::Write; use std::path::PathBuf; use std::sync::mpsc; -use std::task::Poll; +use std::task::{Context, Poll, Waker}; use std::time::Duration; use tempfile::NamedTempFile; use tokio::fs::read; @@ -142,10 +142,10 @@ async fn cancel_op_future() { let fut = read(path.clone()); tokio::pin!(fut); - poll_fn(move |cx| { + poll_fn(move |_| { // If io_uring is enabled (and not falling back to the thread pool), // the first poll should return Pending. - assert_pending!(fut.as_mut().poll(cx)); + assert_pending!(fut.as_mut().poll(&mut Context::from_waker(Waker::noop()))); tx.send(true).unwrap(); Poll::<()>::Pending