Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions tokio/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,6 @@ pub use self::metadata::metadata;

mod open_options;
pub use self::open_options::OpenOptions;
cfg_io_uring! {
pub(crate) use self::open_options::UringOpenOptions;
}

mod read;
pub use self::read::read;
Expand Down Expand Up @@ -298,6 +295,13 @@ cfg_windows! {
pub use self::symlink_file::symlink_file;
}

cfg_io_uring! {
pub(crate) mod read_uring;
pub(crate) use self::read_uring::read_uring;

pub(crate) use self::open_options::UringOpenOptions;
}

use std::io;

#[cfg(not(test))]
Expand Down
28 changes: 28 additions & 0 deletions tokio/src/fs/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ use std::{io, path::Path};
///
/// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
///
/// # io_uring support
///
/// On Linux, you can also use io_uring for executing system calls. To enable
/// io_uring, you need to specify the `--cfg tokio_unstable` flag at compile time,
/// enable the io-uring cargo feature, and set the `Builder::enable_io_uring`
/// runtime option.
///
/// Support for io_uring is currently experimental, so its behavior may change
/// or it may be removed in future versions.
///
/// # Examples
///
/// ```no_run
Expand All @@ -45,5 +55,23 @@ use std::{io, path::Path};
/// ```
pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
let path = path.as_ref().to_owned();

#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why cfg_io_uring! { is not used here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martin-augment its following the currently used code block macro here in tokio/src/fs/open_options.rs, I can change those in a different "refactor" PR https://github.com/tokio-rs/tokio/blob/master/tokio/src/fs/open_options.rs#L592

{
use crate::fs::read_uring;

let handle = crate::runtime::Handle::current();
let driver_handle = handle.inner.driver().io();
if driver_handle.check_and_init()? {
return read_uring(&path).await;
}
}

asyncify(move || std::fs::read(path)).await
}
135 changes: 135 additions & 0 deletions tokio/src/fs/read_uring.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use crate::fs::OpenOptions;
use crate::runtime::driver::op::Op;

use std::io;
use std::io::ErrorKind;
use std::os::fd::OwnedFd;
use std::path::Path;

// this algorithm is inspired from rust std lib version 1.90.0
// https://doc.rust-lang.org/1.90.0/src/std/io/mod.rs.html#409
const PROBE_SIZE: usize = 32;
const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32;

// Max bytes we can read using io uring submission at a time
// SAFETY: cannot be higher than u32::MAX for safe cast
// Set to read max 64 MiB at time
const MAX_READ_SIZE: usize = 64 * 1024 * 1024;

pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
let file = OpenOptions::new().read(true).open(path).await?;

// TODO: use io uring in the future to obtain metadata
let size_hint: Option<usize> = file.metadata().await.map(|m| m.len() as usize).ok();

let fd: OwnedFd = file
.try_into_std()
.expect("unexpected in-flight operation detected")
.into();

let mut buf = Vec::new();

if let Some(size_hint) = size_hint {
buf.try_reserve(size_hint)?;
}

read_to_end_uring(fd, buf).await
}

async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec<u8>) -> io::Result<Vec<u8>> {
let mut offset = 0;
let start_cap = buf.capacity();

loop {
if buf.len() == buf.capacity() && buf.capacity() == start_cap && buf.len() > PROBE_SIZE {
// The buffer might be an exact fit. Let's read into a probe buffer
// and see if it returns `Ok(0)`. If so, we've avoided an
// unnecessary increasing of the capacity. But if not, append the
// probe buffer to the primary buffer and let its capacity grow.
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, &mut offset).await?;

if size_read == 0 {
return Ok(r_buf);
}

buf = r_buf;
fd = r_fd;
}

// buf is full, need more capacity
if buf.len() == buf.capacity() {
buf.try_reserve(PROBE_SIZE)?;
}

// prepare the spare capacity to be read into
let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE);

// buf_len cannot be greater than u32::MAX because MAX_READ_SIZE
// is less than u32::MAX
let read_len = buf_len as u32;

// read into spare capacity
let (size_read, r_fd, r_buf) = op_read(fd, buf, &mut offset, read_len).await?;

if size_read == 0 {
return Ok(r_buf);
}

fd = r_fd;
buf = r_buf;
}
}

async fn small_probe_read(
fd: OwnedFd,
mut buf: Vec<u8>,
offset: &mut u64,
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
let read_len = PROBE_SIZE_U32;

let mut temp_arr = [0; PROBE_SIZE];
let back_bytes_len = buf.len() - PROBE_SIZE;

temp_arr.copy_from_slice(&buf[back_bytes_len..]);

// We're decreasing the length of the buffer and len is greater
// than PROBE_SIZE. So we can read into the discarded length
buf.truncate(back_bytes_len);

let (size_read, r_fd, mut r_buf) = op_read(fd, buf, offset, read_len).await?;

r_buf.try_reserve(PROBE_SIZE)?;
r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can handle allocation failure by error rather than panic by allocating before calling splice. Note that this doesn't allocate if size_read == 0 since the capacity is guaranteed enough in that case.

Suggested change
r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
r_buf.try_reserve(PROBE_SIZE)?;
r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);

Copy link
Contributor Author

@Daksh14 Daksh14 Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Darksonn Will this splice write in the reserved area? Because we can only index in the length and not capacity

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. The length of r_buf before the op_read is exactly back_bytes_len, so after the op_read we have back_bytes_len <= r_buf.len() which is the requirement for using splice.


Ok((size_read, r_fd, r_buf))
}

async fn op_read(
mut fd: OwnedFd,
mut buf: Vec<u8>,
offset: &mut u64,
mut read_len: u32,
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
loop {
let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, *offset).await;

match res {
Err(e) if e.kind() == ErrorKind::Interrupted => {
buf = r_buf;
fd = r_fd;
}
Err(e) => return Err(e),
Ok(size_read) => {
*offset += size_read as u64;
read_len -= size_read;

if read_len == 0 || size_read == 0 {
return Ok((size_read, r_fd, r_buf));
}

buf = r_buf;
fd = r_fd;
}
}
}
}
1 change: 1 addition & 0 deletions tokio/src/io/uring/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub(crate) mod open;
pub(crate) mod read;
pub(crate) mod utils;
pub(crate) mod write;
61 changes: 61 additions & 0 deletions tokio/src/io/uring/read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};

use io_uring::{opcode, types};
use std::io::{self, Error};
use std::os::fd::{AsRawFd, OwnedFd};

#[derive(Debug)]
pub(crate) struct Read {
fd: OwnedFd,
buf: Vec<u8>,
}

impl Completable for Read {
type Output = (io::Result<u32>, OwnedFd, Vec<u8>);

fn complete(self, cqe: CqeResult) -> Self::Output {
let mut buf = self.buf;

if let Ok(len) = cqe.result {
let new_len = buf.len() + len as usize;
// SAFETY: Kernel read len bytes
unsafe { buf.set_len(new_len) };
}

(cqe.result, self.fd, buf)
}

fn complete_with_error(self, err: Error) -> Self::Output {
(Err(err), self.fd, self.buf)
}
}

impl Cancellable for Read {
fn cancel(self) -> CancelData {
CancelData::Read(self)
}
}

impl Op<Read> {
// Submit a request to read a FD at given length and offset into a
// dynamic buffer with uninitialized memory. The read happens on unitialized
// buffer and no overwriting happens.

// SAFETY: The `len` of the amount to be read and the buffer that is passed
// should have capacity > len.
//
// If `len` read is higher than vector capacity then setting its length by
// the caller in terms of size_read can be unsound.
pub(crate) fn read(fd: OwnedFd, mut buf: Vec<u8>, len: u32, offset: u64) -> Self {
// don't overwrite on already written part
assert!(buf.spare_capacity_mut().len() >= len as usize);
let buf_mut_ptr = buf.spare_capacity_mut().as_mut_ptr().cast();

let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len)
.offset(offset)
.build();

// SAFETY: Parameters are valid for the entire duration of the operation
unsafe { Op::new(read_op, Read { fd, buf }) }
}
}
2 changes: 2 additions & 0 deletions tokio/src/runtime/driver/op.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::io::uring::open::Open;
use crate::io::uring::read::Read;
use crate::io::uring::write::Write;
use crate::runtime::Handle;

Expand All @@ -17,6 +18,7 @@ use std::task::{Context, Poll, Waker};
pub(crate) enum CancelData {
Open(Open),
Write(Write),
Read(Read),
}

#[derive(Debug)]
Expand Down
Loading