Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build Threading.Tasks.DataFlow and ComponentModel.Annotations for NetCoreAppCurrent #48667

Merged
merged 2 commits into from
Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>netstandard2.1</TargetFrameworks>
<TargetFrameworks>$(NetCoreAppCurrent);netstandard2.1</TargetFrameworks>
<ExcludeCurrentNetCoreAppFromPackage>true</ExcludeCurrentNetCoreAppFromPackage>
<Nullable>enable</Nullable>
<!--
Since many resource strings in this library are shown to an end-user,
Expand Down Expand Up @@ -59,4 +60,16 @@
<Compile Include="$(CommonPath)System\NotImplemented.cs"
Link="Common\System\NotImplemented.cs" />
</ItemGroup>
<ItemGroup Condition="$([MSBuild]::GetTargetFrameworkIdentifier('$(TargetFramework)')) == '.NETCoreApp'">
<Reference Include="System.Collections" />
<Reference Include="System.Collections.Concurrent" />
<Reference Include="System.ComponentModel" />
<Reference Include="System.ComponentModel.Primitives" />
<Reference Include="System.ComponentModel.TypeConverter" />
<Reference Include="System.Linq" />
<Reference Include="System.Runtime" />
<Reference Include="System.Resources.ResourceManager" />
<Reference Include="System.Text.RegularExpressions" />
<Reference Include="System.Threading" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private bool RunPredicate(T item)

/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
#pragma warning disable 8617
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T>? source, bool consumeToAccept)
#pragma warning restore 8617
{
// Validate arguments. Some targets may have a null source, but FilteredLinkPropagator
Expand Down Expand Up @@ -939,7 +939,7 @@ public static TOutput Receive<TOutput>(

// Do fast path checks for both cancellation and data already existing.
cancellationToken.ThrowIfCancellationRequested();
TOutput fastCheckedItem;
TOutput? fastCheckedItem;
var receivableSource = source as IReceivableSourceBlock<TOutput>;
if (receivableSource != null && receivableSource.TryReceive(null, out fastCheckedItem))
{
Expand Down Expand Up @@ -995,7 +995,7 @@ private static Task<TOutput> ReceiveCore<TOutput>(
{
try
{
TOutput fastCheckedItem;
TOutput? fastCheckedItem;
if (receivableSource.TryReceive(null, out fastCheckedItem))
{
return Task.FromResult<TOutput>(fastCheckedItem);
Expand Down Expand Up @@ -1091,7 +1091,7 @@ private static Task<TOutput> ReceiveCoreByLinking<TOutput>(ISourceBlock<TOutput>
// So we are racing to dispose of the unlinker.
if (Volatile.Read(ref target._cleanupReserved))
{
IDisposable disposableUnlink = Interlocked.CompareExchange(ref target._unlink, null, unlink);
IDisposable? disposableUnlink = Interlocked.CompareExchange<IDisposable?>(ref target._unlink, null, unlink);
if (disposableUnlink != null) disposableUnlink.Dispose();
}
}
Expand Down Expand Up @@ -1173,7 +1173,7 @@ DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader message
{
// Accept the message if possible and complete this task with the message's value.
bool consumed = true;
T acceptedValue = consumeToAccept ? source!.ConsumeMessage(messageHeader, this, out consumed) : messageValue;
T? acceptedValue = consumeToAccept ? source!.ConsumeMessage(messageHeader, this, out consumed) : messageValue;
if (consumed)
{
status = DataflowMessageStatus.Accepted;
Expand Down Expand Up @@ -1962,7 +1962,7 @@ private static bool TryChooseFromSource<T>(
Debug.Assert(scheduler != null, "Expected a non-null scheduler");

// Try to receive from the source. If we can't, bail.
T result;
T? result;
var receivableSource = source as IReceivableSourceBlock<T>;
if (receivableSource == null || !receivableSource.TryReceive(out result))
{
Expand Down Expand Up @@ -2198,7 +2198,7 @@ public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T
if (consumeToAccept)
{
bool consumed;
messageValue = source!.ConsumeMessage(messageHeader, this, out consumed);
messageValue = source!.ConsumeMessage(messageHeader, this, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ internal DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

bool consumed;
messageValue = source.ConsumeMessage(messageHeader, _owningBatch, out consumed);
messageValue = source.ConsumeMessage(messageHeader, _owningBatch, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}

Expand Down Expand Up @@ -1004,7 +1004,7 @@ private void ConsumeReservedMessagesNonGreedy()
KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> sourceAndMessage = reserved[i];
reserved[i] = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>); // in case of exception from ConsumeMessage
bool consumed;
T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value.Key, _owningBatch, out consumed);
T? consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value.Key, _owningBatch, out consumed);
if (!consumed)
{
// The protocol broke down, so throw an exception, as this is fatal. Before doing so, though,
Expand Down Expand Up @@ -1056,7 +1056,7 @@ private void ConsumeReservedMessagesGreedyBounded()
KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> sourceAndMessage = reserved[i];
reserved[i] = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>); // in case of exception from ConsumeMessage
bool consumed;
T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value.Key, _owningBatch, out consumed);
T? consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value.Key, _owningBatch, out consumed);
if (consumed)
{
var consumedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value.Key, consumedValue!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public BatchedJoinBlock(int batchSize, GroupingDataflowBlockOptions dataflowBloc
// messages, and thus when there may be a few stragglers we need to make a batch out of.
Action createBatchAction = () =>
{
if (_target1.Count > 0 || _target2.Count > 0)
if (_target1!.Count > 0 || _target2!.Count > 0)
{
_source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2.GetAndEmptyMessages()));
_source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2!.GetAndEmptyMessages()));
}
};

Expand Down Expand Up @@ -329,9 +329,9 @@ public BatchedJoinBlock(int batchSize, GroupingDataflowBlockOptions dataflowBloc
// messages, and thus when there may be a few stragglers we need to make a batch out of.
Action createBatchAction = () =>
{
if (_target1.Count > 0 || _target2.Count > 0 || _target3.Count > 0)
if (_target1!.Count > 0 || _target2!.Count > 0 || _target3!.Count > 0)
{
_source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2.GetAndEmptyMessages(), _target3.GetAndEmptyMessages()));
_source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2!.GetAndEmptyMessages(), _target3!.GetAndEmptyMessages()));
}
};

Expand Down Expand Up @@ -598,7 +598,7 @@ public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

bool consumed;
messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
messageValue = source.ConsumeMessage(messageHeader, this, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}
_messages.Add(messageValue!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader message
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

bool consumed;
messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
messageValue = source.ConsumeMessage(messageHeader, this, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}

Expand Down Expand Up @@ -352,7 +352,7 @@ private bool ConsumeAndStoreOneMessageIfAvailable()
bool consumed = false;
try
{
T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
T? consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
if (consumed)
{
_source.AddMessage(consumedValue!);
Expand Down Expand Up @@ -577,7 +577,7 @@ internal bool TryReceive(Predicate<TOutput>? filter, [MaybeNullWhen(false)] out
// synchronizing with other activities on the block.
// We don't want to execute the user-provided cloning delegate
// while holding the lock.
TOutput message;
TOutput? message;
bool isValid;
lock (OutgoingLock)
{
Expand Down Expand Up @@ -607,7 +607,7 @@ internal bool TryReceiveAll([NotNullWhen(true)] out IList<TOutput>? items)
{
// Try to receive the one item this block may have.
// If we can, give back an array of one item. Otherwise, give back null.
TOutput item;
TOutput? item;
if (TryReceive(null, out item))
{
items = new TOutput[] { item };
Expand Down Expand Up @@ -683,7 +683,7 @@ private void OfferCurrentMessageToNewTarget(ITargetBlock<TOutput> target)
Common.ContractAssertMonitorStatus(ValueLock, held: false);

// Get the current message if there is one
TOutput currentMessage;
TOutput? currentMessage;
bool isValid;
lock (ValueLock)
{
Expand Down Expand Up @@ -725,7 +725,7 @@ private bool OfferToTargets()
Common.ContractAssertMonitorStatus(ValueLock, held: false);

DataflowMessageHeader header = default(DataflowMessageHeader);
TOutput message = default(TOutput);
TOutput? message = default(TOutput);
int numDequeuedMessages = 0;
lock (ValueLock)
{
Expand Down Expand Up @@ -1053,7 +1053,7 @@ internal IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions li
if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
if (target == null) throw new ArgumentNullException(nameof(target));

TOutput valueToClone;
TOutput? valueToClone;
lock (OutgoingLock) // We may currently be calling out under this lock to the target; requires it to be reentrant
{
lock (ValueLock)
Expand Down Expand Up @@ -1125,7 +1125,7 @@ internal void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlo
// If someone else holds the reservation, bail.
if (_nextMessageReservedFor != target) throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);

TOutput messageToReoffer;
TOutput? messageToReoffer;
lock (ValueLock)
{
// If this is not the message at the head of the queue, bail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader message
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

bool consumed;
messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
messageValue = source.ConsumeMessage(messageHeader, this, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}

Expand Down Expand Up @@ -354,7 +354,7 @@ private bool ConsumeAndStoreOneMessageIfAvailable()
bool consumed = false;
try
{
T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
T? consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
if (consumed)
{
_source.AddMessage(consumedValue!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ public JoinBlock(GroupingDataflowBlockOptions dataflowBlockOptions)
_sharedResources = new JoinBlockTargetSharedResources(this, targets,
() =>
{
_source.AddMessage(Tuple.Create(_target1.GetOneMessage(), _target2.GetOneMessage()));
_source.AddMessage(Tuple.Create(_target1!.GetOneMessage(), _target2!.GetOneMessage()));
},
exception =>
{
Volatile.Write(ref _sharedResources._hasExceptions, true);
Volatile.Write(ref _sharedResources!._hasExceptions, true);
_source.AddException(exception);
},
dataflowBlockOptions);
Expand Down Expand Up @@ -297,10 +297,10 @@ public JoinBlock(GroupingDataflowBlockOptions dataflowBlockOptions)
// Configure the targets
var targets = new JoinBlockTargetBase[3];
_sharedResources = new JoinBlockTargetSharedResources(this, targets,
() => _source.AddMessage(Tuple.Create(_target1.GetOneMessage(), _target2.GetOneMessage(), _target3.GetOneMessage())),
() => _source.AddMessage(Tuple.Create(_target1!.GetOneMessage(), _target2!.GetOneMessage(), _target3!.GetOneMessage())),
exception =>
{
Volatile.Write(ref _sharedResources._hasExceptions, true);
Volatile.Write(ref _sharedResources!._hasExceptions, true);
_source.AddException(exception);
},
dataflowBlockOptions);
Expand Down Expand Up @@ -660,7 +660,7 @@ internal override bool ConsumeReservedMessage()
Debug.Assert(_nonGreedy!.ReservedMessage.Key != null, "This target must have a reserved message");

bool consumed;
T consumedValue = _nonGreedy.ReservedMessage.Key.ConsumeMessage(_nonGreedy.ReservedMessage.Value, this, out consumed);
T? consumedValue = _nonGreedy.ReservedMessage.Key.ConsumeMessage(_nonGreedy.ReservedMessage.Value, this, out consumed);

// Null out our reservation
_nonGreedy.ReservedMessage = default(KeyValuePair<ISourceBlock<T>, DataflowMessageHeader>);
Expand Down Expand Up @@ -721,7 +721,7 @@ internal override bool ConsumeOnePostponedMessage()

// Try to consume the popped message
bool consumed;
T consumedValue = next.Key.ConsumeMessage(next.Value, this, out consumed);
T? consumedValue = next.Key.ConsumeMessage(next.Value, this, out consumed);
if (consumed)
{
lock (_sharedResources.IncomingLock)
Expand Down Expand Up @@ -861,7 +861,7 @@ DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader message
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

bool consumed;
messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
messageValue = source.ConsumeMessage(messageHeader, this, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}
if (_sharedResources._boundingState != null && HasTheHighestNumberOfMessagesAvailable) _sharedResources._boundingState.CurrentCount += 1; // track this new item against our bound
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private TransformBlock(Func<TInput, TOutput>? transformSync, Func<TInput, Task<T
private void ProcessMessage(Func<TInput, TOutput> transform, KeyValuePair<TInput, long> messageWithId)
{
// Process the input message to get the output message
TOutput outputItem = default(TOutput);
TOutput? outputItem = default(TOutput);
bool itemIsValid = false;
try
{
Expand Down Expand Up @@ -272,7 +272,7 @@ private void AsyncCompleteProcessMessageWithTask(Task<TOutput> completed, KeyVal

bool isBounded = _target.IsBounded;
bool gotOutputItem = false;
TOutput outputItem = default(TOutput);
TOutput? outputItem = default(TOutput);

switch (completed.Status)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ bool IReceivableSourceBlock<T>.TryReceiveAll([NotNullWhen(true)] out IList<T>? i
// Try to receive the one item this block may have.
// If we can, give back an array of one item. Otherwise,
// give back null.
T item;
T? item;
if (TryReceive(null, out item))
{
items = new T[] { item };
Expand Down Expand Up @@ -357,7 +357,7 @@ DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader message
if (consumeToAccept)
{
bool consumed;
messageValue = source!.ConsumeMessage(messageHeader, this, out consumed);
messageValue = source!.ConsumeMessage(messageHeader, this, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}

Expand Down
Loading