From 5246ed6f6fc53400113c24c306dac13b1aaa1135 Mon Sep 17 00:00:00 2001 From: Robert burner Schadek Date: Thu, 2 Jun 2016 00:19:40 +0200 Subject: [PATCH 1/2] parallelism with selective scoped imports added missing import for windows joakim-noah fix --- std/parallelism.d | 144 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 115 insertions(+), 29 deletions(-) diff --git a/std/parallelism.d b/std/parallelism.d index 51e37268311..72b80bc4e13 100644 --- a/std/parallelism.d +++ b/std/parallelism.d @@ -74,21 +74,9 @@ License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0) */ module std.parallelism; -import core.atomic; -import core.exception; -import core.memory; -import core.sync.condition; -import core.thread; - -import std.algorithm; -import std.conv; -import std.exception; -import std.functional; -import std.math; -import std.meta; -import std.range; -import std.traits; -import std.typecons; +import core.thread; // : Thread; + +import std.traits; // : ReturnType; version(OSX) { @@ -110,6 +98,7 @@ version(Windows) shared static this() { import core.sys.windows.windows; + import std.algorithm.comparison : max; SYSTEM_INFO si; GetSystemInfo(&si); @@ -198,13 +187,14 @@ shared static this() private void atomicSetUbyte(T)(ref T stuff, T newVal) if (__traits(isIntegral, T) && is(T : ubyte)) { - //core.atomic.cas(cast(shared) &stuff, stuff, newVal); + import core.atomic : atomicStore; atomicStore(*(cast(shared) &stuff), newVal); } private ubyte atomicReadUbyte(T)(ref T val) if (__traits(isIntegral, T) && is(T : ubyte)) { + import core.atomic : atomicLoad; return atomicLoad(*(cast(shared) &val)); } @@ -213,12 +203,16 @@ if (__traits(isIntegral, T) && is(T : ubyte)) private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal) if (__traits(isIntegral, T) && is(T : ubyte)) { - return core.atomic.cas(cast(shared) &stuff, testVal, newVal); + import core.atomic : cas; + return cas(cast(shared) &stuff, testVal, newVal); } /*--------------------- Generic helper functions, etc.------------------------*/ private template MapType(R, functions...) { + import std.functional : adjoin, unaryFun; + import std.meta : staticMap; + import std.range.primitives : ElementType; static assert(functions.length); ElementType!R e = void; @@ -233,6 +227,7 @@ private template ReduceType(alias fun, R, E) private template noUnsharedAliasing(T) { + import std.traits : hasUnsharedAliasing; enum bool noUnsharedAliasing = !hasUnsharedAliasing!T; } @@ -241,6 +236,9 @@ private template noUnsharedAliasing(T) // requirement for executing it via a TaskPool. (See isSafeReturn). private template isSafeTask(F) { + import std.meta : allSatisfy; + import std.traits : functionAttributes, FunctionAttribute, + hasUnsharedAliasing, Parameters, isFunctionPointer; enum bool isSafeTask = (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 && (functionAttributes!F & FunctionAttribute.ref_) == 0 && @@ -274,6 +272,7 @@ unittest // since they can't read global state. private template isSafeReturn(T) { + import std.traits : hasUnsharedAliasing; static if (!hasUnsharedAliasing!(T.ReturnType)) { enum isSafeReturn = true; @@ -290,6 +289,7 @@ private template isSafeReturn(T) private template randAssignable(R) { + import std.range.primitives : hasAssignableElements, isRandomAccessRange; enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R; } @@ -309,6 +309,7 @@ private template AliasReturn(alias fun, T...) // and won't work w/ private. template reduceAdjoin(functions...) { + import std.functional : binaryFun; static if (functions.length == 1) { alias reduceAdjoin = binaryFun!(functions[0]); @@ -317,6 +318,7 @@ template reduceAdjoin(functions...) { T reduceAdjoin(T, U)(T lhs, U rhs) { + import std.meta : staticMap; alias funs = staticMap!(binaryFun, functions); foreach (i, Unused; typeof(lhs.expand)) @@ -331,6 +333,7 @@ template reduceAdjoin(functions...) private template reduceFinish(functions...) { + import std.functional : binaryFun; static if (functions.length == 1) { alias reduceFinish = binaryFun!(functions[0]); @@ -339,6 +342,7 @@ private template reduceFinish(functions...) { T reduceFinish(T)(T lhs, T rhs) { + import std.meta : staticMap; alias funs = staticMap!(binaryFun, functions); foreach (i, Unused; typeof(lhs.expand)) @@ -431,6 +435,9 @@ Bugs: Changes to $(D ref) and $(D out) arguments are not propagated to the */ struct Task(alias fun, Args...) { + import std.meta : allSatisfy; + import std.traits : isAssignable; + AbstractTask base = {runTask : &impl}; alias base this; @@ -482,6 +489,8 @@ struct Task(alias fun, Args...) // TaskPool from @safe code. See isSafeReturn. static if (__traits(isSame, fun, run)) { + import std.traits : isFunctionPointer, functionAttributes, + FunctionAttribute; static if (isFunctionPointer!(_args[0])) { private enum bool isPure = @@ -537,6 +546,7 @@ struct Task(alias fun, Args...) private void enforcePool() { + import std.exception : enforce; enforce(this.pool !is null, "Job not submitted yet."); } @@ -1013,6 +1023,9 @@ Occasionally it is useful to explicitly instantiate a $(D TaskPool): final class TaskPool { private: + import core.sync.condition : Condition, Mutex; + import std.range.primitives : isInputRange; + import std.traits : isArray, Parameters; // A pool can either be a regular pool or a single-task pool. A // single-task pool is a dummy pool that's fired up for @@ -1189,6 +1202,7 @@ private: } out { + import std.conv : text; assert(tail.prev !is tail); assert(tail.next is null, text(tail.prev, '\t', tail.next)); if (tail.prev !is null) @@ -1403,6 +1417,7 @@ public: // as public API. size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow { + import std.algorithm.comparison : max; if (this.size == 0) { return rangeLen; @@ -1529,6 +1544,7 @@ public: */ ParallelForeach!R parallel(R)(R range, size_t workUnitSize) { + import std.exception : enforce; enforce(workUnitSize > 0, "workUnitSize must be > 0."); alias RetType = ParallelForeach!R; return RetType(this, range, workUnitSize); @@ -1538,6 +1554,7 @@ public: /// Ditto ParallelForeach!R parallel(R)(R range) { + import std.range.primitives : hasLength; static if (hasLength!R) { // Default work unit size is such that we would use 4x as many @@ -1556,6 +1573,8 @@ public: /// template amap(functions...) { + import std.range.primitives : isRandomAccessRange; + /** Eager parallel map. The eagerness of this function means it has less overhead than the lazily evaluated $(D TaskPool.map) and should be @@ -1642,6 +1661,13 @@ public: auto amap(Args...)(Args args) if (isRandomAccessRange!(Args[0])) { + import std.array : uninitializedArray; + import std.conv : emplaceRef, text; + import std.exception : enforce; + import std.functional : adjoin, unaryFun; + import std.meta : staticMap; + import std.range.primitives : ElementType; + import std.traits : isIntegral; alias fun = adjoin!(staticMap!(unaryFun, functions)); alias range = args[0]; @@ -1708,6 +1734,10 @@ public: void doIt() { + import core.atomic : atomicLoad, atomicStore, atomicOp; + import std.array : front, popFront; + import std.algorithm.comparison : min; + import std.range.primitives : hasSlicing; scope(failure) { // If an exception is thrown, all threads should bail. @@ -1753,6 +1783,9 @@ public: /// template map(functions...) { + import std.range.primitives : hasLength, isInputRange, + isRandomAccessRange; + /** A semi-lazy parallel map that can be used for pipelining. The map functions are evaluated for the first $(D bufSize) elements and stored in a @@ -1826,6 +1859,10 @@ public: map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max) if (isInputRange!S) { + import std.algorithm.comparison : min; + import std.exception : enforce; + import std.functional : adjoin, staticMap, unaryFun; + import std.range.primitives : ElementType, hasLength, isInfinite; enforce(workUnitSize == size_t.max || workUnitSize <= bufSize, "Work unit size must be smaller than buffer size."); alias fun = adjoin!(staticMap!(unaryFun, functions)); @@ -1892,6 +1929,7 @@ public: // No need to copy element by element. FromType dumpToFrom() { + import std.algorithm.mutation : swap; assert(source.buf1.length <= from.length); from.length = source.buf1.length; swap(source.buf1, from); @@ -1976,6 +2014,7 @@ public: // case. E[] fillBuf(E[] buf) { + import std.range : take; static if (isRandomAccessRange!S) { auto toMap = take(source, buf.length); @@ -2069,7 +2108,7 @@ public: } } - static if (std.range.isInfinite!S) + static if (isInfinite!S) { enum bool empty = false; } @@ -2132,6 +2171,7 @@ public: */ auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S) { + import std.range.primitives : ElementType, hasLength, isInfinite; static final class AsyncBuf { // This is a class because the task and source both need to be on @@ -2249,7 +2289,7 @@ public: } } - static if (std.range.isInfinite!S) + static if (isInfinite!S) { enum bool empty = false; } @@ -2429,6 +2469,11 @@ public: */ auto reduce(Args...)(Args args) { + import std.array : empty, front, popFront; + import std.exception : enforce; + import std.range.primitives : isInputRange; + import std.traits : isIntegral; + alias fun = reduceAdjoin!functions; alias finishFun = reduceFinish!functions; @@ -2446,6 +2491,8 @@ public: auto makeStartValue(Type)(Type e) { + import std.conv : emplaceRef; + import std.functional : adjoin, binaryFun, staticMap; static if (functions.length == 1) { return e; @@ -2502,6 +2549,7 @@ public: // since we're assuming functions are associative anyhow. // This is so that loops can be unrolled automatically. + import std.meta : AliasSeq; enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5); enum nILP = ilpTuple.length; immutable subSize = (upperBound - lowerBound) / nILP; @@ -2605,6 +2653,8 @@ public: immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof; import core.stdc.stdlib; + import core.exception : OutOfMemoryError; + import std.conv : emplaceRef; if (nBytesNeeded < maxStack) { tasks = (cast(RTask*) buf.ptr)[0..nWorkUnits]; @@ -2643,6 +2693,7 @@ public: size_t curPos = 0; void useTask(ref RTask task) { + import std.algorithm.comparison : min; task.pool = this; task._args[0] = scopedAddress(&reduceOnRange); task._args[3] = min(len, curPos + workUnitSize); // upper bound. @@ -2837,6 +2888,7 @@ public: void initialize(TaskPool pool) { + import core.memory : GC; this.pool = pool; size = pool.size + 1; stillThreadLocal = new bool; @@ -2867,6 +2919,7 @@ public: ref T opIndex(size_t index) { + import std.conv : text; assert(index < size, text(index, '\t', uint.max)); return *(cast(T*) (data + elemSize * index)); } @@ -3183,6 +3236,7 @@ public: @trusted void put(alias fun, Args...)(Task!(fun, Args)* task) if (isSafeReturn!(typeof(*task))) { + import std.exception : enforce; enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); put(*task); } @@ -3232,7 +3286,8 @@ public: */ int priority() @property @trusted { - return (size == 0) ? core.thread.Thread.PRIORITY_MIN : + //import core.thread : Thread; + return (size == 0) ? Thread.PRIORITY_MIN : pool[0].priority; } @@ -3270,6 +3325,7 @@ terminating the main thread. private shared uint _defaultPoolThreads; shared static this() { + import core.atomic : atomicStore; atomicStore(_defaultPoolThreads, totalCPUs - 1); } @@ -3281,12 +3337,14 @@ number of worker threads in the instance returned by $(D taskPool). */ @property uint defaultPoolThreads() @trusted { + import core.atomic : atomicLoad; return atomicLoad(_defaultPoolThreads); } /// Ditto @property void defaultPoolThreads(uint newVal) @trusted { + import core.atomic : atomicStore; atomicStore(_defaultPoolThreads, newVal); } @@ -3333,6 +3391,7 @@ class ParallelForeachError : Error /*------Structs that implement opApply for parallel foreach.------------------*/ private template randLen(R) { + import std.range.primitives : hasLength, isRandomAccessRange; enum randLen = isRandomAccessRange!R && hasLength!R; } @@ -3346,6 +3405,7 @@ private void submitAndExecute( alias PTask = typeof(scopedTask(doIt)); import core.stdc.stdlib; import core.stdc.string : memcpy; + import core.exception : OutOfMemoryError; // The logical thing to do would be to just use alloca() here, but that // causes problems on Windows for reasons that I don't understand @@ -3453,6 +3513,7 @@ void foreachErr() int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg) { + import std.traits : Parameters; with(p) { int res = 0; @@ -3498,6 +3559,9 @@ int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg) private enum string parallelApplyMixinRandomAccess = q{ // Handle empty thread pool as special case. + import core.atomic : atomicLoad, atomicStore, atomicOp; + import std.algorithm.comparison : min; + import std.traits : Parameters; if (pool.size == 0) { return doSizeZeroCase(this, dg); @@ -3553,6 +3617,13 @@ private enum string parallelApplyMixinRandomAccess = q{ enum string parallelApplyMixinInputRange = q{ // Handle empty thread pool as special case. + import core.atomic : atomicLoad, atomicStore, atomicOp; + import core.sync.condition : Condition, Mutex; + import std.array : empty; + import std.algorithm.mutation : swap; + import std.array : uninitializedArray; + import std.traits : Parameters; + if (pool.size == 0) { return doSizeZeroCase(this, dg); @@ -3766,6 +3837,8 @@ private void addToChain( private struct ParallelForeach(R) { + import std.range.primitives : ElementType, hasLvalueElements; + TaskPool pool; R range; size_t workUnitSize; @@ -3816,6 +3889,7 @@ instance and forwards it to the input range overload of asyncBuf. */ private struct RoundRobinBuffer(C1, C2) { + import std.traits : Parameters; // No need for constraints because they're already checked for in asyncBuf. alias Array = Parameters!(C1.init)[0]; @@ -3897,6 +3971,14 @@ version(unittest) // These are the tests that should be run every time Phobos is compiled. unittest { + import std.algorithm.iteration : filter, map, reduce; + import std.algorithm.comparison : equal, max, min; + import std.conv : text; + import std.exception : assertThrown; + import std.math : approxEqual, log, sqrt; + import std.range : iota, indexed; + import std.typecons : tuple, Tuple; + poolInstance = new TaskPool(2); scope(exit) poolInstance.stop(); @@ -4041,12 +4123,12 @@ unittest // Test amap with a non-array buffer. auto toIndex = new int[5]; - auto indexed = std.range.indexed(toIndex, [3, 1, 4, 0, 2]); - poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], indexed); - assert(equal(indexed, [2, 4, 6, 8, 10])); + auto indexedRslt = indexed(toIndex, [3, 1, 4, 0, 2]); + poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], indexedRslt); + assert(equal(indexedRslt, [2, 4, 6, 8, 10])); assert(equal(toIndex, [8, 4, 10, 2, 6])); - poolInstance.amap!"a / 2"(indexed, indexed); - assert(equal(indexed, [1, 2, 3, 4, 5])); + poolInstance.amap!"a / 2"(indexedRslt, indexedRslt); + assert(equal(indexedRslt, [1, 2, 3, 4, 5])); assert(equal(toIndex, [4, 2, 5, 1, 3])); auto buf = new int[5]; @@ -4063,7 +4145,7 @@ unittest assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) == tuple(10, 24)); - immutable serialAns = std.algorithm.reduce!"a + b"(iota(1000)); + immutable serialAns = reduce!"a + b"(iota(1000)); assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns); assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns); @@ -4082,6 +4164,7 @@ unittest // Test finish() { + import core.thread : dur; static void slowFun() { Thread.sleep(dur!"msecs"(1)); } auto pool1 = new TaskPool(); @@ -4124,7 +4207,7 @@ unittest assert(equal( poolInstance.map!"a * a"(iota(30_000_001), 10_000), - std.algorithm.map!"a * a"(iota(30_000_001)) + map!"a * a"(iota(30_000_001)) )); // The filter is to kill random access and test the non-random access @@ -4133,7 +4216,7 @@ unittest poolInstance.map!"a * a"( filter!"a == a"(iota(30_000_001) ), 10_000, 1000), - std.algorithm.map!"a * a"(iota(30_000_001)) + map!"a * a"(iota(30_000_001)) )); assert( @@ -4141,7 +4224,7 @@ unittest poolInstance.map!"a * a"(iota(3_000_001), 10_000) ) == reduce!"a + b"(0UL, - std.algorithm.map!"a * a"(iota(3_000_001)) + map!"a * a"(iota(3_000_001)) ) ); @@ -4152,6 +4235,7 @@ unittest { import std.file : deleteme; + import std.conv : to; string temp_file = deleteme ~ "-tempDelMe.txt"; auto file = File(temp_file, "wb"); @@ -4165,6 +4249,7 @@ unittest auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]]; foreach (row; written) { + import std.array : join; file.writeln(join(to!(string[])(row), "\t")); } @@ -4182,6 +4267,7 @@ unittest foreach (line; asyncReader) { + import std.array : split; if (line.length == 0) continue; auto ls = line.split("\t"); read ~= to!(double[])(ls); From 5ef946a9565ff0fe42f82fa1e68f1c0addd85a8d Mon Sep 17 00:00:00 2001 From: Robert burner Schadek Date: Fri, 3 Jun 2016 10:20:33 +0200 Subject: [PATCH 2/2] some tickering --- std/parallelism.d | 96 ++++++++++++++++++++++++++--------------------- 1 file changed, 54 insertions(+), 42 deletions(-) diff --git a/std/parallelism.d b/std/parallelism.d index 72b80bc4e13..27c0da090a5 100644 --- a/std/parallelism.d +++ b/std/parallelism.d @@ -78,7 +78,7 @@ import core.thread; // : Thread; import std.traits; // : ReturnType; -version(OSX) +/*version(OSX) { version = useSysctlbyname; } @@ -89,7 +89,8 @@ else version(FreeBSD) else version(NetBSD) { version = useSysctlbyname; -} +}*/ +version = useSysctlbyname; version(Windows) @@ -98,6 +99,7 @@ version(Windows) shared static this() { import core.sys.windows.windows; + import core.sys.windows.winbase; import std.algorithm.comparison : max; SYSTEM_INFO si; @@ -106,26 +108,26 @@ version(Windows) } } -else version(linux) -{ +//else version(linux) +//{ import core.sys.posix.unistd; shared static this() { totalCPUs = cast(uint) sysconf(_SC_NPROCESSORS_ONLN); } -} -else version(Solaris) -{ +//} +//else version(Solaris) +//{ import core.sys.posix.unistd; shared static this() { totalCPUs = cast(uint) sysconf(_SC_NPROCESSORS_ONLN); } -} -else version(useSysctlbyname) -{ +//} +//else version(useSysctlbyname) +//{ extern(C) int sysctlbyname( const char *, void *, size_t *, void *, size_t ); @@ -140,10 +142,10 @@ else version(useSysctlbyname) { auto nameStr = "hw.ncpu\0".ptr; } - else version(NetBSD) - { + //else version(NetBSD) + //{ auto nameStr = "hw.ncpu\0".ptr; - } + //} uint ans; size_t len = uint.sizeof; @@ -151,9 +153,9 @@ else version(useSysctlbyname) totalCPUs = ans; } -} -else -{ +//} +//else +version(none){ static assert(0, "Don't know how to get N CPUs on this OS."); } @@ -695,10 +697,10 @@ struct Task(alias fun, Args...) if (job !is null) { - version(verboseUnittest) - { + //version(verboseUnittest) + //{ stderr.writeln("Doing workForce work."); - } + //} pool.doJob(job); @@ -716,10 +718,10 @@ struct Task(alias fun, Args...) } else { - version(verboseUnittest) - { + //version(verboseUnittest) + //{ stderr.writeln("Yield from workForce."); - } + //} return yieldForce; } @@ -3619,7 +3621,7 @@ enum string parallelApplyMixinInputRange = q{ // Handle empty thread pool as special case. import core.atomic : atomicLoad, atomicStore, atomicOp; import core.sync.condition : Condition, Mutex; - import std.array : empty; + import std.array : empty, front, popFront; import std.algorithm.mutation : swap; import std.array : uninitializedArray; import std.traits : Parameters; @@ -3959,19 +3961,19 @@ private struct RoundRobinBuffer(C1, C2) } } -version(unittest) -{ +//version(unittest) +//{ // This was the only way I could get nested maps to work. __gshared TaskPool poolInstance; import std.stdio; -} +//} // These test basic functionality but don't stress test for threading bugs. // These are the tests that should be run every time Phobos is compiled. unittest { - import std.algorithm.iteration : filter, map, reduce; + static import std.algorithm.iteration; import std.algorithm.comparison : equal, max, min; import std.conv : text; import std.exception : assertThrown; @@ -4090,7 +4092,7 @@ unittest assert(addScopedTask.yieldForce == 3); // Test parallel foreach with non-random access range. - auto range = filter!"a != 666"([0, 1, 2, 3, 4]); + auto range = std.algorithm.iteration.filter!"a != 666"([0, 1, 2, 3, 4]); foreach (i, elem; poolInstance.parallel(range)) { @@ -4145,7 +4147,7 @@ unittest assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) == tuple(10, 24)); - immutable serialAns = reduce!"a + b"(iota(1000)); + immutable serialAns = std.algorithm.iteration.reduce!"a + b"(iota(1000)); assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns); assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns); @@ -4207,30 +4209,30 @@ unittest assert(equal( poolInstance.map!"a * a"(iota(30_000_001), 10_000), - map!"a * a"(iota(30_000_001)) + std.algorithm.iteration.map!"a * a"(iota(30_000_001)) )); // The filter is to kill random access and test the non-random access // branch. assert(equal( poolInstance.map!"a * a"( - filter!"a == a"(iota(30_000_001) + std.algorithm.iteration.filter!"a == a"(iota(30_000_001) ), 10_000, 1000), - map!"a * a"(iota(30_000_001)) + std.algorithm.iteration.map!"a * a"(iota(30_000_001)) )); assert( - reduce!"a + b"(0UL, + std.algorithm.iteration.reduce!"a + b"(0UL, poolInstance.map!"a * a"(iota(3_000_001), 10_000) ) == - reduce!"a + b"(0UL, - map!"a * a"(iota(3_000_001)) + std.algorithm.iteration.reduce!"a + b"(0UL, + std.algorithm.iteration.map!"a * a"(iota(3_000_001)) ) ); assert(equal( iota(1_000_002), - poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002))) + poolInstance.asyncBuf(std.algorithm.iteration.filter!"a == a"(iota(1_000_002))) )); { @@ -4381,10 +4383,12 @@ unittest // These are more like stress tests than real unit tests. They print out // tons of stuff and should not be run every time make unittest is run. -version(parallelismStressTest) -{ +//version(parallelismStressTest) +//{ unittest { + static import std.algorithm.iteration; + import std.range : iota; size_t attempt; for (; attempt < 10; attempt++) foreach (poolSize; [0, 4]) @@ -4408,7 +4412,7 @@ version(parallelismStressTest) stderr.writeln("Done creating nums."); - auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000)); + auto myNumbers = std.algorithm.iteration.filter!"a % 7 > 0"( iota(0, 1000)); foreach (num; poolInstance.parallel(myNumbers)) { assert(num % 7 > 0 && num < 1000); @@ -4423,7 +4427,7 @@ version(parallelismStressTest) } stderr.writeln("Done squares."); - auto sumFuture = task!( reduce!"a + b" )(numbers); + auto sumFuture = task!( std.algorithm.iteration.reduce!"a + b" )(numbers); poolInstance.put(sumFuture); ulong sumSquares = 0; @@ -4468,6 +4472,12 @@ version(parallelismStressTest) // as examples. unittest { + import std.range : iota; + import std.conv : text; + import core.atomic : atomicOp; + import std.math : approxEqual, sqrt; + static import std.algorithm.iteration; + foreach (attempt; 0..10) foreach (poolSize; [0, 4]) { @@ -4483,7 +4493,7 @@ version(parallelismStressTest) { workerLocalStorage.get++; } - assert(reduce!"a + b"(workerLocalStorage.toRange) == + assert(std.algorithm.iteration.reduce!"a + b"(workerLocalStorage.toRange) == 1_000_000 + poolInstance.size + 1); // Make sure work is reasonably balanced among threads. This test is @@ -4568,6 +4578,7 @@ version(parallelismStressTest) stderr.writeln("Done sum of square roots."); // Test whether tasks work with function pointers. + bool isNaN(double v) { static import std.math; return std.math.isNaN(v); } auto nanTask = task(&isNaN, 1.0L); poolInstance.put(nanTask); assert(nanTask.spinForce == false); @@ -4599,6 +4610,7 @@ version(parallelismStressTest) int[] nums = [1,2,3,4,5]; static struct RemoveRandom { + import std.array : empty, front, popFront; int[] arr; ref int front() @@ -4626,7 +4638,7 @@ version(parallelismStressTest) poolInstance.stop(); } } -} +//} version(unittest) {