fs: support io_uring with tokio::fs::read
#7696
base: master
Conversation
Force-pushed f4c97c2 to 7e73f17
tokio/src/fs/read.rs (Outdated)
```rust
let mut offset = 0;

while size_read < size {
    let left_to_read = (size - size_read) as u32;
```
```diff
- let left_to_read = (size - size_read) as u32;
+ let left_to_read = u32::try_from(size - size_read).unwrap_or(u32::MAX);
```
To properly support files bigger than 4GB.
The max read size per submission is u32::MAX; we read the rest in the next iterations.
In the future, if we know we're reading more than u32::MAX, we can batch two read requests to avoid extra syscalls.
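A minimal, standalone sketch of the clamping discussed above. `chunk_len` is a hypothetical helper introduced here for illustration; the names `size` and `size_read` follow the diff.

```rust
// Clamping instead of `as u32` avoids silently truncating the remaining
// length for files larger than 4 GiB; the read loop simply issues
// another submission for whatever is left over.
fn chunk_len(size: u64, size_read: u64) -> u32 {
    u32::try_from(size - size_read).unwrap_or(u32::MAX)
}

fn main() {
    // Small remainder: fits in u32, returned exactly.
    assert_eq!(chunk_len(1024, 0), 1024);
    // 6 GiB remainder: clamped to u32::MAX for this submission;
    // the rest is read on later iterations.
    assert_eq!(chunk_len(6 * 1024 * 1024 * 1024, 0), u32::MAX);
    println!("ok");
}
```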
Force-pushed 6237a4c to 636cfb8
ADD-SP left a comment:
This looks complicated, I will review it incrementally.
Force-pushed 6e0abae to 8120700
mox692 left a comment:
I haven't checked all of the details in read_uring.rs, but I've left some comments on what I've noticed so far.
Force-pushed e6c6ce7 to b9c3885
Force-pushed 19bda66 to c1f8dc9
Force-pushed b29ed97 to 2acee44
Force-pushed 6261e0e to 974be76
```rust
let (size_read, r_fd, mut r_buf) = op_read(fd, buf, offset, &mut read_len).await?;

r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
```
We can handle allocation failure with an error rather than a panic by allocating before calling splice. Note that this doesn't allocate if size_read == 0, since the capacity is guaranteed to be enough in that case.
```diff
- r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
+ r_buf.try_reserve(PROBE_SIZE)?;
+ r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
```
@Darksonn Will this splice write into the reserved area? We can only index within the length, not the capacity.
No. The length of r_buf before the op_read is exactly back_bytes_len, so after the op_read we have back_bytes_len <= r_buf.len() which is the requirement for using splice.
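A standalone illustration of the invariant above: `Vec::splice` with an empty range at `back_bytes_len` inserts into the vector's length, never its spare capacity, and a `try_reserve` up front turns allocation failure into a recoverable error instead of an abort. The concrete values here are made up for the example.

```rust
fn main() {
    let back_bytes_len = 2;
    let temp_arr = [9u8, 9u8];
    // After op_read, back_bytes_len <= r_buf.len() holds,
    // which is the only requirement splice has.
    let mut r_buf: Vec<u8> = vec![1, 2, 3, 4, 5, 6];
    // Reserve up front so an allocation failure surfaces as an error
    // rather than a panic inside splice.
    r_buf.try_reserve(temp_arr.len()).expect("allocation failed");
    // Empty range 2..2 removes nothing: a pure insertion at index 2.
    r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
    assert_eq!(r_buf, [1, 2, 9, 9, 3, 4, 5, 6]);
    println!("{:?}", r_buf);
}
```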
tokio/tests/fs_uring_read.rs (Outdated)
```rust
std::thread::spawn(move || {
    let rt: Runtime = rx.recv().unwrap();
    rt.shutdown_timeout(Duration::from_millis(300));
    done_tx.send(()).unwrap();
});

tx.send(rt).unwrap();
```
This is equivalent to just moving the runtime.
```diff
- std::thread::spawn(move || {
-     let rt: Runtime = rx.recv().unwrap();
-     rt.shutdown_timeout(Duration::from_millis(300));
-     done_tx.send(()).unwrap();
- });
- tx.send(rt).unwrap();
+ std::thread::spawn(move || {
+     rt.shutdown_timeout(Duration::from_millis(300));
+     done_tx.send(()).unwrap();
+ });
```
If you wanted an actual sleep for the loop to make some progress, then what you have now does not achieve that.
@Darksonn Should I sleep in the std::thread so the loop can make progress? Is that allowed, or should I avoid it?
- Poll the `read(path).await` future manually to submit it to the kernel.
- Then increase the semaphore.
- Keep the spawned task pending forever.

That way we can wait on the semaphore and then shut down the runtime.
Does this work for the multi-thread runtime? For the current-thread runtime this won't work, but I believe we can apply a similar pattern to it.
```rust
rt.spawn(async move {
    let path = path[0].clone();
    let mut futs = vec![];
    // spawning a bunch of uring operations.
    for _ in 0..N {
        let path = path.clone();
        let cl = Arc::clone(&cl);
        let fut = Box::pin(read(path));
        assert_pending!(fut.poll(cx));
        futs.push(fut);
    }
    pending_forever().await;
});

std::thread::spawn(move || {
    rt.shutdown_timeout(Duration::from_millis(300));
    done_tx.send(()).unwrap();
});

done_rx.recv().unwrap();
```
tokio/tests/fs_uring_read.rs (Outdated)
```rust
poll_fn(|cx| {
    let fut = read(&path[0]);

    // If io_uring is enabled (and not falling back to the thread pool),
    // the first poll should return Pending.
    let _pending = Box::pin(fut).poll_unpin(cx);
```
This code currently drops the read future right away. If you want the future to stay around until the handle.abort() call, then you need to modify this code.
```diff
- poll_fn(|cx| {
-     let fut = read(&path[0]);
-     // If io_uring is enabled (and not falling back to the thread pool),
-     // the first poll should return Pending.
-     let _pending = Box::pin(fut).poll_unpin(cx);
+ let fut = read(&path[0]);
+ tokio::pin!(fut);
+ poll_fn(|cx| {
+     // If io_uring is enabled (and not falling back to the thread pool),
+     // the first poll should return Pending.
+     let _pending = fut.as_mut().poll(cx);
```
I have pushed a slightly different version of this
Force-pushed 902191e to 52dcdca
Force-pushed b599115 to 44f8f98
tokio/src/fs/read_uring.rs (Outdated)
```rust
let start_cap = buf.capacity();

// if buffer has no room and no size_hint, start with a small probe_read from 0 offset
if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE {
```
I think we can just delete this if. If the size hint is zero/missing, then we always have buf.capacity() == 0. If we delete it, then it will fall through to the loop, which will do the same thing as what this block does.
Reading 64 MiB chunks at a time and keeping the kernel busy surpasses std::fs::read time, with the unoptimized io_uring version being 1.12% faster.
Force-pushed 1475f00 to b6cfb9b
```rust
    feature = "rt",
    feature = "fs",
    target_os = "linux"
))]
```
Any reason why `cfg_io_uring! {` is not used here?
@martin-augment It's following the code block macro currently used in tokio/src/fs/open_options.rs; I can change those in a separate "refactor" PR: https://github.com/tokio-rs/tokio/blob/master/tokio/src/fs/open_options.rs#L592
tokio/src/fs/read_uring.rs (Outdated)
```rust
// Max bytes we can read using io uring submission at a time
// SAFETY: cannot be higher than u32::MAX for safe cast
// Set to read max 64 blocks at time
```
What kind of block is meant here? File system blocks are usually around 4096 bytes, but here we are talking about a block of 1 MiB.
```diff
- // Set to read max 64 blocks at time
+ // Set to read max 64 MiB at time
```
?
@martin-augment This number came out of benchmarking io_uring against std::fs::read; it's a reasonable amount of data for the kernel to copy.
tokio/tests/fs_uring_read.rs (Outdated)
```rust
// Wait for the first poll

let _ = rx.try_recv();
```
Why try_recv()? This may return Empty before the task has a chance to be executed.
I added an assert with recv().await.
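The race the reviewer flagged can be shown with plain std channels (the names here are illustrative, not from the test): `try_recv` returns immediately and may see `Empty` before the task ever runs, whereas a blocking `recv` waits for the signal.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel::<()>();
    thread::spawn(move || {
        // Simulate the "first poll" signal arriving a bit later.
        thread::sleep(Duration::from_millis(50));
        tx.send(()).unwrap();
    });
    // try_recv does not wait: run before the sender, it observes Empty
    // and the signal would be silently missed.
    assert!(rx.try_recv().is_err());
    // recv blocks until the sender fires, so the synchronization is reliable.
    assert!(rx.recv().is_ok());
    println!("ok");
}
```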
Force-pushed 31431a2 to 4e69595
Fix tests fail on allocation failure
Doc fix and fix double subtraction of offset
Fix typos and use assert the stopped task's first poll
Force-pushed 67ac09f to b7ccee7
Motivation
We are slowly adopting io_uring everywhere; here I made the simple change of supporting `fs::read`, however it might not be as simple. Let me know whether the unsafe I used is correct or not.

We currently use the blocking `std::fs::Metadata` to obtain the file size for the buffer capacity, and extend the length of the vector according to the bytes read in the CQE. This implementation sounds good on paper to me.

Later we should implement an internal statx helper, in this PR or a separate PR, to make our uring implementation less painful to use, as this PR failed #7616.

Let's put the statx helper in a different PR to avoid merging an inefficient read implementation, given io_uring is about being more efficient in file IO.
Solution
Continue adopting io uring
strace on a `tokio::fs::read` after this change: