diff --git a/std/parallelism.d b/std/parallelism.d index 51e37268311..27c0da090a5 100644 --- a/std/parallelism.d +++ b/std/parallelism.d @@ -74,23 +74,11 @@ License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0) */ module std.parallelism; -import core.atomic; -import core.exception; -import core.memory; -import core.sync.condition; -import core.thread; - -import std.algorithm; -import std.conv; -import std.exception; -import std.functional; -import std.math; -import std.meta; -import std.range; -import std.traits; -import std.typecons; - -version(OSX) +import core.thread; // : Thread; + +import std.traits; // : ReturnType; + +/*version(OSX) { version = useSysctlbyname; } @@ -101,7 +89,8 @@ else version(FreeBSD) else version(NetBSD) { version = useSysctlbyname; -} +}*/ +version = useSysctlbyname; version(Windows) @@ -110,6 +99,8 @@ version(Windows) shared static this() { import core.sys.windows.windows; + import core.sys.windows.winbase; + import std.algorithm.comparison : max; SYSTEM_INFO si; GetSystemInfo(&si); @@ -117,26 +108,26 @@ version(Windows) } } -else version(linux) -{ +//else version(linux) +//{ import core.sys.posix.unistd; shared static this() { totalCPUs = cast(uint) sysconf(_SC_NPROCESSORS_ONLN); } -} -else version(Solaris) -{ +//} +//else version(Solaris) +//{ import core.sys.posix.unistd; shared static this() { totalCPUs = cast(uint) sysconf(_SC_NPROCESSORS_ONLN); } -} -else version(useSysctlbyname) -{ +//} +//else version(useSysctlbyname) +//{ extern(C) int sysctlbyname( const char *, void *, size_t *, void *, size_t ); @@ -151,10 +142,10 @@ else version(useSysctlbyname) { auto nameStr = "hw.ncpu\0".ptr; } - else version(NetBSD) - { + //else version(NetBSD) + //{ auto nameStr = "hw.ncpu\0".ptr; - } + //} uint ans; size_t len = uint.sizeof; @@ -162,9 +153,9 @@ else version(useSysctlbyname) totalCPUs = ans; } -} -else -{ +//} +//else +version(none){ static assert(0, "Don't know how to get N CPUs on this OS."); } @@ -198,13 +189,14 @@ shared static this() private void atomicSetUbyte(T)(ref T stuff, T newVal) if (__traits(isIntegral, T) && is(T : ubyte)) { - //core.atomic.cas(cast(shared) &stuff, stuff, newVal); + import core.atomic : atomicStore; atomicStore(*(cast(shared) &stuff), newVal); } private ubyte atomicReadUbyte(T)(ref T val) if (__traits(isIntegral, T) && is(T : ubyte)) { + import core.atomic : atomicLoad; return atomicLoad(*(cast(shared) &val)); } @@ -213,12 +205,16 @@ if (__traits(isIntegral, T) && is(T : ubyte)) private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal) if (__traits(isIntegral, T) && is(T : ubyte)) { - return core.atomic.cas(cast(shared) &stuff, testVal, newVal); + import core.atomic : cas; + return cas(cast(shared) &stuff, testVal, newVal); } /*--------------------- Generic helper functions, etc.------------------------*/ private template MapType(R, functions...) { + import std.functional : adjoin, unaryFun; + import std.meta : staticMap; + import std.range.primitives : ElementType; static assert(functions.length); ElementType!R e = void; @@ -233,6 +229,7 @@ private template ReduceType(alias fun, R, E) private template noUnsharedAliasing(T) { + import std.traits : hasUnsharedAliasing; enum bool noUnsharedAliasing = !hasUnsharedAliasing!T; } @@ -241,6 +238,9 @@ private template noUnsharedAliasing(T) // requirement for executing it via a TaskPool. (See isSafeReturn). private template isSafeTask(F) { + import std.meta : allSatisfy; + import std.traits : functionAttributes, FunctionAttribute, + hasUnsharedAliasing, Parameters, isFunctionPointer; enum bool isSafeTask = (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 && (functionAttributes!F & FunctionAttribute.ref_) == 0 && @@ -274,6 +274,7 @@ unittest // since they can't read global state. private template isSafeReturn(T) { + import std.traits : hasUnsharedAliasing; static if (!hasUnsharedAliasing!(T.ReturnType)) { enum isSafeReturn = true; @@ -290,6 +291,7 @@ private template isSafeReturn(T) private template randAssignable(R) { + import std.range.primitives : hasAssignableElements, isRandomAccessRange; enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R; } @@ -309,6 +311,7 @@ private template AliasReturn(alias fun, T...) // and won't work w/ private. template reduceAdjoin(functions...) { + import std.functional : binaryFun; static if (functions.length == 1) { alias reduceAdjoin = binaryFun!(functions[0]); @@ -317,6 +320,7 @@ template reduceAdjoin(functions...) { T reduceAdjoin(T, U)(T lhs, U rhs) { + import std.meta : staticMap; alias funs = staticMap!(binaryFun, functions); foreach (i, Unused; typeof(lhs.expand)) @@ -331,6 +335,7 @@ template reduceAdjoin(functions...) private template reduceFinish(functions...) { + import std.functional : binaryFun; static if (functions.length == 1) { alias reduceFinish = binaryFun!(functions[0]); @@ -339,6 +344,7 @@ private template reduceFinish(functions...) { T reduceFinish(T)(T lhs, T rhs) { + import std.meta : staticMap; alias funs = staticMap!(binaryFun, functions); foreach (i, Unused; typeof(lhs.expand)) @@ -431,6 +437,9 @@ Bugs: Changes to $(D ref) and $(D out) arguments are not propagated to the */ struct Task(alias fun, Args...) { + import std.meta : allSatisfy; + import std.traits : isAssignable; + AbstractTask base = {runTask : &impl}; alias base this; @@ -482,6 +491,8 @@ struct Task(alias fun, Args...) // TaskPool from @safe code. See isSafeReturn. static if (__traits(isSame, fun, run)) { + import std.traits : isFunctionPointer, functionAttributes, + FunctionAttribute; static if (isFunctionPointer!(_args[0])) { private enum bool isPure = @@ -537,6 +548,7 @@ struct Task(alias fun, Args...) private void enforcePool() { + import std.exception : enforce; enforce(this.pool !is null, "Job not submitted yet."); } @@ -685,10 +697,10 @@ struct Task(alias fun, Args...) if (job !is null) { - version(verboseUnittest) - { + //version(verboseUnittest) + //{ stderr.writeln("Doing workForce work."); - } + //} pool.doJob(job); @@ -706,10 +718,10 @@ struct Task(alias fun, Args...) } else { - version(verboseUnittest) - { + //version(verboseUnittest) + //{ stderr.writeln("Yield from workForce."); - } + //} return yieldForce; } @@ -1013,6 +1025,9 @@ Occasionally it is useful to explicitly instantiate a $(D TaskPool): final class TaskPool { private: + import core.sync.condition : Condition, Mutex; + import std.range.primitives : isInputRange; + import std.traits : isArray, Parameters; // A pool can either be a regular pool or a single-task pool. A // single-task pool is a dummy pool that's fired up for @@ -1189,6 +1204,7 @@ private: } out { + import std.conv : text; assert(tail.prev !is tail); assert(tail.next is null, text(tail.prev, '\t', tail.next)); if (tail.prev !is null) @@ -1403,6 +1419,7 @@ public: // as public API. size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow { + import std.algorithm.comparison : max; if (this.size == 0) { return rangeLen; @@ -1529,6 +1546,7 @@ public: */ ParallelForeach!R parallel(R)(R range, size_t workUnitSize) { + import std.exception : enforce; enforce(workUnitSize > 0, "workUnitSize must be > 0."); alias RetType = ParallelForeach!R; return RetType(this, range, workUnitSize); @@ -1538,6 +1556,7 @@ public: /// Ditto ParallelForeach!R parallel(R)(R range) { + import std.range.primitives : hasLength; static if (hasLength!R) { // Default work unit size is such that we would use 4x as many @@ -1556,6 +1575,8 @@ public: /// template amap(functions...) { + import std.range.primitives : isRandomAccessRange; + /** Eager parallel map. The eagerness of this function means it has less overhead than the lazily evaluated $(D TaskPool.map) and should be @@ -1642,6 +1663,13 @@ public: auto amap(Args...)(Args args) if (isRandomAccessRange!(Args[0])) { + import std.array : uninitializedArray; + import std.conv : emplaceRef, text; + import std.exception : enforce; + import std.functional : adjoin, unaryFun; + import std.meta : staticMap; + import std.range.primitives : ElementType; + import std.traits : isIntegral; alias fun = adjoin!(staticMap!(unaryFun, functions)); alias range = args[0]; @@ -1708,6 +1736,10 @@ public: void doIt() { + import core.atomic : atomicLoad, atomicStore, atomicOp; + import std.array : front, popFront; + import std.algorithm.comparison : min; + import std.range.primitives : hasSlicing; scope(failure) { // If an exception is thrown, all threads should bail. @@ -1753,6 +1785,9 @@ public: /// template map(functions...) { + import std.range.primitives : hasLength, isInputRange, + isRandomAccessRange; + /** A semi-lazy parallel map that can be used for pipelining. The map functions are evaluated for the first $(D bufSize) elements and stored in a @@ -1826,6 +1861,10 @@ public: map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max) if (isInputRange!S) { + import std.algorithm.comparison : min; + import std.exception : enforce; + import std.functional : adjoin, staticMap, unaryFun; + import std.range.primitives : ElementType, hasLength, isInfinite; enforce(workUnitSize == size_t.max || workUnitSize <= bufSize, "Work unit size must be smaller than buffer size."); alias fun = adjoin!(staticMap!(unaryFun, functions)); @@ -1892,6 +1931,7 @@ public: // No need to copy element by element. FromType dumpToFrom() { + import std.algorithm.mutation : swap; assert(source.buf1.length <= from.length); from.length = source.buf1.length; swap(source.buf1, from); @@ -1976,6 +2016,7 @@ public: // case. E[] fillBuf(E[] buf) { + import std.range : take; static if (isRandomAccessRange!S) { auto toMap = take(source, buf.length); @@ -2069,7 +2110,7 @@ public: } } - static if (std.range.isInfinite!S) + static if (isInfinite!S) { enum bool empty = false; } @@ -2132,6 +2173,7 @@ public: */ auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S) { + import std.range.primitives : ElementType, hasLength, isInfinite; static final class AsyncBuf { // This is a class because the task and source both need to be on @@ -2249,7 +2291,7 @@ public: } } - static if (std.range.isInfinite!S) + static if (isInfinite!S) { enum bool empty = false; } @@ -2429,6 +2471,11 @@ public: */ auto reduce(Args...)(Args args) { + import std.array : empty, front, popFront; + import std.exception : enforce; + import std.range.primitives : isInputRange; + import std.traits : isIntegral; + alias fun = reduceAdjoin!functions; alias finishFun = reduceFinish!functions; @@ -2446,6 +2493,8 @@ public: auto makeStartValue(Type)(Type e) { + import std.conv : emplaceRef; + import std.functional : adjoin, binaryFun, staticMap; static if (functions.length == 1) { return e; @@ -2502,6 +2551,7 @@ public: // since we're assuming functions are associative anyhow. // This is so that loops can be unrolled automatically. + import std.meta : AliasSeq; enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5); enum nILP = ilpTuple.length; immutable subSize = (upperBound - lowerBound) / nILP; @@ -2605,6 +2655,8 @@ public: immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof; import core.stdc.stdlib; + import core.exception : OutOfMemoryError; + import std.conv : emplaceRef; if (nBytesNeeded < maxStack) { tasks = (cast(RTask*) buf.ptr)[0..nWorkUnits]; @@ -2643,6 +2695,7 @@ public: size_t curPos = 0; void useTask(ref RTask task) { + import std.algorithm.comparison : min; task.pool = this; task._args[0] = scopedAddress(&reduceOnRange); task._args[3] = min(len, curPos + workUnitSize); // upper bound. @@ -2837,6 +2890,7 @@ public: void initialize(TaskPool pool) { + import core.memory : GC; this.pool = pool; size = pool.size + 1; stillThreadLocal = new bool; @@ -2867,6 +2921,7 @@ public: ref T opIndex(size_t index) { + import std.conv : text; assert(index < size, text(index, '\t', uint.max)); return *(cast(T*) (data + elemSize * index)); } @@ -3183,6 +3238,7 @@ public: @trusted void put(alias fun, Args...)(Task!(fun, Args)* task) if (isSafeReturn!(typeof(*task))) { + import std.exception : enforce; enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); put(*task); } @@ -3232,7 +3288,8 @@ public: */ int priority() @property @trusted { - return (size == 0) ? core.thread.Thread.PRIORITY_MIN : + //import core.thread : Thread; + return (size == 0) ? Thread.PRIORITY_MIN : pool[0].priority; } @@ -3270,6 +3327,7 @@ terminating the main thread. private shared uint _defaultPoolThreads; shared static this() { + import core.atomic : atomicStore; atomicStore(_defaultPoolThreads, totalCPUs - 1); } @@ -3281,12 +3339,14 @@ number of worker threads in the instance returned by $(D taskPool). */ @property uint defaultPoolThreads() @trusted { + import core.atomic : atomicLoad; return atomicLoad(_defaultPoolThreads); } /// Ditto @property void defaultPoolThreads(uint newVal) @trusted { + import core.atomic : atomicStore; atomicStore(_defaultPoolThreads, newVal); } @@ -3333,6 +3393,7 @@ class ParallelForeachError : Error /*------Structs that implement opApply for parallel foreach.------------------*/ private template randLen(R) { + import std.range.primitives : hasLength, isRandomAccessRange; enum randLen = isRandomAccessRange!R && hasLength!R; } @@ -3346,6 +3407,7 @@ private void submitAndExecute( alias PTask = typeof(scopedTask(doIt)); import core.stdc.stdlib; import core.stdc.string : memcpy; + import core.exception : OutOfMemoryError; // The logical thing to do would be to just use alloca() here, but that // causes problems on Windows for reasons that I don't understand @@ -3453,6 +3515,7 @@ void foreachErr() int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg) { + import std.traits : Parameters; with(p) { int res = 0; @@ -3498,6 +3561,9 @@ int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg) private enum string parallelApplyMixinRandomAccess = q{ // Handle empty thread pool as special case. + import core.atomic : atomicLoad, atomicStore, atomicOp; + import std.algorithm.comparison : min; + import std.traits : Parameters; if (pool.size == 0) { return doSizeZeroCase(this, dg); @@ -3553,6 +3619,13 @@ private enum string parallelApplyMixinRandomAccess = q{ enum string parallelApplyMixinInputRange = q{ // Handle empty thread pool as special case. + import core.atomic : atomicLoad, atomicStore, atomicOp; + import core.sync.condition : Condition, Mutex; + import std.array : empty, front, popFront; + import std.algorithm.mutation : swap; + import std.array : uninitializedArray; + import std.traits : Parameters; + if (pool.size == 0) { return doSizeZeroCase(this, dg); @@ -3766,6 +3839,8 @@ private void addToChain( private struct ParallelForeach(R) { + import std.range.primitives : ElementType, hasLvalueElements; + TaskPool pool; R range; size_t workUnitSize; @@ -3816,6 +3891,7 @@ instance and forwards it to the input range overload of asyncBuf. */ private struct RoundRobinBuffer(C1, C2) { + import std.traits : Parameters; // No need for constraints because they're already checked for in asyncBuf. alias Array = Parameters!(C1.init)[0]; @@ -3885,18 +3961,26 @@ private struct RoundRobinBuffer(C1, C2) } } -version(unittest) -{ +//version(unittest) +//{ // This was the only way I could get nested maps to work. __gshared TaskPool poolInstance; import std.stdio; -} +//} // These test basic functionality but don't stress test for threading bugs. // These are the tests that should be run every time Phobos is compiled. unittest { + static import std.algorithm.iteration; + import std.algorithm.comparison : equal, max, min; + import std.conv : text; + import std.exception : assertThrown; + import std.math : approxEqual, log, sqrt; + import std.range : iota, indexed; + import std.typecons : tuple, Tuple; + poolInstance = new TaskPool(2); scope(exit) poolInstance.stop(); @@ -4008,7 +4092,7 @@ unittest assert(addScopedTask.yieldForce == 3); // Test parallel foreach with non-random access range. - auto range = filter!"a != 666"([0, 1, 2, 3, 4]); + auto range = std.algorithm.iteration.filter!"a != 666"([0, 1, 2, 3, 4]); foreach (i, elem; poolInstance.parallel(range)) { @@ -4041,12 +4125,12 @@ unittest // Test amap with a non-array buffer. auto toIndex = new int[5]; - auto indexed = std.range.indexed(toIndex, [3, 1, 4, 0, 2]); - poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], indexed); - assert(equal(indexed, [2, 4, 6, 8, 10])); + auto indexedRslt = indexed(toIndex, [3, 1, 4, 0, 2]); + poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], indexedRslt); + assert(equal(indexedRslt, [2, 4, 6, 8, 10])); assert(equal(toIndex, [8, 4, 10, 2, 6])); - poolInstance.amap!"a / 2"(indexed, indexed); - assert(equal(indexed, [1, 2, 3, 4, 5])); + poolInstance.amap!"a / 2"(indexedRslt, indexedRslt); + assert(equal(indexedRslt, [1, 2, 3, 4, 5])); assert(equal(toIndex, [4, 2, 5, 1, 3])); auto buf = new int[5]; @@ -4063,7 +4147,7 @@ unittest assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) == tuple(10, 24)); - immutable serialAns = std.algorithm.reduce!"a + b"(iota(1000)); + immutable serialAns = std.algorithm.iteration.reduce!"a + b"(iota(1000)); assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns); assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns); @@ -4082,6 +4166,7 @@ unittest // Test finish() { + import core.thread : dur; static void slowFun() { Thread.sleep(dur!"msecs"(1)); } auto pool1 = new TaskPool(); @@ -4124,34 +4209,35 @@ unittest assert(equal( poolInstance.map!"a * a"(iota(30_000_001), 10_000), - std.algorithm.map!"a * a"(iota(30_000_001)) + std.algorithm.iteration.map!"a * a"(iota(30_000_001)) )); // The filter is to kill random access and test the non-random access // branch. assert(equal( poolInstance.map!"a * a"( - filter!"a == a"(iota(30_000_001) + std.algorithm.iteration.filter!"a == a"(iota(30_000_001) ), 10_000, 1000), - std.algorithm.map!"a * a"(iota(30_000_001)) + std.algorithm.iteration.map!"a * a"(iota(30_000_001)) )); assert( - reduce!"a + b"(0UL, + std.algorithm.iteration.reduce!"a + b"(0UL, poolInstance.map!"a * a"(iota(3_000_001), 10_000) ) == - reduce!"a + b"(0UL, - std.algorithm.map!"a * a"(iota(3_000_001)) + std.algorithm.iteration.reduce!"a + b"(0UL, + std.algorithm.iteration.map!"a * a"(iota(3_000_001)) ) ); assert(equal( iota(1_000_002), - poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002))) + poolInstance.asyncBuf(std.algorithm.iteration.filter!"a == a"(iota(1_000_002))) )); { import std.file : deleteme; + import std.conv : to; string temp_file = deleteme ~ "-tempDelMe.txt"; auto file = File(temp_file, "wb"); @@ -4165,6 +4251,7 @@ unittest auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]]; foreach (row; written) { + import std.array : join; file.writeln(join(to!(string[])(row), "\t")); } @@ -4182,6 +4269,7 @@ unittest foreach (line; asyncReader) { + import std.array : split; if (line.length == 0) continue; auto ls = line.split("\t"); read ~= to!(double[])(ls); @@ -4295,10 +4383,12 @@ unittest // These are more like stress tests than real unit tests. They print out // tons of stuff and should not be run every time make unittest is run. -version(parallelismStressTest) -{ +//version(parallelismStressTest) +//{ unittest { + static import std.algorithm.iteration; + import std.range : iota; size_t attempt; for (; attempt < 10; attempt++) foreach (poolSize; [0, 4]) @@ -4322,7 +4412,7 @@ version(parallelismStressTest) stderr.writeln("Done creating nums."); - auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000)); + auto myNumbers = std.algorithm.iteration.filter!"a % 7 > 0"( iota(0, 1000)); foreach (num; poolInstance.parallel(myNumbers)) { assert(num % 7 > 0 && num < 1000); @@ -4337,7 +4427,7 @@ version(parallelismStressTest) } stderr.writeln("Done squares."); - auto sumFuture = task!( reduce!"a + b" )(numbers); + auto sumFuture = task!( std.algorithm.iteration.reduce!"a + b" )(numbers); poolInstance.put(sumFuture); ulong sumSquares = 0; @@ -4382,6 +4472,12 @@ version(parallelismStressTest) // as examples. unittest { + import std.range : iota; + import std.conv : text; + import core.atomic : atomicOp; + import std.math : approxEqual, sqrt; + static import std.algorithm.iteration; + foreach (attempt; 0..10) foreach (poolSize; [0, 4]) { @@ -4397,7 +4493,7 @@ version(parallelismStressTest) { workerLocalStorage.get++; } - assert(reduce!"a + b"(workerLocalStorage.toRange) == + assert(std.algorithm.iteration.reduce!"a + b"(workerLocalStorage.toRange) == 1_000_000 + poolInstance.size + 1); // Make sure work is reasonably balanced among threads. This test is @@ -4482,6 +4578,7 @@ version(parallelismStressTest) stderr.writeln("Done sum of square roots."); // Test whether tasks work with function pointers. + bool isNaN(double v) { static import std.math; return std.math.isNaN(v); } auto nanTask = task(&isNaN, 1.0L); poolInstance.put(nanTask); assert(nanTask.spinForce == false); @@ -4513,6 +4610,7 @@ version(parallelismStressTest) int[] nums = [1,2,3,4,5]; static struct RemoveRandom { + import std.array : empty, front, popFront; int[] arr; ref int front() @@ -4540,7 +4638,7 @@ version(parallelismStressTest) poolInstance.stop(); } } -} +//} version(unittest) {