Skip to content

Commit

Permalink
Merge pull request #58 from ZjzMisaka/fetch-by-predicate
Browse files Browse the repository at this point in the history
[*] fetch by predicate
  • Loading branch information
ZjzMisaka authored Sep 22, 2024
2 parents 4fe11d9 + 8e19954 commit d09249b
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 9 deletions.
10 changes: 10 additions & 0 deletions PowerThreadPool/Collections/ConcurrentSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ public ConcurrentSet(IEnumerable<T> items)
/// </summary>
public int Count => _dictionary.Count;

/// <summary>
/// Checks whether the collection contains the specified value.
/// </summary>
/// <param name="value">The value to locate in the collection.</param>
/// <returns>True if the value is found in the collection; otherwise, false.</returns>
public bool Contains(T value)
{
return _dictionary.ContainsKey(value);
}

/// <summary>
/// Clear all items.
/// </summary>
Expand Down
61 changes: 55 additions & 6 deletions PowerThreadPool/Core/PowerThreadPool.Control.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using PowerThreadPool.Collections;
Expand Down Expand Up @@ -315,15 +316,19 @@ public List<ExecuteResult<TResult>> Fetch<TResult>(IEnumerable<string> idList, b
if (executeResultBase != null)
{
resultList.Add(executeResultBase.ToTypedResult<TResult>());

if (removeAfterFetch && _aliveWorkDic.TryRemove(id, out WorkBase work))
{
RemoveWorkFromGroup(work.Group, work);
}
}
else
{
workList.Add(workBase);

if (removeAfterFetch)
{
_resultDic.TryRemove(id, out _);
if (_aliveWorkDic.TryRemove(id, out WorkBase work))
{
RemoveWorkFromGroup(work.Group, work);
}
}
}
}
else
Expand Down Expand Up @@ -447,6 +452,50 @@ public Task<List<ExecuteResult<object>>> FetchAsync(IEnumerable<string> idList,
}
#endif

/// <summary>
/// Fetch the work result.
/// </summary>
/// <param name="predicate">a function to test each source element for a condition; the second parameter of the function represents the index of the source element</param>
/// <param name="removeAfterFetch">remove the result from storage</param>
/// <returns>Return a list of work result</returns>
public List<ExecuteResult<TResult>> Fetch<TResult>(Func<ExecuteResult<TResult>, bool> predicate, bool removeAfterFetch = false)
{
List<string> idList = new List<string>();

foreach (KeyValuePair<string, ExecuteResultBase> pair in _resultDic)
{
if (predicate(pair.Value.ToTypedResult<TResult>()))
{
idList.Add(pair.Value.ID);
}
}

return Fetch<TResult>(idList, removeAfterFetch);
}

/// <summary>
/// Fetch the work result.
/// </summary>
/// <param name="predicate">a function to test each source element for a condition; the second parameter of the function represents the index of the source element</param>
/// <param name="predicateID">a function to test each source element for a condition; the second parameter of the function represents the index of the source element</param>
/// <param name="removeAfterFetch">remove the result from storage</param>
/// <returns>Return a list of work result</returns>
internal List<ExecuteResult<TResult>> Fetch<TResult>(Func<ExecuteResult<TResult>, bool> predicate, Func<ExecuteResult<TResult>, bool> predicateID, bool removeAfterFetch = false)
{
List<string> idList = new List<string>();

foreach (KeyValuePair<string, ExecuteResultBase> pair in _resultDic)
{
ExecuteResult<TResult> typedResult = pair.Value.ToTypedResult<TResult>();
if (predicate(typedResult) && predicateID(typedResult))
{
idList.Add(pair.Value.ID);
}
}

return Fetch<TResult>(idList, removeAfterFetch);
}

/// <summary>
/// Stop all works
/// </summary>
Expand Down
1 change: 0 additions & 1 deletion PowerThreadPool/Core/PowerThreadPool.Parallel.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using PowerThreadPool.Groups;
using PowerThreadPool.Options;
Expand Down
17 changes: 16 additions & 1 deletion PowerThreadPool/Groups/Group.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using PowerThreadPool.Collections;
using PowerThreadPool.Results;

namespace PowerThreadPool.Groups
Expand Down Expand Up @@ -138,6 +140,19 @@ public Task<List<ExecuteResult<object>>> FetchAsync(bool removeAfterFetch = fals
}
#endif

/// <summary>
/// Fetch the work result.
/// </summary>
/// <param name="predicate">a function to test each source element for a condition; the second parameter of the function represents the index of the source element</param>
/// <param name="removeAfterFetch">remove the result from storage</param>
/// <returns>Return a list of work result</returns>
public List<ExecuteResult<TResult>> Fetch<TResult>(Func<ExecuteResult<TResult>, bool> predicate, bool removeAfterFetch = false)
{
ConcurrentSet<string> idList = (ConcurrentSet<string>)_powerPool.GetGroupMemberList(Name);
Func<ExecuteResult<TResult>, bool> predicateID = e => idList.Contains(e.ID);
return _powerPool.Fetch(predicate, predicateID, removeAfterFetch);
}

/// <summary>
/// Stop all the work belonging to the group.
/// </summary>
Expand Down
1 change: 0 additions & 1 deletion PowerThreadPool/Works/Work.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Linq;
using System.Threading;
using PowerThreadPool.Collections;
using PowerThreadPool.Helpers;
Expand Down
106 changes: 106 additions & 0 deletions UnitTest/ControlTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2084,6 +2084,58 @@ public async void TestFetchByIDListAsyncSuspended()
}
}

[Fact]
public void TestFetchByPredicate()
{
PowerPool powerPool = new PowerPool();
string id1 = powerPool.QueueWorkItem(() =>
{
return 1;
}, new WorkOption()
{
ShouldStoreResult = true
});
string id2 = powerPool.QueueWorkItem(() =>
{
return 2;
}, new WorkOption()
{
ShouldStoreResult = true
});
string id3 = powerPool.QueueWorkItem(() =>
{
return 3;
}, new WorkOption()
{
ShouldStoreResult = true
});
string id4 = powerPool.QueueWorkItem(() =>
{
return 4;
}, new WorkOption()
{
ShouldStoreResult = true
});

powerPool.Wait();

List<ExecuteResult<int>> resList = powerPool.Fetch<int>(x => x.Result >= 3);

Assert.True(resList.Count == 2);
Assert.True(resList[0].ID == id3 || resList[0].ID == id4);
Assert.True(resList[1].ID == id3 || resList[1].ID == id4);

resList = powerPool.Fetch<int>(x => x.Result >= 3, true);

Assert.True(resList.Count == 2);
Assert.True(resList[0].ID == id3 || resList[0].ID == id4);
Assert.True(resList[1].ID == id3 || resList[1].ID == id4);

resList = powerPool.Fetch<int>(x => x.Result >= 3, true);

Assert.True(resList.Count == 0);
}

[Fact]
public void TestFetchObjByGroupObject()
{
Expand Down Expand Up @@ -2228,6 +2280,60 @@ public async void TestFetchByGroupObjectAsync()
}
}

[Fact]
public void TestFetchByPredicateByGroupObject()
{
PowerPool powerPool = new PowerPool();
string id1 = powerPool.QueueWorkItem(() =>
{
return 1;
}, new WorkOption()
{
ShouldStoreResult = true,
Group = "A",
});
string id2 = powerPool.QueueWorkItem(() =>
{
return 2;
}, new WorkOption()
{
ShouldStoreResult = true,
Group = "B",
});
string id3 = powerPool.QueueWorkItem(() =>
{
return 3;
}, new WorkOption()
{
ShouldStoreResult = true,
Group = "A",
});
string id4 = powerPool.QueueWorkItem(() =>
{
return 4;
}, new WorkOption()
{
ShouldStoreResult = true,
Group = "B",
});

powerPool.Wait();

List<ExecuteResult<int>> resList = powerPool.GetGroup("B").Fetch<int>(x => x.Result >= 3);

Assert.True(resList.Count == 1);
Assert.True(resList[0].ID == id4);

resList = powerPool.GetGroup("B").Fetch<int>(x => x.Result >= 3, true);

Assert.True(resList.Count == 1);
Assert.True(resList[0].ID == id4);

resList = powerPool.GetGroup("B").Fetch<int>(x => x.Result >= 3, true);

Assert.True(resList.Count == 0);
}

[Fact]
public void TestPauseWorkTimer()
{
Expand Down

0 comments on commit d09249b

Please sign in to comment.