Skip to content

Conversation

@Daksh14
Copy link
Contributor

@Daksh14 Daksh14 commented Oct 19, 2025

Motivation

We slowly use io uring everywhere here I made the simple change of supporting fs::read however it might not be as simple. Let me know if the unsafe I used is correct or not.

We currently use the blocking std::fs::MetaData to obtain file size for buffer capacity and extend the length of the vector according to the bytes read in the CQE. This implementation sounds good on paper to me.

Later we should implement an internal statx helper, in this PR or a seperate PR to make our uring implementation less painful to use. As this pr failed #7616

Lets put statx helper in different PR to avoid merging an inefficient read implementation given io uring is about being more efficient in file IO

Solution

Continue adopting io uring

strace on a tokio::fs::read after this change

io_uring_setup(256, {flags=0, sq_thread_cpu=0, sq_thread_idle=0, sq_entries=256, cq_entries=512, features=IORING_FEAT_SINGLE_MMAP|IORING_FEAT_NODROP|IORING_FEAT_SUBMIT_STABL
E|IORING_FEAT_RW_CUR_POS|IORING_FEAT_CUR_PERSONALITY|IORING_FEAT_FAST_POLL|IORING_FEAT_POLL_32BITS|IORING_FEAT_SQPOLL_NONFIXED|IORING_FEAT_EXT_ARG|IORING_FEAT_NATIVE_WORKERS
|IORING_FEAT_RSRC_TAGS|IORING_FEAT_CQE_SKIP|IORING_FEAT_LINKED_FILE|IORING_FEAT_REG_REG_RING|IORING_FEAT_RECVSEND_BUNDLE|IORING_FEAT_MIN_TIMEOUT|IORING_FEAT_RW_ATTR, sq_off=
{head=0, tail=4, ring_mask=16, ring_entries=24, flags=36, dropped=32, array=8256, user_addr=0}, cq_off={head=8, tail=12, ring_mask=20, ring_entries=28, overflow=44, cqes=64,
 flags=40, user_addr=0}}) = 9
mmap(NULL, 16384, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_POPULATE, 9, 0x10000000) = 0xfaf0bf1e2000
mmap(NULL, 9280, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_POPULATE, 9, 0) = 0xfaf0be71d000
epoll_ctl(5, EPOLL_CTL_ADD, 9, {events=EPOLLIN|EPOLLRDHUP|EPOLLET, data=0}) = 0
io_uring_enter(9, 1, 0, 0, NULL, 128)   = 1
futex(0xfaf0bf2557f0, FUTEX_WAIT_PRIVATE, 1, NULL) = 0
mmap(NULL, 2162688, PROT_NONE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0xfaf0be50d000
mprotect(0xfaf0be51d000, 2097152, PROT_READ|PROT_WRITE) = 0
rt_sigprocmask(SIG_BLOCK, ~[], [], 8)   = 0
clone3({flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, child_tid=0xfaf0be71c150, parent_
tid=0xfaf0be71c150, exit_signal=0, stack=0xfaf0be50d000, stack_size=0x20e960, tls=0xfaf0be71c7a0} => {parent_tid=[746758]}, 88) = 746758
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
futex(0xfaf0be71c850, FUTEX_WAKE_PRIVATE, 1) = 1
io_uring_enter(9, 1, 0, 0, NULL, 128)   = 1
futex(0xfaf0bf2557f0, FUTEX_WAIT_PRIVATE, 1, NULL) = -1 EAGAIN (Resource temporarily unavailable)
close(10)                               = 0

@Darksonn Darksonn added A-tokio Area: The main tokio crate M-fs Module: tokio/fs labels Oct 19, 2025
@Darksonn Darksonn changed the title Fs read io uring fs: support io_uring with tokio::fs::read Oct 20, 2025
let mut offset = 0;

while size_read < size {
let left_to_read = (size - size_read) as u32;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let left_to_read = (size - size_read) as u32;
let left_to_read = u32::try_from(size - size_read).unwrap_or(u32::MAX);

To properly support files bigger than 4GB.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max read size at a time is u32::MAX, we read the rest in other next iterations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future, if we know we're reading more than u32::MAX then we can batch 2 read requests to avoid extra syscalls

@Daksh14 Daksh14 force-pushed the fs_read_io_uring branch 7 times, most recently from 6237a4c to 636cfb8 Compare October 27, 2025 13:36
Copy link
Member

@ADD-SP ADD-SP left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks complicated, I will review it incrementally.

Copy link
Member

@mox692 mox692 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't checked all of the details in read_uring.rs, but left some comments I've noticed so far.

@Daksh14 Daksh14 force-pushed the fs_read_io_uring branch 2 times, most recently from e6c6ce7 to b9c3885 Compare November 2, 2025 17:12
@Daksh14 Daksh14 requested review from ADD-SP, martin-g and mox692 November 3, 2025 06:52
@Daksh14 Daksh14 requested a review from ADD-SP November 8, 2025 12:42
@Daksh14 Daksh14 force-pushed the fs_read_io_uring branch 4 times, most recently from 19bda66 to c1f8dc9 Compare November 12, 2025 11:35
@Daksh14 Daksh14 force-pushed the fs_read_io_uring branch 2 times, most recently from b29ed97 to 2acee44 Compare November 12, 2025 11:45
@Daksh14 Daksh14 force-pushed the fs_read_io_uring branch 2 times, most recently from 6261e0e to 974be76 Compare November 12, 2025 11:56

let (size_read, r_fd, mut r_buf) = op_read(fd, buf, offset, &mut read_len).await?;

r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can handle allocation failure by error rather than panic by allocating before calling splice. Note that this doesn't allocate if size_read == 0 since the capacity is guaranteed enough in that case.

Suggested change
r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
r_buf.try_reserve(PROBE_SIZE)?;
r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);

Copy link
Contributor Author

@Daksh14 Daksh14 Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Darksonn Will this splice write in the reserved area? Because we can only index in the length and not capacity

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. The length of r_buf before the op_read is exactly back_bytes_len, so after the op_read we have back_bytes_len <= r_buf.len() which is the requirement for using splice.

Comment on lines 73 to 79
std::thread::spawn(move || {
let rt: Runtime = rx.recv().unwrap();
rt.shutdown_timeout(Duration::from_millis(300));
done_tx.send(()).unwrap();
});

tx.send(rt).unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is equivalent to just moving the runtime.

Suggested change
std::thread::spawn(move || {
let rt: Runtime = rx.recv().unwrap();
rt.shutdown_timeout(Duration::from_millis(300));
done_tx.send(()).unwrap();
});
tx.send(rt).unwrap();
std::thread::spawn(move || {
rt.shutdown_timeout(Duration::from_millis(300));
done_tx.send(()).unwrap();
});

If you wanted an actual sleep for the loop to make some progress, then what you have now does not achieve that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Darksonn Should I sleep in the std::thread so the loop can make progress? Is that allowed or I shouldn't do it

Copy link
Member

@ADD-SP ADD-SP Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Poll the read(path).await manually to submit it to the kernel.
  2. Then increase the semaphore.
  3. Keep the spawn task pending for ever.

So that we can wait on the semaphore and then shutdown the runtime.

Copy link
Member

@ADD-SP ADD-SP Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this work for multi-thread runtime? For current thread runtime, this won't work. But I believe we can apply the similar pattern to the current thread runtime.

        rt.spawn(async move {
            let path = path[0].clone();
            let mut futs = vec![];

            // spawning a bunch of uring operations.
            for _ in 0..N {
                let path = path.clone();
                let cl = Arc::clone(&cl);
                let fut = Box::pin(read(path));
                assert_pending!(fut.poll(cx));
                futs.push(fut);
            }

            pending_forever().await;
        });

        std::thread::spawn(move || {
            rt.shutdown_timeout(Duration::from_millis(300));
            done_tx.send(()).unwrap();
        });

        done_rx.recv().unwrap();

Comment on lines 136 to 141
poll_fn(|cx| {
let fut = read(&path[0]);

// If io_uring is enabled (and not falling back to the thread pool),
// the first poll should return Pending.
let _pending = Box::pin(fut).poll_unpin(cx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code currently drops the read future right away. If you want the future to stay around until the handle.abort() call, then you need to modify this code.

Suggested change
poll_fn(|cx| {
let fut = read(&path[0]);
// If io_uring is enabled (and not falling back to the thread pool),
// the first poll should return Pending.
let _pending = Box::pin(fut).poll_unpin(cx);
let fut = read(&path[0]);
tokio::pin!(fut);
poll_fn(|cx| {
// If io_uring is enabled (and not falling back to the thread pool),
// the first poll should return Pending.
let _pending = fut.as_mut().poll(cx);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have pushed a slightly different version of this

@Daksh14 Daksh14 force-pushed the fs_read_io_uring branch 3 times, most recently from b599115 to 44f8f98 Compare November 14, 2025 19:40
let start_cap = buf.capacity();

// if buffer has no room and no size_hint, start with a small probe_read from 0 offset
if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just delete this if. If the size hint is zero/missing, then we always have buf.capacity() == 0. If we delete it, then it will fall through to the loop, which will do the same thing as what this block does.

feature = "rt",
feature = "fs",
target_os = "linux"
))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why cfg_io_uring! { is not used here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martin-augment its following the currently used code block macro here in tokio/src/fs/open_options.rs, I can change those in a different "refactor" PR https://github.com/tokio-rs/tokio/blob/master/tokio/src/fs/open_options.rs#L592


// Max bytes we can read using io uring submission at a time
// SAFETY: cannot be higher than u32::MAX for safe cast
// Set to read max 64 blocks at time
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What kind of block is meant here ?
File system blocks are usually around 4096 bytes. Here we are talking about a block of 1MiB.

Suggested change
// Set to read max 64 blocks at time
// Set to read max 64 MiB at time

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martin-augment this number came after benchmarking with io-uring against std::fs::read as its a reasonable amount of data for kernel to copy


// Wait for the first poll

let _ = rx.try_recv();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why try_recv() ?
This may return Empty before the task has a chance to be executed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added an assert with recv await

    Fix tests fail on allocation failure

    Doc fix and fix double subtraction of offset

    Fix typos and use assert the stopped task's first poll
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

A-tokio Area: The main tokio crate M-fs Module: tokio/fs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants