Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[*] fetch by predicate #58

Merged
merged 1 commit into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions PowerThreadPool/Collections/ConcurrentSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ public ConcurrentSet(IEnumerable<T> items)
/// </summary>
public int Count => _dictionary.Count;

/// <summary>
/// Checks whether the collection contains the specified value.
/// </summary>
/// <param name="value">The value to locate in the collection.</param>
/// <returns>True if the value is found in the collection; otherwise, false.</returns>
public bool Contains(T value)
{
return _dictionary.ContainsKey(value);
}

/// <summary>
/// Clear all items.
/// </summary>
Expand Down
61 changes: 55 additions & 6 deletions PowerThreadPool/Core/PowerThreadPool.Control.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using PowerThreadPool.Collections;
Expand Down Expand Up @@ -315,15 +316,19 @@ public List<ExecuteResult<TResult>> Fetch<TResult>(IEnumerable<string> idList, b
if (executeResultBase != null)
{
resultList.Add(executeResultBase.ToTypedResult<TResult>());

if (removeAfterFetch && _aliveWorkDic.TryRemove(id, out WorkBase work))
{
RemoveWorkFromGroup(work.Group, work);
}
}
else
{
workList.Add(workBase);

if (removeAfterFetch)
{
_resultDic.TryRemove(id, out _);
if (_aliveWorkDic.TryRemove(id, out WorkBase work))
{
RemoveWorkFromGroup(work.Group, work);
}
}
}
}
else
Expand Down Expand Up @@ -447,6 +452,50 @@ public Task<List<ExecuteResult<object>>> FetchAsync(IEnumerable<string> idList,
}
#endif

/// <summary>
/// Fetch the work result.
/// </summary>
/// <param name="predicate">a function to test each source element for a condition; the second parameter of the function represents the index of the source element</param>
/// <param name="removeAfterFetch">remove the result from storage</param>
/// <returns>Return a list of work result</returns>
public List<ExecuteResult<TResult>> Fetch<TResult>(Func<ExecuteResult<TResult>, bool> predicate, bool removeAfterFetch = false)
{
List<string> idList = new List<string>();

foreach (KeyValuePair<string, ExecuteResultBase> pair in _resultDic)
{
if (predicate(pair.Value.ToTypedResult<TResult>()))
{
idList.Add(pair.Value.ID);
}
}

return Fetch<TResult>(idList, removeAfterFetch);
}

/// <summary>
/// Fetch the work result.
/// </summary>
/// <param name="predicate">a function to test each source element for a condition; the second parameter of the function represents the index of the source element</param>
/// <param name="predicateID">a function to test each source element for a condition; the second parameter of the function represents the index of the source element</param>
/// <param name="removeAfterFetch">remove the result from storage</param>
/// <returns>Return a list of work result</returns>
internal List<ExecuteResult<TResult>> Fetch<TResult>(Func<ExecuteResult<TResult>, bool> predicate, Func<ExecuteResult<TResult>, bool> predicateID, bool removeAfterFetch = false)
{
List<string> idList = new List<string>();

foreach (KeyValuePair<string, ExecuteResultBase> pair in _resultDic)
{
ExecuteResult<TResult> typedResult = pair.Value.ToTypedResult<TResult>();
if (predicate(typedResult) && predicateID(typedResult))
{
idList.Add(pair.Value.ID);
}
}

return Fetch<TResult>(idList, removeAfterFetch);
}

/// <summary>
/// Stop all works
/// </summary>
Expand Down
1 change: 0 additions & 1 deletion PowerThreadPool/Core/PowerThreadPool.Parallel.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using PowerThreadPool.Groups;
using PowerThreadPool.Options;
Expand Down
17 changes: 16 additions & 1 deletion PowerThreadPool/Groups/Group.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using PowerThreadPool.Collections;
using PowerThreadPool.Results;

namespace PowerThreadPool.Groups
Expand Down Expand Up @@ -138,6 +140,19 @@ public Task<List<ExecuteResult<object>>> FetchAsync(bool removeAfterFetch = fals
}
#endif

/// <summary>
/// Fetch the work result.
/// </summary>
/// <param name="predicate">a function to test each source element for a condition; the second parameter of the function represents the index of the source element</param>
/// <param name="removeAfterFetch">remove the result from storage</param>
/// <returns>Return a list of work result</returns>
public List<ExecuteResult<TResult>> Fetch<TResult>(Func<ExecuteResult<TResult>, bool> predicate, bool removeAfterFetch = false)
{
ConcurrentSet<string> idList = (ConcurrentSet<string>)_powerPool.GetGroupMemberList(Name);
Func<ExecuteResult<TResult>, bool> predicateID = e => idList.Contains(e.ID);
return _powerPool.Fetch(predicate, predicateID, removeAfterFetch);
}

/// <summary>
/// Stop all the work belonging to the group.
/// </summary>
Expand Down
1 change: 0 additions & 1 deletion PowerThreadPool/Works/Work.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Linq;
using System.Threading;
using PowerThreadPool.Collections;
using PowerThreadPool.Helpers;
Expand Down
106 changes: 106 additions & 0 deletions UnitTest/ControlTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2084,6 +2084,58 @@ public async void TestFetchByIDListAsyncSuspended()
}
}

[Fact]
public void TestFetchByPredicate()
{
PowerPool powerPool = new PowerPool();
string id1 = powerPool.QueueWorkItem(() =>
{
return 1;
}, new WorkOption()
{
ShouldStoreResult = true
});
string id2 = powerPool.QueueWorkItem(() =>
{
return 2;
}, new WorkOption()
{
ShouldStoreResult = true
});
string id3 = powerPool.QueueWorkItem(() =>
{
return 3;
}, new WorkOption()
{
ShouldStoreResult = true
});
string id4 = powerPool.QueueWorkItem(() =>
{
return 4;
}, new WorkOption()
{
ShouldStoreResult = true
});

powerPool.Wait();

List<ExecuteResult<int>> resList = powerPool.Fetch<int>(x => x.Result >= 3);

Assert.True(resList.Count == 2);
Assert.True(resList[0].ID == id3 || resList[0].ID == id4);
Assert.True(resList[1].ID == id3 || resList[1].ID == id4);

resList = powerPool.Fetch<int>(x => x.Result >= 3, true);

Assert.True(resList.Count == 2);
Assert.True(resList[0].ID == id3 || resList[0].ID == id4);
Assert.True(resList[1].ID == id3 || resList[1].ID == id4);

resList = powerPool.Fetch<int>(x => x.Result >= 3, true);

Assert.True(resList.Count == 0);
}

[Fact]
public void TestFetchObjByGroupObject()
{
Expand Down Expand Up @@ -2228,6 +2280,60 @@ public async void TestFetchByGroupObjectAsync()
}
}

[Fact]
public void TestFetchByPredicateByGroupObject()
{
PowerPool powerPool = new PowerPool();
string id1 = powerPool.QueueWorkItem(() =>
{
return 1;
}, new WorkOption()
{
ShouldStoreResult = true,
Group = "A",
});
string id2 = powerPool.QueueWorkItem(() =>
{
return 2;
}, new WorkOption()
{
ShouldStoreResult = true,
Group = "B",
});
string id3 = powerPool.QueueWorkItem(() =>
{
return 3;
}, new WorkOption()
{
ShouldStoreResult = true,
Group = "A",
});
string id4 = powerPool.QueueWorkItem(() =>
{
return 4;
}, new WorkOption()
{
ShouldStoreResult = true,
Group = "B",
});

powerPool.Wait();

List<ExecuteResult<int>> resList = powerPool.GetGroup("B").Fetch<int>(x => x.Result >= 3);

Assert.True(resList.Count == 1);
Assert.True(resList[0].ID == id4);

resList = powerPool.GetGroup("B").Fetch<int>(x => x.Result >= 3, true);

Assert.True(resList.Count == 1);
Assert.True(resList[0].ID == id4);

resList = powerPool.GetGroup("B").Fetch<int>(x => x.Result >= 3, true);

Assert.True(resList.Count == 0);
}

[Fact]
public void TestPauseWorkTimer()
{
Expand Down
Loading