Skip to content
85 changes: 67 additions & 18 deletions std/parallelism.d
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,14 @@ module std.parallelism;
}

import core.atomic;
import core.exception;
import core.memory;
import core.sync.condition;
import core.thread;

import std.algorithm;
import std.conv;
import std.exception;
import std.functional;
import std.math;
import std.meta;
import std.range;
import std.range.primitives;
import std.traits;
import std.typecons;

version(OSX)
{
Expand All @@ -112,6 +106,7 @@ version(Windows)
shared static this()
{
import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo;
import std.algorithm.comparison : max;

SYSTEM_INFO si;
GetSystemInfo(&si);
Expand Down Expand Up @@ -537,6 +532,7 @@ struct Task(alias fun, Args...)

private void enforcePool()
{
import std.exception : enforce;
enforce(this.pool !is null, "Job not submitted yet.");
}

Expand Down Expand Up @@ -1187,6 +1183,8 @@ private:
}
out
{
import std.conv : text;

assert(tail.prev !is tail);
assert(tail.next is null, text(tail.prev, '\t', tail.next));
if (tail.prev !is null)
Expand Down Expand Up @@ -1401,6 +1399,8 @@ public:
// as public API.
size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
{
import std.algorithm.comparison : max;

if (this.size == 0)
{
return rangeLen;
Expand Down Expand Up @@ -1527,6 +1527,7 @@ public:
*/
ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
{
import std.exception : enforce;
enforce(workUnitSize > 0, "workUnitSize must be > 0.");
alias RetType = ParallelForeach!R;
return RetType(this, range, workUnitSize);
Expand Down Expand Up @@ -1640,6 +1641,8 @@ public:
auto amap(Args...)(Args args)
if (isRandomAccessRange!(Args[0]))
{
import std.conv : emplaceRef;

alias fun = adjoin!(staticMap!(unaryFun, functions));

alias range = args[0];
Expand All @@ -1651,6 +1654,9 @@ public:
is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
)
{
import std.conv : text;
import std.exception : enforce;

alias buf = args[$ - 1];
alias args2 = args[0..$ - 1];
alias Args2 = Args[0..$ - 1];
Expand All @@ -1664,6 +1670,8 @@ public:
}
else
{
import std.array : uninitializedArray;

auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
alias args2 = args;
alias Args2 = Args;
Expand Down Expand Up @@ -1706,6 +1714,8 @@ public:

void doIt()
{
import std.algorithm.comparison : min;

scope(failure)
{
// If an exception is thrown, all threads should bail.
Expand Down Expand Up @@ -1824,6 +1834,8 @@ public:
map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
if (isInputRange!S)
{
import std.exception : enforce;

enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
"Work unit size must be smaller than buffer size.");
alias fun = adjoin!(staticMap!(unaryFun, functions));
Expand Down Expand Up @@ -1854,6 +1866,8 @@ public:

void popSource()
{
import std.algorithm.comparison : min;

static if (__traits(compiles, source[0 .. source.length]))
{
source = source[min(buf1.length, source.length)..source.length];
Expand Down Expand Up @@ -1890,6 +1904,8 @@ public:
// No need to copy element by element.
FromType dumpToFrom()
{
import std.algorithm.mutation : swap;

assert(source.buf1.length <= from.length);
from.length = source.buf1.length;
swap(source.buf1, from);
Expand Down Expand Up @@ -1974,8 +1990,11 @@ public:
// case.
E[] fillBuf(E[] buf)
{
import std.algorithm.comparison : min;

static if (isRandomAccessRange!S)
{
import std.range : take;
auto toMap = take(source, buf.length);
scope(success) popSource();
}
Expand Down Expand Up @@ -2067,7 +2086,7 @@ public:
}
}

static if (std.range.isInfinite!S)
static if (isInfinite!S)
{
enum bool empty = false;
}
Expand Down Expand Up @@ -2247,7 +2266,7 @@ public:
}
}

static if (std.range.isInfinite!S)
static if (isInfinite!S)
{
enum bool empty = false;
}
Expand Down Expand Up @@ -2427,6 +2446,10 @@ public:
*/
auto reduce(Args...)(Args args)
{
import core.exception : OutOfMemoryError;
import std.conv : emplaceRef;
import std.exception : enforce;

alias fun = reduceAdjoin!functions;
alias finishFun = reduceFinish!functions;

Expand Down Expand Up @@ -2642,6 +2665,8 @@ public:
size_t curPos = 0;
void useTask(ref RTask task)
{
import std.algorithm.comparison : min;

task.pool = this;
task._args[0] = scopedAddress(&reduceOnRange);
task._args[3] = min(len, curPos + workUnitSize); // upper bound.
Expand Down Expand Up @@ -2866,6 +2891,7 @@ public:

ref T opIndex(size_t index)
{
import std.conv : text;
assert(index < size, text(index, '\t', uint.max));
return *(cast(T*) (data + elemSize * index));
}
Expand Down Expand Up @@ -3168,6 +3194,7 @@ public:
void put(alias fun, Args...)(Task!(fun, Args)* task)
if (!isSafeReturn!(typeof(*task)))
{
import std.exception : enforce;
enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
put(*task);
}
Expand All @@ -3182,6 +3209,7 @@ public:
@trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
if (isSafeReturn!(typeof(*task)))
{
import std.exception : enforce;
enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
put(*task);
}
Expand Down Expand Up @@ -3340,6 +3368,7 @@ private void submitAndExecute(
scope void delegate() doIt
)
{
import core.exception : OutOfMemoryError;
immutable nThreads = pool.size + 1;

alias PTask = typeof(scopedTask(doIt));
Expand Down Expand Up @@ -3513,6 +3542,8 @@ private enum string parallelApplyMixinRandomAccess = q{

void doIt()
{
import std.algorithm.comparison : min;

scope(failure)
{
// If an exception is thrown, all threads should bail.
Expand Down Expand Up @@ -3610,6 +3641,7 @@ enum string parallelApplyMixinInputRange = q{
size_t makeTemp()
{
import std.algorithm.internal : addressOf;
import std.array : uninitializedArray;

if (temp is null)
{
Expand Down Expand Up @@ -3641,6 +3673,8 @@ enum string parallelApplyMixinInputRange = q{
// Returns: The previous value of nPopped.
static if (!bufferTrick) size_t makeTemp()
{
import std.array : uninitializedArray;

if (temp is null)
{
temp = uninitializedArray!Temp(workUnitSize);
Expand All @@ -3663,6 +3697,7 @@ enum string parallelApplyMixinInputRange = q{

static if (bufferTrick) size_t makeTemp()
{
import std.algorithm.mutation : swap;
rangeMutex.lock();
scope(exit) rangeMutex.unlock();

Expand Down Expand Up @@ -3896,6 +3931,15 @@ version(unittest)
// These are the tests that should be run every time Phobos is compiled.
@system unittest
{
import std.algorithm.iteration : filter, map, reduce;
import std.algorithm.comparison : equal, min, max;
import std.array : split;
import std.conv : text;
import std.exception : assertThrown;
import std.math : approxEqual, sqrt, log;
import std.range : indexed, iota, join;
import std.typecons : Tuple, tuple;

poolInstance = new TaskPool(2);
scope(exit) poolInstance.stop();

Expand Down Expand Up @@ -4040,12 +4084,12 @@ version(unittest)

// Test amap with a non-array buffer.
auto toIndex = new int[5];
auto indexed = std.range.indexed(toIndex, [3, 1, 4, 0, 2]);
poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], indexed);
assert(equal(indexed, [2, 4, 6, 8, 10]));
auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
assert(equal(ind, [2, 4, 6, 8, 10]));
assert(equal(toIndex, [8, 4, 10, 2, 6]));
poolInstance.amap!"a / 2"(indexed, indexed);
assert(equal(indexed, [1, 2, 3, 4, 5]));
poolInstance.amap!"a / 2"(ind, ind);
assert(equal(ind, [1, 2, 3, 4, 5]));
assert(equal(toIndex, [4, 2, 5, 1, 3]));

auto buf = new int[5];
Expand All @@ -4062,7 +4106,7 @@ version(unittest)
assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
tuple(10, 24));

immutable serialAns = std.algorithm.reduce!"a + b"(iota(1000));
immutable serialAns = reduce!"a + b"(iota(1000));
assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);

Expand Down Expand Up @@ -4123,7 +4167,7 @@ version(unittest)

assert(equal(
poolInstance.map!"a * a"(iota(30_000_001), 10_000),
std.algorithm.map!"a * a"(iota(30_000_001))
map!"a * a"(iota(30_000_001))
));

// The filter is to kill random access and test the non-random access
Expand All @@ -4132,15 +4176,15 @@ version(unittest)
poolInstance.map!"a * a"(
filter!"a == a"(iota(30_000_001)
), 10_000, 1000),
std.algorithm.map!"a * a"(iota(30_000_001))
map!"a * a"(iota(30_000_001))
));

assert(
reduce!"a + b"(0UL,
poolInstance.map!"a * a"(iota(3_000_001), 10_000)
) ==
reduce!"a + b"(0UL,
std.algorithm.map!"a * a"(iota(3_000_001))
map!"a * a"(iota(3_000_001))
)
);

Expand All @@ -4150,6 +4194,7 @@ version(unittest)
));

{
import std.conv : to;
import std.file : deleteme;

string temp_file = deleteme ~ "-tempDelMe.txt";
Expand Down Expand Up @@ -4564,12 +4609,16 @@ version(unittest)

@safe unittest
{
import std.range : iota;

// this test was in std.range, but caused cycles.
assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} }));
}

@safe unittest
{
import std.algorithm.iteration : each;

long[] arr;
static assert(is(typeof({
arr.parallel.each!"a++";
Expand Down