Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Commit

Permalink
Pipelines - Reduce lock contention
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams committed Feb 22, 2019
1 parent bca4408 commit 4209a5c
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 50 deletions.
154 changes: 104 additions & 50 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ public sealed partial class Pipe
private readonly PipeScheduler _readerScheduler;
private readonly PipeScheduler _writerScheduler;

private int _pooledSegmentCount;
private readonly BufferSegment[] _bufferSegmentPool;
// Temporary list to hold Segments return while being reset
private readonly BufferSegment[] _bufferSegmentsToReturn;

private readonly DefaultPipeReader _reader;
private readonly DefaultPipeWriter _writer;
Expand All @@ -52,8 +55,6 @@ public sealed partial class Pipe
private long _length;
private long _currentWriteLength;

private int _pooledSegmentCount;

private PipeAwaitable _readerAwaitable;
private PipeAwaitable _writerAwaitable;

Expand Down Expand Up @@ -97,6 +98,7 @@ public Pipe(PipeOptions options)
}

_bufferSegmentPool = new BufferSegment[SegmentPoolSize];
_bufferSegmentsToReturn = new BufferSegment[SegmentPoolSize];

_operationState = default;
_readerCompletion = default;
Expand Down Expand Up @@ -177,43 +179,48 @@ private void AllocateWriteHeadIfNeeded(int sizeHint)

private void AllocateWriteHeadSynchronized(int sizeHint)
{
lock (_sync)
if (_writingHead == null)
{
_operationState.BeginWrite();
// We need to allocate memory to write since nobody has written before
BufferSegment newSegment = AllocateSegment(sizeHint);

if (_writingHead == null)
lock (_sync)
{
// We need to allocate memory to write since nobody has written before
BufferSegment newSegment = AllocateSegment(sizeHint);

_operationState.BeginWrite();
// Set all the pointers
_writingHead = _readHead = _readTail = newSegment;
}
else

return;
}
else if (!_operationState.IsWritingActive)
{
lock (_sync)
{
int bytesLeftInBuffer = _writingMemory.Length;
_operationState.BeginWrite();
}
}

if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint)
{
if (_buffered > 0)
{
// Flush buffered data to the segment
_writingHead.End += _buffered;
_buffered = 0;
}
// Have to recheck as if writing wasn't active AdvanceReader may have disposed any available Memory
if (_writingMemory.Length == 0 || _writingMemory.Length < sizeHint)
{
if (_buffered > 0)
{
// Flush buffered data to the segment
_writingHead.End += _buffered;
_buffered = 0;
}

BufferSegment newSegment = AllocateSegment(sizeHint);
BufferSegment newSegment = AllocateSegment(sizeHint);

_writingHead.SetNext(newSegment);
_writingHead = newSegment;
}
}
_writingHead.SetNext(newSegment);
_writingHead = newSegment;
}
}

private BufferSegment AllocateSegment(int sizeHint)
{
BufferSegment newSegment = CreateSegmentUnsynchronized();
BufferSegment newSegment = CreateSegmentSynchronized();

if (_pool is null)
{
Expand Down Expand Up @@ -245,23 +252,63 @@ private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
return adjustedToMaximumSize;
}

private BufferSegment CreateSegmentUnsynchronized()
private BufferSegment CreateSegmentSynchronized()
{
if (_pooledSegmentCount > 0)
BufferSegment[] segmentPool = _bufferSegmentPool;
lock (segmentPool)
{
_pooledSegmentCount--;
return _bufferSegmentPool[_pooledSegmentCount];
int index = _pooledSegmentCount - 1;
if ((uint)index < (uint)segmentPool.Length)
{
_pooledSegmentCount = index;
return segmentPool[index];
}
}

return new BufferSegment();
}

private void ReturnSegmentUnsynchronized(BufferSegment segment)
private void ReturnSegments(BufferSegment from, BufferSegment toExclusive)
{
if (_pooledSegmentCount < _bufferSegmentPool.Length)
Debug.Assert(from != null);
Debug.Assert(from != toExclusive);

// Reset the Segments and return their data out of lock
BufferSegment[] segmentToReturn = _bufferSegmentsToReturn;
int count = 0;
do
{
_bufferSegmentPool[_pooledSegmentCount] = segment;
_pooledSegmentCount++;
BufferSegment next = from.NextSegment;
Debug.Assert(next != null || toExclusive == null);

from.ResetMemory();

if ((uint)count < (uint)segmentToReturn.Length)
{
// Store in temporary list while preforming expensive resets
segmentToReturn[count] = from;
count++;
}

from = next;
} while (from != toExclusive);

// Add the Segments back to pool from the temporary list under lock
BufferSegment[] segmentPool = _bufferSegmentPool;
lock (segmentPool)
{
int index = _pooledSegmentCount;
for (int i = 0; i < count; i++)
{
if ((uint)index < (uint)segmentPool.Length)
{
segmentPool[index] = segmentToReturn[i];
index++;
}
segmentToReturn[i] = null;
}

_pooledSegmentCount = index;
}
}

Expand Down Expand Up @@ -418,6 +465,7 @@ private void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, Buf
BufferSegment returnEnd = null;

CompletionData completionData = default;
bool isReadComplete;

lock (_sync)
{
Expand Down Expand Up @@ -496,18 +544,24 @@ private void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, Buf
_readerAwaitable.SetUncompleted();
}

while (returnStart != null && returnStart != returnEnd)
{
BufferSegment next = returnStart.NextSegment;
returnStart.ResetMemory();
ReturnSegmentUnsynchronized(returnStart);
returnStart = next;
}
isReadComplete = _operationState.TryEndRead();
}

_operationState.EndRead();
if (isReadComplete)
{
TrySchedule(_writerScheduler, completionData);
}

TrySchedule(_writerScheduler, completionData);
if (returnStart != null && returnStart != returnEnd)
{
ReturnSegments(returnStart, returnEnd);
}

if (!isReadComplete)
{
// Segments need to be returned (above) prior to the throw.
ThrowHelper.ThrowInvalidOperationException_NoReadToComplete();
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -737,6 +791,7 @@ private static void ExecuteWithExecutionContext(object state)

private void CompletePipe()
{
BufferSegment segment;
lock (_sync)
{
if (_disposed)
Expand All @@ -745,22 +800,21 @@ private void CompletePipe()
}

_disposed = true;
// Return all segments
// Get segment chain to return
// if _readHead is null we need to try return _commitHead
// because there might be a block allocated for writing
BufferSegment segment = _readHead ?? _readTail;
while (segment != null)
{
BufferSegment returnSegment = segment;
segment = segment.NextSegment;

returnSegment.ResetMemory();
}
segment = _readHead ?? _readTail;

_writingHead = null;
_readHead = null;
_readTail = null;
}

// Return all segments
if (segment != null)
{
ReturnSegments(segment, toExclusive: null);
}
}

internal ValueTaskSourceStatus GetReadAsyncStatus()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ public void EndRead()
_state &= ~(State.Reading | State.ReadingTentative);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryEndRead()
{
if ((_state & State.Reading) != State.Reading &&
(_state & State.ReadingTentative) != State.ReadingTentative)
{
return false;
}

_state &= ~(State.Reading | State.ReadingTentative);
return true;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginWrite()
{
Expand Down

0 comments on commit 4209a5c

Please sign in to comment.