Skip to content

Commit b2969ed

Browse files
Always read from underlying stream in StreamPipeReader (#118041)
1 parent 0255b73 commit b2969ed

File tree

2 files changed

+39
-69
lines changed

2 files changed

+39
-69
lines changed

src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ internal sealed class StreamPipeReader : PipeReader
1616

1717
private CancellationTokenSource? _internalTokenSource;
1818
private bool _isReaderCompleted;
19-
private bool _isStreamCompleted;
2019

2120
private BufferSegment? _readHead;
2221
private int _readIndex;
@@ -231,12 +230,6 @@ private ValueTask<ReadResult> ReadInternalAsync(int? minimumSize, CancellationTo
231230
}
232231
}
233232

234-
if (_isStreamCompleted)
235-
{
236-
ReadResult completedResult = new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
237-
return new ValueTask<ReadResult>(completedResult);
238-
}
239-
240233
return Core(this, minimumSize, tokenSource, cancellationToken);
241234

242235
#if NET
@@ -253,6 +246,7 @@ static async ValueTask<ReadResult> Core(StreamPipeReader reader, int? minimumSiz
253246
using (reg)
254247
{
255248
var isCanceled = false;
249+
bool isCompleted = false;
256250
try
257251
{
258252
// This optimization only makes sense if we don't have anything buffered
@@ -277,7 +271,7 @@ static async ValueTask<ReadResult> Core(StreamPipeReader reader, int? minimumSiz
277271

278272
if (length == 0)
279273
{
280-
reader._isStreamCompleted = true;
274+
isCompleted = true;
281275
break;
282276
}
283277
} while (minimumSize != null && reader._bufferedBytes < minimumSize);
@@ -302,7 +296,7 @@ static async ValueTask<ReadResult> Core(StreamPipeReader reader, int? minimumSiz
302296
}
303297
}
304298

305-
return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, reader._isStreamCompleted);
299+
return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, isCompleted);
306300
}
307301
}
308302
}
@@ -362,11 +356,6 @@ public override async Task CopyToAsync(PipeWriter destination, CancellationToken
362356
}
363357
}
364358

365-
if (_isStreamCompleted)
366-
{
367-
return;
368-
}
369-
370359
await InnerStream.CopyToAsync(destination, tokenSource.Token).ConfigureAwait(false);
371360
}
372361
catch (OperationCanceledException)
@@ -423,11 +412,6 @@ public override async Task CopyToAsync(Stream destination, CancellationToken can
423412
}
424413
}
425414

426-
if (_isStreamCompleted)
427-
{
428-
return;
429-
}
430-
431415
await InnerStream.CopyToAsync(destination, tokenSource.Token).ConfigureAwait(false);
432416
}
433417
catch (OperationCanceledException)
@@ -465,7 +449,7 @@ public override bool TryRead(out ReadResult result)
465449
private bool TryReadInternal(CancellationTokenSource source, out ReadResult result)
466450
{
467451
bool isCancellationRequested = source.IsCancellationRequested;
468-
if (isCancellationRequested || _bufferedBytes > 0 && (!_examinedEverything || _isStreamCompleted))
452+
if (isCancellationRequested || (_bufferedBytes > 0 && !_examinedEverything))
469453
{
470454
if (isCancellationRequested)
471455
{
@@ -474,7 +458,7 @@ private bool TryReadInternal(CancellationTokenSource source, out ReadResult resu
474458

475459
ReadOnlySequence<byte> buffer = GetCurrentReadOnlySequence();
476460

477-
result = new ReadResult(buffer, isCancellationRequested, _isStreamCompleted);
461+
result = new ReadResult(buffer, isCancellationRequested, false);
478462
return true;
479463
}
480464

src/libraries/System.IO.Pipelines/tests/StreamPipeReaderTests.cs

Lines changed: 34 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -151,10 +151,14 @@ public async Task ReadWithDifferentSettings(int bytesInBuffer, int bufferSize, i
151151
[Fact]
152152
public async Task ReadAsyncAfterReceivingCompletedReadResultDoesNotThrow()
153153
{
154-
var stream = new ThrowAfterZeroByteReadStream();
154+
byte[] helloBytes = "Hello World"u8.ToArray();
155+
var stream = new MemoryStream(helloBytes);
155156
PipeReader reader = PipeReader.Create(stream);
156157
ReadResult readResult = await reader.ReadAsync();
157-
Assert.True(readResult.Buffer.IsEmpty);
158+
Assert.Equal(helloBytes.Length, readResult.Buffer.Length);
159+
reader.AdvanceTo(readResult.Buffer.End);
160+
161+
readResult = await reader.ReadAsync();
158162
Assert.True(readResult.IsCompleted);
159163
reader.AdvanceTo(readResult.Buffer.End);
160164

@@ -166,12 +170,38 @@ public async Task ReadAsyncAfterReceivingCompletedReadResultDoesNotThrow()
166170
}
167171

168172
[Fact]
169-
public async Task BufferingDataPastEndOfStreamCanBeReadAgain()
173+
public async Task ReadAsyncAfterReceivingCompletedReadResultAndResettingStreamPositionWorks()
170174
{
171175
byte[] helloBytes = "Hello World"u8.ToArray();
172-
var stream = new ThrowAfterZeroByteReadStream(helloBytes);
176+
var stream = new MemoryStream(helloBytes);
173177
PipeReader reader = PipeReader.Create(stream);
178+
ReadResult readResult = await reader.ReadAsync();
179+
Assert.Equal(helloBytes, readResult.Buffer.ToArray());
180+
Assert.False(readResult.IsCompleted);
181+
reader.AdvanceTo(readResult.Buffer.End);
182+
183+
readResult = await reader.ReadAsync();
184+
Assert.True(readResult.IsCompleted);
185+
186+
// Reset the stream position to the beginning
187+
stream.Position = 0;
188+
189+
readResult = await reader.ReadAsync();
190+
Assert.Equal(helloBytes, readResult.Buffer.ToArray());
191+
Assert.False(readResult.IsCompleted);
192+
reader.AdvanceTo(readResult.Buffer.End);
174193

194+
readResult = await reader.ReadAsync();
195+
Assert.True(readResult.IsCompleted);
196+
reader.Complete();
197+
}
198+
199+
[Fact]
200+
public async Task BufferingDataPastEndOfStreamCanBeReadAgain()
201+
{
202+
byte[] helloBytes = "Hello World"u8.ToArray();
203+
var stream = new MemoryStream(helloBytes);
204+
PipeReader reader = PipeReader.Create(stream);
175205

176206
ReadResult readResult = await reader.ReadAsync();
177207
ReadOnlySequence<byte> buffer = readResult.Buffer;
@@ -669,50 +699,6 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
669699
#endif
670700
}
671701

672-
private class ThrowAfterZeroByteReadStream : MemoryStream
673-
{
674-
public ThrowAfterZeroByteReadStream()
675-
{
676-
677-
}
678-
679-
public ThrowAfterZeroByteReadStream(byte[] buffer) : base(buffer)
680-
{
681-
682-
}
683-
684-
private bool _throwOnNextCallToRead;
685-
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
686-
{
687-
if (_throwOnNextCallToRead)
688-
{
689-
throw new Exception();
690-
}
691-
var bytes = await base.ReadAsync(buffer, offset, count, cancellationToken);
692-
if (bytes == 0)
693-
{
694-
_throwOnNextCallToRead = true;
695-
}
696-
return bytes;
697-
}
698-
699-
#if NET
700-
public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
701-
{
702-
if (_throwOnNextCallToRead)
703-
{
704-
throw new Exception();
705-
}
706-
var bytes = await base.ReadAsync(destination, cancellationToken);
707-
if (bytes == 0)
708-
{
709-
_throwOnNextCallToRead = true;
710-
}
711-
return bytes;
712-
}
713-
#endif
714-
}
715-
716702
private class DisposalTrackingStream : MemoryStream
717703
{
718704
public bool DisposeCalled { get; private set; }

0 commit comments

Comments
 (0)