Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ThreadPoolScheduler #5

Merged
merged 2 commits into from
Apr 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions Scripts/Format/VRMImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ public static void LoadVrmAsync(VRMImporterContext ctx, ArraySegment<Byte> chunk
var schedulable = Schedulable.Create();

schedulable
.AddTask(MainThreadDispatcher.Instance.ThreadScheduler, () =>
.AddTask(Scheduler.ThreadPool, () =>
{
ctx.GLTF.baseDir = Path.GetDirectoryName(ctx.Path);
foreach (var buffer in ctx.GLTF.buffers)
Expand All @@ -573,44 +573,44 @@ public static void LoadVrmAsync(VRMImporterContext ctx, ArraySegment<Byte> chunk
}
return Unit.Default;
})
.ContinueWith(MainThreadDispatcher.Instance.ThreadScheduler, _ =>
.ContinueWith(Scheduler.ThreadPool, _ =>
{
return glTF_VRM_Material.Parse(ctx.Json);
})
.ContinueWith(MainThreadDispatcher.Instance.UnityScheduler, x =>
.ContinueWith(Scheduler.MainThread, x =>
{
// material function
ctx.CreateMaterial = VRMImporter.GetMaterialFunc(x);
})
.OnExecute(MainThreadDispatcher.Instance.UnityScheduler, parent =>
.OnExecute(Scheduler.ThreadPool, parent =>
{
// textures
for (int i = 0; i < ctx.GLTF.textures.Count; ++i)
{
var index = i;
parent.AddTask(MainThreadDispatcher.Instance.UnityScheduler,
parent.AddTask(Scheduler.MainThread,
() => gltfImporter.ImportTexture(ctx.GLTF, index))
.ContinueWith(MainThreadDispatcher.Instance.ThreadScheduler, x => ctx.Textures.Add(x));
.ContinueWith(Scheduler.ThreadPool, x => ctx.Textures.Add(x));
}
})
.ContinueWithCoroutine(MainThreadDispatcher.Instance.UnityScheduler, () => LoadMaterials(ctx))
.OnExecute(MainThreadDispatcher.Instance.UnityScheduler, parent =>
.ContinueWithCoroutine(Scheduler.MainThread, () => LoadMaterials(ctx))
.OnExecute(Scheduler.ThreadPool, parent =>
{
// meshes
for (int i = 0; i < ctx.GLTF.meshes.Count; ++i)
{
var index = i;
parent.AddTask(MainThreadDispatcher.Instance.ThreadScheduler,
parent.AddTask(Scheduler.ThreadPool,
() => gltfImporter.ReadMesh(ctx, index))
.ContinueWith(MainThreadDispatcher.Instance.UnityScheduler, x => gltfImporter.BuildMesh(ctx, x))
.ContinueWith(MainThreadDispatcher.Instance.ThreadScheduler, x => ctx.Meshes.Add(x))
.ContinueWith(Scheduler.MainThread, x => gltfImporter.BuildMesh(ctx, x))
.ContinueWith(Scheduler.ThreadPool, x => ctx.Meshes.Add(x))
;
}
})
.ContinueWithCoroutine(MainThreadDispatcher.Instance.UnityScheduler, () => LoadNodes(ctx))
.ContinueWithCoroutine(MainThreadDispatcher.Instance.UnityScheduler, () => BuildHierarchy(ctx))
.ContinueWith(MainThreadDispatcher.Instance.UnityScheduler, _ => VRMImporter.OnLoadModel(ctx))
.Subscribe(MainThreadDispatcher.Instance.UnityScheduler,
.ContinueWithCoroutine(Scheduler.MainThread, () => LoadNodes(ctx))
.ContinueWithCoroutine(Scheduler.MainThread, () => BuildHierarchy(ctx))
.ContinueWith(Scheduler.MainThread, _ => VRMImporter.OnLoadModel(ctx))
.Subscribe(Scheduler.MainThread,
_ =>
{
/*
Expand Down
39 changes: 3 additions & 36 deletions Scripts/UniTask/MainThreadDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Collections.Generic;
using UnityEngine;


namespace UniTask
{
/// <summary>
Expand All @@ -11,37 +10,6 @@ namespace UniTask
/// </summary>
public class MainThreadDispatcher : MonoBehaviour
{
StepScheduler m_unityScheduler;
/// <summary>
/// Dequeueとタスク実行がUnityのMainThread上であるこを保証する
/// </summary>
public StepScheduler UnityScheduler
{
get
{
if (m_unityScheduler == null)
{
m_unityScheduler = new StepScheduler();
}
return m_unityScheduler;
}
}

ThreadScheduler m_threadScheduler;
/// <summary>
/// Dequeuとタスク実行がWorkerThread上で実行される
/// </summary>
public ThreadScheduler ThreadScheduler
{
get
{
if (m_threadScheduler == null)
{
m_threadScheduler = new ThreadScheduler();
}
return m_threadScheduler;
}
}

[Header("Debug")]
public int TaskCount;
Expand All @@ -61,7 +29,7 @@ IEnumerable<Transform> Ancestors(Transform t)

private void Update()
{
TaskCount = UnityScheduler.UpdateAndGetTaskCount();
TaskCount = Scheduler.MainThread.UpdateAndGetTaskCount();
}

static MainThreadDispatcher instance;
Expand Down Expand Up @@ -166,10 +134,9 @@ void OnDestroy()
initialized = instance != null;
}

if (m_threadScheduler != null)
if (Scheduler.SingleWorkerThread != null)
{
m_threadScheduler.Dispose();
m_threadScheduler = null;
Scheduler.SingleWorkerThread.Dispose();
}
}

Expand Down
6 changes: 4 additions & 2 deletions Scripts/UniTask/Scheduler.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
namespace UniTask
using System;

namespace UniTask
{
public interface IScheduler
public interface IScheduler : IDisposable
{
void Enqueue(TaskChain item);
}
Expand Down
52 changes: 37 additions & 15 deletions Scripts/UniTask/StepScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,30 +1,52 @@
namespace UniTask
{
public class StepScheduler : IScheduler
public static partial class Scheduler
{
LockQueue<TaskChain> m_taskQueue = new LockQueue<TaskChain>();
public void Enqueue(TaskChain item)
private static StepScheduler mainThread;

public static StepScheduler MainThread
{
m_taskQueue.Enqueue(item);
get
{
if (mainThread != null) return mainThread;
mainThread = new StepScheduler();
MainThreadDispatcher.Initialize();
return mainThread;
}
}

TaskChain m_chain;
public int UpdateAndGetTaskCount()
public class StepScheduler : IScheduler
{
if (m_chain != null)
LockQueue<TaskChain> m_taskQueue = new LockQueue<TaskChain>();

public void Enqueue(TaskChain item)
{
var status=m_chain.Next();
if(status==ExecutionStatus.Continue)
m_taskQueue.Enqueue(item);
}

TaskChain m_chain;

public int UpdateAndGetTaskCount()
{
if (m_chain != null)
{
// m_item継続中
return m_taskQueue.Count;
var status = m_chain.Next();
if (status == ExecutionStatus.Continue)
{
// m_item継続中
return m_taskQueue.Count;
}
m_chain = null;
}
m_chain = null;

int count;
m_chain = m_taskQueue.Dequeue(out count);
return count;
}

int count;
m_chain = m_taskQueue.Dequeue(out count);
return count;
public void Dispose()
{
}
}
}
}
2 changes: 1 addition & 1 deletion Scripts/UniTask/TaskChain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static TaskChain Schedule(ISchedulable schedulable, Action<Exception> onE
if (item.Enumerator.Current.Schedulder == null)
{
// default
MainThreadDispatcher.Instance.UnityScheduler.Enqueue(item);
Scheduler.MainThread.Enqueue(item);
}
else
{
Expand Down
42 changes: 42 additions & 0 deletions Scripts/UniTask/ThreadPoolScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;

namespace UniTask
{
public static partial class Scheduler
{
private static IScheduler threadPool;

public static IScheduler ThreadPool
{
get { return threadPool ?? (threadPool = new ThreadPoolScheduler()); }
}

public class ThreadPoolScheduler : IScheduler
{
public void Enqueue(TaskChain item)
{
System.Threading.ThreadPool.QueueUserWorkItem(_ =>
{
if (item == null)
{
return;
}

while (true)
{
var status = item.Next();
if (status != ExecutionStatus.Continue)
{
break;
}
}

});
}

public void Dispose()
{
}
}
}
}
Loading