diff --git a/std/parallelism.d b/std/parallelism.d index 0abfc26b372..99cdd95e2ed 100644 --- a/std/parallelism.d +++ b/std/parallelism.d @@ -77,20 +77,14 @@ module std.parallelism; } import core.atomic; -import core.exception; import core.memory; import core.sync.condition; import core.thread; -import std.algorithm; -import std.conv; -import std.exception; import std.functional; -import std.math; import std.meta; -import std.range; +import std.range.primitives; import std.traits; -import std.typecons; version(OSX) { @@ -112,6 +106,7 @@ version(Windows) shared static this() { import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo; + import std.algorithm.comparison : max; SYSTEM_INFO si; GetSystemInfo(&si); @@ -537,6 +532,7 @@ struct Task(alias fun, Args...) private void enforcePool() { + import std.exception : enforce; enforce(this.pool !is null, "Job not submitted yet."); } @@ -1187,6 +1183,8 @@ private: } out { + import std.conv : text; + assert(tail.prev !is tail); assert(tail.next is null, text(tail.prev, '\t', tail.next)); if (tail.prev !is null) @@ -1401,6 +1399,8 @@ public: // as public API. size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow { + import std.algorithm.comparison : max; + if (this.size == 0) { return rangeLen; @@ -1527,6 +1527,7 @@ public: */ ParallelForeach!R parallel(R)(R range, size_t workUnitSize) { + import std.exception : enforce; enforce(workUnitSize > 0, "workUnitSize must be > 0."); alias RetType = ParallelForeach!R; return RetType(this, range, workUnitSize); @@ -1640,6 +1641,8 @@ public: auto amap(Args...)(Args args) if (isRandomAccessRange!(Args[0])) { + import std.conv : emplaceRef; + alias fun = adjoin!(staticMap!(unaryFun, functions)); alias range = args[0]; @@ -1651,6 +1654,9 @@ public: is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1])) ) { + import std.conv : text; + import std.exception : enforce; + alias buf = args[$ - 1]; alias args2 = args[0..$ - 1]; alias Args2 = Args[0..$ - 1]; @@ -1664,6 +1670,8 @@ public: } else { + import std.array : uninitializedArray; + auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len); alias args2 = args; alias Args2 = Args; @@ -1706,6 +1714,8 @@ public: void doIt() { + import std.algorithm.comparison : min; + scope(failure) { // If an exception is thrown, all threads should bail. @@ -1824,6 +1834,8 @@ public: map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max) if (isInputRange!S) { + import std.exception : enforce; + enforce(workUnitSize == size_t.max || workUnitSize <= bufSize, "Work unit size must be smaller than buffer size."); alias fun = adjoin!(staticMap!(unaryFun, functions)); @@ -1854,6 +1866,8 @@ public: void popSource() { + import std.algorithm.comparison : min; + static if (__traits(compiles, source[0 .. source.length])) { source = source[min(buf1.length, source.length)..source.length]; @@ -1890,6 +1904,8 @@ public: // No need to copy element by element. FromType dumpToFrom() { + import std.algorithm.mutation : swap; + assert(source.buf1.length <= from.length); from.length = source.buf1.length; swap(source.buf1, from); @@ -1974,8 +1990,11 @@ public: // case. E[] fillBuf(E[] buf) { + import std.algorithm.comparison : min; + static if (isRandomAccessRange!S) { + import std.range : take; auto toMap = take(source, buf.length); scope(success) popSource(); } @@ -2067,7 +2086,7 @@ public: } } - static if (std.range.isInfinite!S) + static if (isInfinite!S) { enum bool empty = false; } @@ -2247,7 +2266,7 @@ public: } } - static if (std.range.isInfinite!S) + static if (isInfinite!S) { enum bool empty = false; } @@ -2427,6 +2446,10 @@ public: */ auto reduce(Args...)(Args args) { + import core.exception : OutOfMemoryError; + import std.conv : emplaceRef; + import std.exception : enforce; + alias fun = reduceAdjoin!functions; alias finishFun = reduceFinish!functions; @@ -2642,6 +2665,8 @@ public: size_t curPos = 0; void useTask(ref RTask task) { + import std.algorithm.comparison : min; + task.pool = this; task._args[0] = scopedAddress(&reduceOnRange); task._args[3] = min(len, curPos + workUnitSize); // upper bound. @@ -2866,6 +2891,7 @@ public: ref T opIndex(size_t index) { + import std.conv : text; assert(index < size, text(index, '\t', uint.max)); return *(cast(T*) (data + elemSize * index)); } @@ -3168,6 +3194,7 @@ public: void put(alias fun, Args...)(Task!(fun, Args)* task) if (!isSafeReturn!(typeof(*task))) { + import std.exception : enforce; enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); put(*task); } @@ -3182,6 +3209,7 @@ public: @trusted void put(alias fun, Args...)(Task!(fun, Args)* task) if (isSafeReturn!(typeof(*task))) { + import std.exception : enforce; enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); put(*task); } @@ -3340,6 +3368,7 @@ private void submitAndExecute( scope void delegate() doIt ) { + import core.exception : OutOfMemoryError; immutable nThreads = pool.size + 1; alias PTask = typeof(scopedTask(doIt)); @@ -3513,6 +3542,8 @@ private enum string parallelApplyMixinRandomAccess = q{ void doIt() { + import std.algorithm.comparison : min; + scope(failure) { // If an exception is thrown, all threads should bail. @@ -3610,6 +3641,7 @@ enum string parallelApplyMixinInputRange = q{ size_t makeTemp() { import std.algorithm.internal : addressOf; + import std.array : uninitializedArray; if (temp is null) { @@ -3641,6 +3673,8 @@ enum string parallelApplyMixinInputRange = q{ // Returns: The previous value of nPopped. static if (!bufferTrick) size_t makeTemp() { + import std.array : uninitializedArray; + if (temp is null) { temp = uninitializedArray!Temp(workUnitSize); @@ -3663,6 +3697,7 @@ enum string parallelApplyMixinInputRange = q{ static if (bufferTrick) size_t makeTemp() { + import std.algorithm.mutation : swap; rangeMutex.lock(); scope(exit) rangeMutex.unlock(); @@ -3896,6 +3931,15 @@ version(unittest) // These are the tests that should be run every time Phobos is compiled. @system unittest { + import std.algorithm.iteration : filter, map, reduce; + import std.algorithm.comparison : equal, min, max; + import std.array : split; + import std.conv : text; + import std.exception : assertThrown; + import std.math : approxEqual, sqrt, log; + import std.range : indexed, iota, join; + import std.typecons : Tuple, tuple; + poolInstance = new TaskPool(2); scope(exit) poolInstance.stop(); @@ -4040,12 +4084,12 @@ version(unittest) // Test amap with a non-array buffer. auto toIndex = new int[5]; - auto indexed = std.range.indexed(toIndex, [3, 1, 4, 0, 2]); - poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], indexed); - assert(equal(indexed, [2, 4, 6, 8, 10])); + auto ind = indexed(toIndex, [3, 1, 4, 0, 2]); + poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind); + assert(equal(ind, [2, 4, 6, 8, 10])); assert(equal(toIndex, [8, 4, 10, 2, 6])); - poolInstance.amap!"a / 2"(indexed, indexed); - assert(equal(indexed, [1, 2, 3, 4, 5])); + poolInstance.amap!"a / 2"(ind, ind); + assert(equal(ind, [1, 2, 3, 4, 5])); assert(equal(toIndex, [4, 2, 5, 1, 3])); auto buf = new int[5]; @@ -4062,7 +4106,7 @@ version(unittest) assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) == tuple(10, 24)); - immutable serialAns = std.algorithm.reduce!"a + b"(iota(1000)); + immutable serialAns = reduce!"a + b"(iota(1000)); assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns); assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns); @@ -4123,7 +4167,7 @@ version(unittest) assert(equal( poolInstance.map!"a * a"(iota(30_000_001), 10_000), - std.algorithm.map!"a * a"(iota(30_000_001)) + map!"a * a"(iota(30_000_001)) )); // The filter is to kill random access and test the non-random access @@ -4132,7 +4176,7 @@ version(unittest) poolInstance.map!"a * a"( filter!"a == a"(iota(30_000_001) ), 10_000, 1000), - std.algorithm.map!"a * a"(iota(30_000_001)) + map!"a * a"(iota(30_000_001)) )); assert( @@ -4140,7 +4184,7 @@ version(unittest) poolInstance.map!"a * a"(iota(3_000_001), 10_000) ) == reduce!"a + b"(0UL, - std.algorithm.map!"a * a"(iota(3_000_001)) + map!"a * a"(iota(3_000_001)) ) ); @@ -4150,6 +4194,7 @@ version(unittest) )); { + import std.conv : to; import std.file : deleteme; string temp_file = deleteme ~ "-tempDelMe.txt"; @@ -4564,12 +4609,16 @@ version(unittest) @safe unittest { + import std.range : iota; + // this test was in std.range, but caused cycles. assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} })); } @safe unittest { + import std.algorithm.iteration : each; + long[] arr; static assert(is(typeof({ arr.parallel.each!"a++";