Skip to content

Commit

Permalink
Fix condition raising System.InvalidOperationException on `KNetCompactedReplicator` (#537)
Browse files Browse the repository at this point in the history

* Fix the condition where "System.InvalidOperationException: Collection was modified; enumeration operation may not execute." can be raised if partitions are added/removed while the method CheckConsumerSyncState is being invoked

* Update workflows to use concurrency
  • Loading branch information
masesdevelopers authored Jul 4, 2024
1 parent 5b852ab commit 406dce8
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 9 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ on:
default: false
type: boolean

concurrency:
group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'
cancel-in-progress: true

# This workflow contains two jobs called "check_changes", "build_windows"
jobs:
# Verify if a build is needed
Expand Down
12 changes: 10 additions & 2 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,22 @@ name: "CodeQL"

on:
push:
branches: [ master ]
branches:
- master
- release/**
pull_request:
# The branches below must be a subset of the branches above
branches: [ master ]
branches:
- master
- release/**
# Disabled to avoid error on schedule from check_changes job
# schedule:
# - cron: '26 23 * * 3'

concurrency:
group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'
cancel-in-progress: true

jobs:
# Verify if a check is needed
check_changes:
Expand Down
8 changes: 7 additions & 1 deletion .github/workflows/pullrequest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ name: CI_PULLREQUEST
# events but only for the master and release/** branches
on:
pull_request:
branches: [ master ]
branches:
- master
- release/**

concurrency:
group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'
cancel-in-progress: true

jobs:
# Verify if a build is needed
Expand Down
18 changes: 12 additions & 6 deletions src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ public override void OnPartitionsLost(Java.Util.Collection<Org.Apache.Kafka.Comm
private AccessRightsType _accessrights = AccessRightsType.ReadWrite;
private UpdateModeTypes _updateMode = UpdateModeTypes.OnDelivery;
private Tuple<K, ManualResetEvent> _OnConsumeSyncWaiter = null;
private System.Collections.Generic.Dictionary<int, System.Collections.Generic.IList<int>> _consumerAssociatedPartition = new();
private Dictionary<int, System.Collections.Generic.IList<int>> _consumerAssociatedPartition = new();
private ManualResetEvent[] _assignmentWaiters;
private bool[] _assignmentWaitersStatus;
private long[] _lastPartitionLags = null;
Expand Down Expand Up @@ -600,7 +600,7 @@ public Func<IKNetCompactedReplicator<K, V, TJVMK, TJVMV>, bool, KeyValuePair<K,
public int Partitions { get { return _partitions; } set { CheckStarted(); _partitions = value; } }

/// <inheritdoc cref="IKNetCompactedReplicator{K, V, TJVMK, TJVMV}.ConsumerInstances"/>
public int? ConsumerInstances { get { return _consumerInstances.HasValue ? _consumerInstances.Value : _partitions; } set { CheckStarted(); _consumerInstances = value; } }
public int? ConsumerInstances { get { return _consumerInstances ?? _partitions; } set { CheckStarted(); _consumerInstances = value; } }

/// <inheritdoc cref="IKNetCompactedReplicator{K, V, TJVMK, TJVMV}.ReplicationFactor"/>
public short ReplicationFactor { get { return _replicationFactor; } set { CheckStarted(); _replicationFactor = value; } }
Expand Down Expand Up @@ -731,7 +731,7 @@ void CheckStarted()
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
int ConsumersToAllocate()
{
return ConsumerInstances.HasValue ? ConsumerInstances.Value : Partitions;
return ConsumerInstances ?? Partitions;
}

bool UpdateModeOnDelivery => (UpdateMode & UpdateModeTypes.OnDelivery) == UpdateModeTypes.OnDelivery;
Expand Down Expand Up @@ -1094,7 +1094,12 @@ void ConsumerPollHandler(object o)
bool CheckConsumerSyncState(int index)
{
bool lagInSync = true;
foreach (var partitionIndex in _consumerAssociatedPartition[index])
int[] partitionIndexes;
lock (_consumerAssociatedPartition)
{
partitionIndexes = _consumerAssociatedPartition[index].ToArray();
}
foreach (var partitionIndex in partitionIndexes)
{
var partitionLag = Interlocked.Read(ref _lastPartitionLags[partitionIndex]);
lagInSync &= partitionLag == 0;
Expand Down Expand Up @@ -1245,10 +1250,11 @@ public bool SyncWait(int timeout = Timeout.Infinite)
ValidateStarted();
Stopwatch watcher = Stopwatch.StartNew();
bool sync = false;
int matrixLength = ConsumersToAllocate();
bool[] syncs = new bool[matrixLength];
while (!sync && watcher.ElapsedMilliseconds < (uint)timeout)
{
bool[] syncs = new bool[ConsumersToAllocate()];
for (int i = 0; i < ConsumersToAllocate(); i++)
for (int i = 0; i < matrixLength; i++)
{
syncs[i] = CheckConsumerSyncState(i);
}
Expand Down

0 comments on commit 406dce8

Please sign in to comment.