
Explicit Processor Mapping of DArray Blocks in Block-Cyclic Manner #596


Closed

wants to merge 37 commits into from

Changes from all commits (37 commits)
a20f6ba
Update darray.jl
AkhilAkkapelli Apr 20, 2025
f009ef3
Update darray.jl
AkhilAkkapelli Apr 20, 2025
04d2017
Create dbcarray.jl
AkhilAkkapelli Apr 20, 2025
1f716e8
Update Dagger.jl
AkhilAkkapelli Apr 20, 2025
1614256
Update dbcarray.jl
AkhilAkkapelli Apr 21, 2025
a85fecf
Create DBCArray.md
AkhilAkkapelli Apr 21, 2025
15090c0
Update dbcarray.jl
AkhilAkkapelli Apr 21, 2025
feb588d
Update DBCArray.md
AkhilAkkapelli Apr 21, 2025
195a844
Update DBCArray.md
AkhilAkkapelli Apr 21, 2025
c7bf6a0
Update DBCArray.md
AkhilAkkapelli Apr 21, 2025
3a043a6
Update DBCArray.md
AkhilAkkapelli Apr 21, 2025
5f49118
Update darray.jl
AkhilAkkapelli Apr 24, 2025
7bf3428
Update Dagger.jl
AkhilAkkapelli Apr 24, 2025
42e6c29
Update darray.jl
AkhilAkkapelli Apr 24, 2025
2934e19
Update darray.jl
AkhilAkkapelli Apr 25, 2025
a96ab02
Update darray.jl
AkhilAkkapelli Apr 25, 2025
c2975b7
Update darray.jl
AkhilAkkapelli Apr 25, 2025
e4dcb87
Update Dagger.jl
AkhilAkkapelli Apr 25, 2025
0b0ca0f
Delete src/array/dbcarray.jl
AkhilAkkapelli Apr 25, 2025
207cf4c
Delete reports directory
AkhilAkkapelli Apr 25, 2025
4887abd
Update darray.jl
AkhilAkkapelli Apr 25, 2025
fd45316
Update darray.md
AkhilAkkapelli Apr 26, 2025
74d73a9
Update darray.jl
AkhilAkkapelli Apr 28, 2025
af2da41
Update allocation.jl
AkhilAkkapelli Apr 28, 2025
76d3a25
Update allocation.jl
AkhilAkkapelli Apr 28, 2025
ee3a42a
Update darray.md
AkhilAkkapelli Apr 28, 2025
e4b9a26
Update darray.md
AkhilAkkapelli Apr 28, 2025
cdd7453
Update thunk.jl
AkhilAkkapelli May 6, 2025
c694da6
Update Sch.jl
AkhilAkkapelli May 6, 2025
14857f6
Update thunk.jl
AkhilAkkapelli May 7, 2025
0abfdc7
Merge branch 'JuliaParallel:master' into master
AkhilAkkapelli May 13, 2025
10a09e4
Update Sch.jl
AkhilAkkapelli May 13, 2025
a6088c2
Update chunks.jl
AkhilAkkapelli May 13, 2025
09b0f7d
Update util.jl
AkhilAkkapelli May 13, 2025
adef9d5
Update thunk.jl
AkhilAkkapelli May 13, 2025
fc4409e
Update darray.jl
AkhilAkkapelli May 13, 2025
41bbe7e
Create task-affinity.md
AkhilAkkapelli May 13, 2025
159 changes: 159 additions & 0 deletions docs/src/darray.md
@@ -211,6 +211,165 @@ across the workers in the Julia cluster in a relatively even distribution;
future operations on a `DArray` may produce a different distribution from the
one chosen by previous calls.

<!-- -->

### Explicit Processor Mapping of DArray Blocks

This feature allows you to control how `DArray` blocks (chunks) are assigned to specific processors or threads within the cluster. Fine-grained control over data locality can be crucial for optimizing the performance of certain distributed algorithms.

You specify the mapping using the optional `assignment` keyword argument in the `DArray` constructor functions (`DArray`, `DVector`, and `DMatrix`) and the `distribute` function.

The `assignment` argument accepts the following values:

* `:arbitrary` (Default):

* If `assignment` is not provided or is set to the symbol `:arbitrary`, Dagger's scheduler assigns blocks to processors automatically (this is the default).
* `:blockcyclic`:

* If `assignment` is set to `:blockcyclic`, `DArray` blocks are assigned to processors in a block-cyclic manner. Blocks are distributed cyclically across processors, iterating through the processors in increasing rank along the *last* dimension of the block distribution.
* Any other symbol used for `assignment` results in an error.
* `AbstractArray{<:Int, N}`:

* Provide an N-dimensional array of integer worker IDs. The number of dimensions `N` must match that of the `DArray`.
* Dagger maps blocks to these workers in a block-cyclic manner: the block at index `(i, j, ...)` is assigned to the first thread of the worker with ID `assignment[i, j, ...]`, with indices wrapping around the assignment array so the pattern repeats until every block is covered (see the sketch after this list).
* `AbstractArray{<:Processor, N}`:

* Provide an N-dimensional array of `Processor` objects. The number of dimensions `N` must match that of the `DArray`.
* Blocks are mapped in a block-cyclic manner according to the `Processor` objects in the `assignment` array. The block at index `(i, j, ...)` is assigned to the processor at `assignment[i, j, ...]`. This pattern repeats in a block-cyclic fashion to assign all blocks.

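Both array forms of `assignment` use the same wrap-around lookup that the implementation applies when staging a `Distribute` (a `mod1` over the grid size). Here is a minimal sketch, using a 2×2 grid of `Dagger.ThreadProc`s chosen purely for illustration:

```julia
using Dagger

# Hypothetical 2×2 processor grid; any grid of `Processor`s works the same way.
procgrid = [Dagger.ThreadProc(2, 1) Dagger.ThreadProc(1, 1);
            Dagger.ThreadProc(4, 1) Dagger.ThreadProc(3, 1)]

# The block at Cartesian index I wraps around the grid block-cyclically.
blockproc(I::CartesianIndex) =
    procgrid[CartesianIndex(mod1.(Tuple(I), size(procgrid))...)]

blockproc(CartesianIndex(3, 5))  # -> ThreadProc(2, 1), since (3, 5) wraps to (1, 1)
```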
#### Examples and Usage

The `assignment` argument works similarly for `DArray`, `DVector`, and `DMatrix`, as well as the `distribute` function. The key difference lies in the dimensionality of the resulting distributed array:

* `DArray`: For N-dimensional distributed arrays.

* `DVector`: Specifically for 1-dimensional distributed arrays.

* `DMatrix`: Specifically for 2-dimensional distributed arrays.

* `distribute`: General function to distribute arrays.

Here are some examples using a Julia session with four processes: the main process plus three workers.

First, let's create some sample arrays:

```julia
A = rand(7, 11) # 2D array
v = rand(15) # 1D array
M = rand(5, 5, 5) # 3D array
```

1. **Arbitrary Assignment:**

```julia
Ad = distribute(A, Blocks(2, 2), :arbitrary)
# DMatrix(A, Blocks(2, 2), :arbitrary)

vd = distribute(v, Blocks(3), :arbitrary)
# DVector(v, Blocks(3), :arbitrary)

Md = distribute(M, Blocks(2, 2, 2), :arbitrary)
# DArray(M, Blocks(2,2,2), :arbitrary)
```

This creates distributed arrays with the specified block sizes, and Dagger assigns the blocks to processors arbitrarily. With `Blocks(2, 2)` on the 7×11 matrix `A`, there are `cld(7, 2) × cld(11, 2) = 4 × 6` blocks, so the assignment for `Ad` might look like this:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(2, 1) ThreadProc(4, 1) ThreadProc(3, 1)
ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(2, 1) ThreadProc(2, 1)
ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(4, 1)
ThreadProc(2, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(2, 1) ThreadProc(3, 1)

```

2. **Block-Cyclic Assignment:**

```julia
Ad = distribute(A, Blocks(2, 2), :blockcyclic)
# DMatrix(A, Blocks(2, 2), :blockcyclic)

vd = distribute(v, Blocks(3), :blockcyclic)
# DVector(v, Blocks(3), :blockcyclic)

Md = distribute(M, Blocks(2, 2, 2), :blockcyclic)
# DArray(M, Blocks(2,2,2), :blockcyclic)
```

This assigns blocks cyclically along the last dimension, cycling through the available processors in order of increasing rank. For the 2D case (`Ad`), the assignment looks like this:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)

```

3. **Block-Cyclic Assignment with Integer Array:**

```julia
assignment_2d = [2 1; 4 3]
Ad = distribute(A, Blocks(2, 2), assignment_2d)
# DMatrix(A, Blocks(2, 2), [2 1; 4 3])

assignment_1d = [2,3,1,4]
vd = distribute(v, Blocks(3), assignment_1d)
# DVector(v, Blocks(3), [2,3,1,4])

assignment_3d = cat([1 2; 3 4], [4 3; 2 1], dims=3)
Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
# DArray(M, Blocks(2, 2, 2), cat([1 2; 3 4], [4 3; 2 1], dims=3))

```

Here, the assignment arrays define how the workers are arranged. For example, `assignment_2d` lays the four workers out on a 2×2 grid, and the blocks of the 2D array are mapped onto that grid block-cyclically.

The assignment for `Ad` would be:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1)
ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1)
ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1)
ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1)

```

4. **Block-Cyclic Assignment with Processor Array:**

```julia
assignment_2d = [Dagger.ThreadProc(3, 2) Dagger.ThreadProc(1, 1);
Dagger.ThreadProc(4, 3) Dagger.ThreadProc(2, 2)]
Ad = distribute(A, Blocks(2, 2), assignment_2d)
# DMatrix(A, Blocks(2, 2), assignment_2d)

assignment_1d = [Dagger.ThreadProc(2,1), Dagger.ThreadProc(3,1), Dagger.ThreadProc(1,1), Dagger.ThreadProc(4,1)]
vd = distribute(v, Blocks(3), assignment_1d)
# DVector(v, Blocks(3), assignment_1d)

assignment_3d = cat([Dagger.ThreadProc(1,1) Dagger.ThreadProc(2,1); Dagger.ThreadProc(3,1) Dagger.ThreadProc(4,1)],
[Dagger.ThreadProc(4,1) Dagger.ThreadProc(3,1); Dagger.ThreadProc(2,1) Dagger.ThreadProc(1,1)], dims=3)
Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
# DArray(M, Blocks(2, 2, 2), assignment_3d)

```

If the assignment is a matrix of `Processor` objects, the blocks are assigned to exactly those processors (including the specified thread IDs). For `Ad`:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1)
ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2)
ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1)
ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2)

```

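The placement matrices shown above are not printed by `distribute` itself. One hedged way to inspect where the blocks of a `DArray` ended up is to look at the processor recorded in each chunk; the helper below is illustrative rather than part of the documented API, and assumes each entry of `Ad.chunks` is either a `Dagger.Chunk` or a task that can be fetched into one with `raw=true`:

```julia
# Illustrative helper: a Chunk records its processor; an unfinished task
# is fetched into its underlying Chunk first.
chunk_processor(c) = c isa Dagger.Chunk ? c.processor : Dagger.processor(fetch(c; raw=true))

map(chunk_processor, Ad.chunks)  # 4×6 matrix of processors, as shown above
```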
<!-- -->

## Broadcasting

As the `DArray` is a subtype of `AbstractArray` and generally satisfies Julia's
47 changes: 47 additions & 0 deletions docs/src/task-affinity.md
@@ -0,0 +1,47 @@
```@meta
CurrentModule = Dagger
```

# Task Affinity


Dagger.jl's `@spawn` macro offers fine-grained control over task execution through the `compute_scope` and `result_scope` options, which determine where a task runs and where its result can be accessed.

## Compute Scope

`compute_scope` defines exactly where a task's computation must occur. This option overrides the standard `scope` option if both are provided.

```julia
g = Dagger.@spawn compute_scope=ExactScope(Dagger.ThreadProc(3, 1)) f(x,y)
```

In this example, the task `f(x,y)` is scheduled to run on thread 1 of worker process 3.
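A hedged sketch (not from the original docs) of the override rule above, where both `scope` and `compute_scope` are given:

```julia
# When `scope` and `compute_scope` are both provided, `compute_scope`
# decides placement.
h = Dagger.@spawn scope=ExactScope(Dagger.ThreadProc(2, 1)) compute_scope=ExactScope(Dagger.ThreadProc(3, 1)) f(x, y)
# `h` computes on thread 1 of worker 3, not on worker 2.
```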


## Result Scope

`result_scope` restricts the locations from which a task's result can be fetched. This is useful for managing data locality and access patterns.

```julia
g = Dagger.@spawn result_scope=ExactScope(Dagger.OSProc(2)) f(x,y)
```

Here, the result of `f(x,y)` (referenced by `g`) is primarily accessible from worker process 2; fetching it from other locations may require data movement.

## Interaction of compute_scope and result_scope

When both `compute_scope` and `result_scope` are specified for a task, the scheduler determines the execution location based on their intersection:

- **Intersection Exists:** If the `compute_scope` and `result_scope` intersect, the task's computation is scheduled within that intersection. This is the preferred scenario.

- **No Intersection:** If they do not intersect, the task's computation occurs in the `compute_scope`; the `result_scope` is still respected for accessing the result.

### Syntax
```julia
g = Dagger.@spawn compute_scope=ExactScope(Dagger.ThreadProc(3, 1)) result_scope=ExactScope(Dagger.ThreadProc(2, 2)) f(x,y)
```

In this case the two scopes do not intersect, so the task computes on `Dagger.ThreadProc(3, 1)` (the `compute_scope`), while access to its result is restricted to `Dagger.ThreadProc(2, 2)` (the `result_scope`).
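For completeness, a hedged sketch of the intersection case, assuming `ProcessScope(2)` (any processor on worker 2), which Dagger provides alongside `ExactScope`:

```julia
# ProcessScope(2) (any thread on worker 2) and ExactScope(ThreadProc(2, 2))
# intersect at ThreadProc(2, 2), so per the rules above the task computes
# there and its result stays accessible there.
h = Dagger.@spawn compute_scope=ProcessScope(2) result_scope=ExactScope(Dagger.ThreadProc(2, 2)) f(x, y)
```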

!!! note "Chunk Inputs"
    If an input to `Dagger.@spawn` is already a chunk (e.g. created with `Dagger.tochunk`), the `compute_scope` and `result_scope` options have no effect on the task's execution or result accessibility.
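A hedged sketch of this chunk-input case; the positional `proc` and `scope` arguments follow the `tochunk` signature in `src/chunks.jl`, while the concrete values are illustrative:

```julia
# The chunk already carries its own processor and scope from `tochunk`,
# so per the note above the spawn-time scope options do not affect where
# this task runs or where its result is accessible.
x_chunk = Dagger.tochunk(rand(4, 4), Dagger.ThreadProc(2, 1), ExactScope(Dagger.ThreadProc(2, 1)))
g = Dagger.@spawn compute_scope=ExactScope(Dagger.ThreadProc(3, 1)) f(x_chunk, y)
```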
103 changes: 74 additions & 29 deletions src/array/darray.jl
@@ -419,26 +419,26 @@ struct Distribute{T,N,B<:AbstractBlocks} <: ArrayOp{T, N}
domainchunks
partitioning::B
data::AbstractArray{T,N}
procgrid::Union{AbstractArray{<:Processor, N}, Nothing}
end

size(x::Distribute) = size(domain(x.data))

Base.@deprecate BlockPartition Blocks


Distribute(p::Blocks, data::AbstractArray) =
Distribute(partition(p, domain(data)), p, data)
Distribute(p::Blocks, data::AbstractArray, procgrid::Union{AbstractArray{<:Processor},Nothing} = nothing) =
Distribute(partition(p, domain(data)), p, data, procgrid)

function Distribute(domainchunks::DomainBlocks{N}, data::AbstractArray{T,N}) where {T,N}
function Distribute(domainchunks::DomainBlocks{N}, data::AbstractArray{T,N}, procgrid::Union{AbstractArray{<:Processor, N},Nothing} = nothing) where {T,N}
p = Blocks(ntuple(i->first(domainchunks.cumlength[i]), N))
Distribute(domainchunks, p, data)
Distribute(domainchunks, p, data, procgrid)
end

function Distribute(data::AbstractArray{T,N}) where {T,N}
nprocs = sum(w->length(Dagger.get_processors(OSProc(w))),
procs())
function Distribute(data::AbstractArray{T,N}, procgrid::Union{AbstractArray{<:Processor, N},Nothing} = nothing) where {T,N}
nprocs = sum(w->length(get_processors(OSProc(w))),procs())
p = Blocks(ntuple(i->max(cld(size(data, i), nprocs), 1), N))
return Distribute(partition(p, domain(data)), p, data)
return Distribute(partition(p, domain(data)), p, data, procgrid)
end

function stage(ctx::Context, d::Distribute)
@@ -451,7 +451,8 @@ function stage(ctx::Context, d::Distribute)
Nd = ndims(x)
T = eltype(d.data)
concat = x.concat
cs = map(d.domainchunks) do idx
cs = map(CartesianIndices(d.domainchunks)) do I
idx = d.domainchunks[I]
chunks = stage(ctx, x[idx]).chunks
shape = size(chunks)
# TODO: fix hashing
@@ -466,12 +467,20 @@
end
end
else
cs = map(d.domainchunks) do c
cs = map(CartesianIndices(d.domainchunks)) do I
# TODO: fix hashing
#hash = uhash(c, Base.hash(Distribute, Base.hash(d.data)))
Dagger.@spawn identity(d.data[c])
c = d.domainchunks[I]
if isnothing(d.procgrid)
Dagger.@spawn identity(d.data[c])
else
proc = d.procgrid[CartesianIndex(mod1.(Tuple(I), size(d.procgrid))...)]
scope = ExactScope(proc)
Dagger.@spawn compute_scope=scope identity(d.data[c])
end
end
end

return DArray(eltype(d.data),
domain(d.data),
d.domainchunks,
@@ -494,29 +503,65 @@ function auto_blocks(dims::Dims{N}) where N
end
auto_blocks(A::AbstractArray{T,N}) where {T,N} = auto_blocks(size(A))

distribute(A::AbstractArray) = distribute(A, AutoBlocks())
distribute(A::AbstractArray{T,N}, dist::Blocks{N}) where {T,N} =
_to_darray(Distribute(dist, A))
distribute(A::AbstractArray, ::AutoBlocks) = distribute(A, auto_blocks(A))
function distribute(x::AbstractArray{T,N}, n::NTuple{N}) where {T,N}
distribute(A::AbstractArray, assignment::Union{Symbol, AbstractArray{<:Int}, AbstractArray{<:Processor}} = :arbitrary) = distribute(A, AutoBlocks(), assignment)
function distribute(A::AbstractArray{T,N}, dist::Blocks{N}, assignment::Union{Symbol, AbstractArray{<:Int, N}, AbstractArray{<:Processor, N}} = :arbitrary) where {T,N}
procgrid = nothing
availprocs = [proc for i in procs() for proc in get_processors(OSProc(i))]
sort!(availprocs, by = x -> (x.owner, x.tid))
if assignment isa Symbol
if assignment == :arbitrary
procgrid = nothing
elseif assignment == :blockrow
p = ntuple(i -> i == 1 ? Int(ceil(size(A,1) / dist.blocksize[1])) : 1, N)
rows_per_proc, extra = divrem(Int(ceil(size(A,1) / dist.blocksize[1])), num_processors())
counts = [rows_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()]
procgrid = reshape(vcat(fill.(availprocs, counts)...), p)
elseif assignment == :blockcol
p = ntuple(i -> i == N ? Int(ceil(size(A,N) / dist.blocksize[N])) : 1, N)
cols_per_proc, extra = divrem(Int(ceil(size(A,N) / dist.blocksize[N])), num_processors())
counts = [cols_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()]
procgrid = reshape(vcat(fill.(availprocs, counts)...), p)
elseif assignment == :cyclicrow
p = ntuple(i -> i == 1 ? num_processors() : 1, N)
procgrid = reshape(availprocs, p)
elseif assignment == :cycliccol
p = ntuple(i -> i == N ? num_processors() : 1, N)
procgrid = reshape(availprocs, p)
else
error("Unsupported assignment symbol: $assignment, use :arbitrary, :blockrow, :blockcol, :cyclicrow or :cycliccol")
end
elseif assignment isa AbstractArray{<:Int, N}
missingprocs = filter(p -> p ∉ procs(), assignment)
isempty(missingprocs) || error("Missing processors: $missingprocs")
procgrid = [Dagger.ThreadProc(proc, 1) for proc in assignment]
elseif assignment isa AbstractArray{<:Processor, N}
missingprocs = filter(p -> p ∉ availprocs, assignment)
isempty(missingprocs) || error("Missing processors: $missingprocs")
procgrid = assignment
end

return _to_darray(Distribute(dist, A, procgrid))
end

distribute(A::AbstractArray, ::AutoBlocks, assignment::Union{Symbol, AbstractArray{<:Int}, AbstractArray{<:Processor}} = :arbitrary) = distribute(A, auto_blocks(A), assignment)
function distribute(x::AbstractArray{T,N}, n::NTuple{N}, assignment::Union{Symbol, AbstractArray{<:Int, N}, AbstractArray{<:Processor, N}} = :arbitrary) where {T,N}
p = map((d, dn)->ceil(Int, d / dn), size(x), n)
distribute(x, Blocks(p))
distribute(x, Blocks(p), assignment)
end
distribute(x::AbstractVector, n::Int) = distribute(x, (n,))
distribute(x::AbstractVector, n::Vector{<:Integer}) =
distribute(x, DomainBlocks((1,), (cumsum(n),)))
distribute(x::AbstractVector, n::Int, assignment::Union{Symbol, AbstractArray{<:Int, 1}, AbstractArray{<:Processor, 1}} = :arbitrary) = distribute(x, (n,), assignment)
distribute(x::AbstractVector, n::Vector{<:Integer}, assignment::Union{Symbol, AbstractArray{<:Int, 1}, AbstractArray{<:Processor, 1}} = :arbitrary) = distribute(x, DomainBlocks((1,), (cumsum(n),)), assignment)

DVector(A::AbstractVector{T}, part::Blocks{1}) where T = distribute(A, part)
DMatrix(A::AbstractMatrix{T}, part::Blocks{2}) where T = distribute(A, part)
DArray(A::AbstractArray{T,N}, part::Blocks{N}) where {T,N} = distribute(A, part)
DVector(A::AbstractVector{T}, part::Blocks{1}, assignment::Union{Symbol, AbstractArray{<:Int, 1}, AbstractArray{<:Processor, 1}} = :arbitrary) where T = distribute(A, part, assignment)
DMatrix(A::AbstractMatrix{T}, part::Blocks{2}, assignment::Union{Symbol, AbstractArray{<:Int, 2}, AbstractArray{<:Processor, 2}} = :arbitrary) where T = distribute(A, part, assignment)
DArray(A::AbstractArray{T,N}, part::Blocks{N}, assignment::Union{Symbol, AbstractArray{<:Int, N}, AbstractArray{<:Processor, N}} = :arbitrary) where {T,N} = distribute(A, part, assignment)

DVector(A::AbstractVector{T}) where T = DVector(A, AutoBlocks())
DMatrix(A::AbstractMatrix{T}) where T = DMatrix(A, AutoBlocks())
DArray(A::AbstractArray) = DArray(A, AutoBlocks())
DVector(A::AbstractVector{T}, assignment::Union{Symbol, AbstractArray{<:Int, 1}, AbstractArray{<:Processor, 1}} = :arbitrary) where T = DVector(A, AutoBlocks(), assignment)
DMatrix(A::AbstractMatrix{T}, assignment::Union{Symbol, AbstractArray{<:Int, 2}, AbstractArray{<:Processor, 2}} = :arbitrary) where T = DMatrix(A, AutoBlocks(), assignment)
DArray(A::AbstractArray, assignment::Union{Symbol, AbstractArray{<:Int}, AbstractArray{<:Processor}} = :arbitrary) = DArray(A, AutoBlocks(), assignment)

DVector(A::AbstractVector{T}, ::AutoBlocks) where T = DVector(A, auto_blocks(A))
DMatrix(A::AbstractMatrix{T}, ::AutoBlocks) where T = DMatrix(A, auto_blocks(A))
DArray(A::AbstractArray, ::AutoBlocks) = DArray(A, auto_blocks(A))
DVector(A::AbstractVector{T}, ::AutoBlocks, assignment::Union{Symbol, AbstractArray{<:Int, 1}, AbstractArray{<:Processor, 1}} = :arbitrary) where T = DVector(A, auto_blocks(A), assignment)
DMatrix(A::AbstractMatrix{T}, ::AutoBlocks, assignment::Union{Symbol, AbstractArray{<:Int, 2}, AbstractArray{<:Processor, 2}} = :arbitrary) where T = DMatrix(A, auto_blocks(A), assignment)
DArray(A::AbstractArray, ::AutoBlocks, assignment::Union{Symbol, AbstractArray{<:Int}, AbstractArray{<:Processor}} = :arbitrary) = DArray(A, auto_blocks(A), assignment)

function Base.:(==)(x::ArrayOp{T,N}, y::AbstractArray{S,N}) where {T,S,N}
collect(x) == y
Expand Down
4 changes: 2 additions & 2 deletions src/chunks.jl
@@ -264,7 +264,7 @@ be used.

All other kwargs are passed directly to `MemPool.poolset`.
"""
function tochunk(x::X, proc::P=OSProc(), scope::S=AnyScope(); persist=false, cache=false, device=nothing, kwargs...) where {X,P,S}
function tochunk(x::X, proc::P=OSProc(), scope::S=DefaultScope(); persist=false, cache=false, device=nothing, kwargs...) where {X,P,S}
if device === nothing
device = if Sch.walk_storage_safe(x)
MemPool.GLOBAL_DEVICE[]
@@ -284,7 +284,7 @@ function savechunk(data, dir, f)
end
fr = FileRef(f, sz)
proc = OSProc()
scope = AnyScope() # FIXME: Scoped to this node
scope = DefaultScope() # FIXME: Scoped to this node
Chunk{typeof(data),typeof(fr),typeof(proc),typeof(scope)}(typeof(data), domain(data), fr, proc, scope, true)
end
