Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for submitted threads to end before terminating a command line procedure - new implementation #580

Merged
merged 5 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dotnet/src/dotnetframework/GxClasses/Core/GXApplication.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2102,6 +2102,8 @@ public virtual short GetHttpSecure()
{
try
{
if (HttpContext==null)
return 0;
if (_HttpContext.Request.GetIsSecureFrontEnd())
{
GXLogging.Debug(log, "Front-End-Https header activated");
Expand Down Expand Up @@ -3414,7 +3416,7 @@ public IGxSession GetSession()
}

internal bool IsStandalone => this._session is GxSession || this._isSumbited || this.HttpContext == null;

internal bool IsSubmited => this._isSumbited;
internal void SetSession(IGxSession value)
{
if (value != null)
Expand Down
67 changes: 66 additions & 1 deletion dotnet/src/dotnetframework/GxClasses/Core/GXUtilsCommon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
using GeneXus.Services;
using GeneXus.Http;
using System.Security;
using System.Threading.Tasks;
using System.Drawing.Imaging;

namespace GeneXus.Utils
Expand All @@ -56,7 +57,6 @@ public class GxDefaultProps
public static string WORKSTATION = "GX_WRKST";

}

public class ThreadSafeRandom
{
private static RNGCryptoServiceProvider random = new RNGCryptoServiceProvider();
Expand All @@ -67,6 +67,8 @@ public static double NextDouble()
byte[] bytes = new Byte[8];
random.GetBytes(bytes);



ulong ul = BitConverter.ToUInt64(bytes, 0) / (1 << 11);
double d = ul / (double)(1UL << 53);
return d;
Expand Down Expand Up @@ -5740,5 +5742,68 @@ public static string EncodeNonAsciiCharacters(string value)
return sb.ToString();
}
}
internal class ThreadUtil
{
static readonly ILog log = log4net.LogManager.GetLogger(typeof(ThreadUtil));
private static ConcurrentDictionary<Guid, ManualResetEvent> events = new ConcurrentDictionary<Guid, ManualResetEvent>();
const int MAX_WAIT_HANDLES = 64;
internal static void Submit(WaitCallback callbak, object state)
{
try
{
ManualResetEvent resetEvent = new ManualResetEvent(false);
Guid eventGuid = Guid.NewGuid();
ThreadPool.QueueUserWorkItem(
arg =>
{
callbak(state);
resetEvent.Set();
events.TryRemove(eventGuid, out ManualResetEvent _);

});
events[eventGuid]= resetEvent;
}
catch (Exception ex)
{
GXLogging.Error(log, $"Submit error", ex);
}
}
internal static void WaitForEnd()
{
int remainingSubmits = events.Count;
if (remainingSubmits > 0)
{
GXLogging.Debug(log, "Waiting for " + remainingSubmits + " submitted procs to end...");
WaitForAll();
events.Clear();
}
}
public static void WaitForAll()
{
try
{
List<ManualResetEvent> evtList =new List<ManualResetEvent> ();
foreach (ManualResetEvent evt in events.Values)
{
evtList.Add(evt);
//Avoid WaitHandle.WaitAll limitation. It can only handle 64 waithandles
if (evtList.Count == MAX_WAIT_HANDLES)
{
WaitHandle.WaitAll(evtList.ToArray());
evtList.Clear();
}
}
if (evtList.Count>0)
{
WaitHandle.WaitAll(evtList.ToArray());
evtList.Clear();
}
}
catch (Exception ex)
{
GXLogging.Error(log, $"WaitForAll pending threads error", ex);
}
}
}

}
6 changes: 6 additions & 0 deletions dotnet/src/dotnetframework/GxClasses/Model/GXWebProcedure.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace GeneXus.Procedure
using GeneXus.Http;
using System.Threading;
using GeneXus.Mime;
using GeneXus.Utils;

public class GXWebProcedure : GXHttpHandler
{
Expand Down Expand Up @@ -191,5 +192,10 @@ protected static WaitCallback PropagateCulture(WaitCallback action)
{
return GXProcedure.PropagateCulture(action);
}
protected void Submit(Action<object> executeMethod, object state)
{
ThreadUtil.Submit(PropagateCulture(new WaitCallback(executeMethod)), state);
}

}
}
7 changes: 7 additions & 0 deletions dotnet/src/dotnetframework/GxClasses/Model/gxproc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public bool DisconnectAtCleanup
get{ return disconnectUserAtCleanup;}
set{ disconnectUserAtCleanup=value;}
}
protected void Submit(Action<object> executeMethod, object state)
{
ThreadUtil.Submit(PropagateCulture(new WaitCallback(executeMethod)), state);
}
public static WaitCallback PropagateCulture(WaitCallback action)
{
var currentCulture = Thread.CurrentThread.CurrentCulture;
Expand All @@ -87,6 +91,9 @@ protected void exitApplication()
}
private void exitApplication(bool flushBatchCursor)
{
if (IsMain && !(context as GxContext).IsSubmited)
ThreadUtil.WaitForEnd();

if (flushBatchCursor)
{
foreach (IGxDataStore ds in context.DataStores)
Expand Down
2 changes: 1 addition & 1 deletion dotnet/test/DotNetCoreUnitTest/DotNetCoreUnitTest.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>net6.0</TargetFrameworks>
<NoWarn>CS8032;1701;1702;NU1701</NoWarn>
Expand Down
88 changes: 88 additions & 0 deletions dotnet/test/DotNetCoreUnitTest/Submit/SubmitTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System;
using System.Collections.Generic;
using System.Threading;
using GeneXus.Application;
using GeneXus.Procedure;
using GeneXus.Utils;
using Lucene.Net.Support;
using Xunit;
using System.Collections.Concurrent;


namespace UnitTesting
{
public class SubmitTest
{
internal static ConcurrentDictionary<Guid, int> numbers = new ConcurrentDictionary<Guid, int>();
[Fact]
public void SubmitAll()
{
MainProc proc = new MainProc();
proc.execute();
}
}
public class MainProc : GXProcedure
{
const int THREAD_NUMBER = 2000;
public MainProc()
{
context = new GxContext();
IsMain = true;
}
public void execute()
{
initialize();
executePrivate();
}

void executePrivate()
{
for (int i = 0; i < THREAD_NUMBER; i++)
{
SubmitProc proc = new SubmitProc();
proc.executeSubmit();
}
cleanup();
Assert.Equal(THREAD_NUMBER, SubmitTest.numbers.Count);
}
public override void cleanup()
{
ExitApp();
}

public override void initialize()
{

}
}
public class SubmitProc : GXProcedure
{
public SubmitProc()
{
context = new GxContext();
}
public void executeSubmit()
{
SubmitProc submitProc;
submitProc = new SubmitProc();
submitProc.context.SetSubmitInitialConfig(context);
submitProc.initialize();
Submit(executePrivateCatch, submitProc);
}
void executePrivateCatch(object stateInfo)
{
((SubmitProc)stateInfo).executePrivate();
}
void executePrivate()
{
int millisecondsToWait = (int)ThreadSafeRandom.NextDouble()*10;
Thread.Sleep(millisecondsToWait);
SubmitTest.numbers[Guid.NewGuid()]=Thread.CurrentThread.ManagedThreadId;
}

public override void initialize()
{

}
}
}