@@ -39,20 +39,18 @@ async fn read_to_end_uring(
3939 mut buf : Vec < u8 > ,
4040) -> io:: Result < Vec < u8 > > {
4141 let mut offset = 0 ;
42-
4342 let start_cap = buf. capacity ( ) ;
4443
4544 // if buffer has no room and no size_hint, start with a small probe_read from 0 offset
4645 if ( size_hint. is_none ( ) || size_hint == Some ( 0 ) ) && buf. capacity ( ) - buf. len ( ) < PROBE_SIZE {
47- let ( size_read, r_fd, r_buf) = small_probe_read ( fd, buf, offset) . await ?;
46+ let ( size_read, r_fd, r_buf) = small_probe_read ( fd, buf, & mut offset) . await ?;
4847
4948 if size_read == 0 {
5049 return Ok ( r_buf) ;
5150 }
5251
53- buf = r_buf;
5452 fd = r_fd;
55- offset += size_read as u64 ;
53+ buf = r_buf ;
5654 }
5755
5856 loop {
@@ -61,7 +59,7 @@ async fn read_to_end_uring(
6159 // and see if it returns `Ok(0)`. If so, we've avoided an
6260 // unnecessary increasing of the capacity. But if not, append the
6361 // probe buffer to the primary buffer and let its capacity grow.
64- let ( size_read, r_fd, r_buf) = small_probe_read ( fd, buf, offset) . await ?;
62+ let ( size_read, r_fd, r_buf) = small_probe_read ( fd, buf, & mut offset) . await ?;
6563
6664 if size_read == 0 {
6765 return Ok ( r_buf) ;
@@ -77,91 +75,80 @@ async fn read_to_end_uring(
7775 buf. try_reserve ( PROBE_SIZE ) ?;
7876 }
7977
80- // doesn't matter if we have a valid size_hint or not, if we do more
81- // than 2 consecutive_short_reads, gradually increase the buffer
82- // capacity to read more data at a time
83-
8478 // prepare the spare capacity to be read into
8579 let buf_len = usize:: min ( buf. spare_capacity_mut ( ) . len ( ) , MAX_READ_SIZE ) ;
8680
87- // SAFETY: buf_len cannot be greater than u32::MAX because max_read_size
81+ // buf_len cannot be greater than u32::MAX because MAX_READ_SIZE
8882 // is u32::MAX
8983 let mut read_len = buf_len as u32 ;
9084
91- loop {
92- // read into spare capacity
93- let ( res, r_fd, r_buf) = Op :: read ( fd, buf, read_len, offset) . await ;
94-
95- match res {
96- Ok ( 0 ) => return Ok ( r_buf) ,
97- Ok ( size_read) => {
98- fd = r_fd;
99- buf = r_buf;
100- offset += size_read as u64 ;
101- read_len -= size_read;
102-
103- // keep reading if there's something left to be read
104- if read_len > 0 {
105- continue ;
106- } else {
107- break ;
108- }
109- }
110- Err ( e) if e. kind ( ) == ErrorKind :: Interrupted => {
111- buf = r_buf;
112- fd = r_fd;
85+ // read into spare capacity
86+ let ( size_read, r_fd, r_buf) = op_read ( fd, buf, & mut offset, & mut read_len) . await ?;
11387
114- continue ;
115- }
116- Err ( e) => return Err ( e) ,
117- }
88+ if size_read == 0 {
89+ return Ok ( r_buf) ;
11890 }
91+
92+ fd = r_fd;
93+ buf = r_buf;
11994 }
12095}
12196
12297async fn small_probe_read (
123- mut fd : OwnedFd ,
98+ fd : OwnedFd ,
12499 mut buf : Vec < u8 > ,
125- offset : u64 ,
100+ offset : & mut u64 ,
126101) -> io:: Result < ( u32 , OwnedFd , Vec < u8 > ) > {
127- let mut temp_arr = [ 0 ; PROBE_SIZE ] ;
128- let has_enough = buf. len ( ) > PROBE_SIZE ;
129-
130- if has_enough {
131- // if we have more than PROBE_SIZE bytes in the buffer already then
132- // don't call reserve as we might potentially read 0 bytes
133- let back_bytes_len = buf. len ( ) - PROBE_SIZE ;
134- temp_arr. copy_from_slice ( & buf[ back_bytes_len..] ) ;
135- // We're decreasing the length of the buffer and len is greater
136- // than PROBE_SIZE. So we can read into the discarded length
137- buf. truncate ( back_bytes_len) ;
138- } else {
139- // we don't even have PROBE_SIZE length in the buffer, we need this
140- // reservation
141- buf. reserve_exact ( PROBE_SIZE ) ;
102+ let mut read_len = PROBE_SIZE_U32 ;
103+
104+ if buf. len ( ) < PROBE_SIZE {
105+ buf. try_reserve ( PROBE_SIZE ) ?;
106+
107+ return op_read ( fd, buf, offset, & mut read_len) . await ;
142108 }
143109
110+ let mut temp_arr = [ 0 ; PROBE_SIZE ] ;
111+ let back_bytes_len = buf. len ( ) - PROBE_SIZE ;
112+
113+ temp_arr. copy_from_slice ( & buf[ back_bytes_len..] ) ;
114+
115+ // We're decreasing the length of the buffer and len is greater
116+ // than PROBE_SIZE. So we can read into the discarded length
117+ buf. truncate ( back_bytes_len) ;
118+
119+ let ( size_read, r_fd, mut r_buf) = op_read ( fd, buf, offset, & mut read_len) . await ?;
120+
121+ r_buf. splice ( back_bytes_len..back_bytes_len, temp_arr) ;
122+
123+ Ok ( ( size_read, r_fd, r_buf) )
124+ }
125+
126+ async fn op_read (
127+ mut fd : OwnedFd ,
128+ mut buf : Vec < u8 > ,
129+ offset : & mut u64 ,
130+ read_len : & mut u32 ,
131+ ) -> io:: Result < ( u32 , OwnedFd , Vec < u8 > ) > {
144132 loop {
145- let ( res, r_fd, mut r_buf) = Op :: read ( fd, buf, PROBE_SIZE_U32 , offset) . await ;
133+ let ( res, r_fd, r_buf) = Op :: read ( fd, buf, * read_len , * offset) . await ;
146134
147135 match res {
148- // return early if we inserted into reserved PROBE_SIZE
149- // bytes
150- Ok ( size_read) if !has_enough => return Ok ( ( size_read, r_fd, r_buf) ) ,
136+ Err ( e) if e. kind ( ) == ErrorKind :: Interrupted => {
137+ buf = r_buf;
138+ fd = r_fd;
139+ }
140+ Err ( e) => return Err ( e) ,
151141 Ok ( size_read) => {
152- let old_len = r_buf. len ( ) - ( size_read as usize ) ;
142+ * offset += size_read as u64 ;
143+ * read_len -= size_read;
153144
154- r_buf. splice ( old_len..old_len, temp_arr) ;
145+ if * read_len == 0 || size_read == 0 {
146+ return Ok ( ( size_read, r_fd, r_buf) ) ;
147+ }
155148
156- return Ok ( ( size_read, r_fd, r_buf) ) ;
157- }
158- Err ( e) if e. kind ( ) == ErrorKind :: Interrupted => {
159149 buf = r_buf;
160150 fd = r_fd;
161-
162- continue ;
163151 }
164- Err ( e) => return Err ( e) ,
165152 }
166153 }
167154}
0 commit comments