Align allocations #230

Merged: 3 commits, Jan 12, 2023

23 changes: 23 additions & 0 deletions README.md
@@ -238,6 +238,29 @@ To join a data set, use the `Join()` function, which will join the shards and wr
err = enc.Join(io.Discard, data, len(bigfile))
```

## Aligned Allocations

On AMD64, aligned inputs can make a big speed difference.

This is an example of the speed difference between unaligned and aligned inputs:

```
BenchmarkEncode100x20x10000-32 7058 172648 ns/op 6950.57 MB/s
BenchmarkEncode100x20x10000-32 8406 137911 ns/op 8701.24 MB/s
```

The difference is mostly seen when dealing with odd-sized shards.

To facilitate this, the package provides `AllocAligned(shards, each int) [][]byte`.
This will allocate the requested number of shards, each of size `each`.
Each shard will then start on a 64-byte aligned boundary.

Each encoder also provides an `AllocAligned(each int) [][]byte` method through the `Extensions` interface, which returns the same,
but with the shard count taken from the encoder's configuration.

It is not possible to re-align slices that have already been allocated, for example when using `Split`.
When writing directly into aligned shards is not possible, you should not copy your data into aligned shards just to gain the alignment.
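
As a quick illustration (not part of this change), the sketch below allocates aligned shards through the extended interface and encodes into them. The 10+3 shard layout and the 10000-byte shard size are arbitrary example values.

```
package main

import (
	"math/rand"

	"github.com/klauspost/reedsolomon"
)

func main() {
	// 10 data + 3 parity shards; counts chosen only for this example.
	enc, err := reedsolomon.New(10, 3)
	if err != nil {
		panic(err)
	}

	// Allocate 13 shards of 10000 bytes each, every one starting on a
	// 64-byte boundary. The shard count comes from the encoder.
	shards := enc.(reedsolomon.Extensions).AllocAligned(10000)

	// Fill the data shards, then compute parity into the remaining shards.
	for _, shard := range shards[:10] {
		rand.Read(shard)
	}
	if err := enc.Encode(shards); err != nil {
		panic(err)
	}
}
```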

# Progressive encoding

It is possible to encode individual shards using EncodeIdx:
2 changes: 1 addition & 1 deletion galois.go
@@ -912,7 +912,7 @@ func genAvx2Matrix(matrixRows [][]byte, inputs, inIdx, outputs int, dst []byte)
// Duplicated in+out
wantBytes := total * 32 * 2
if cap(dst) < wantBytes {
dst = make([]byte, wantBytes)
dst = AllocAligned(1, wantBytes)[0]
} else {
dst = dst[:wantBytes]
}
25 changes: 17 additions & 8 deletions leopard.go
@@ -70,6 +70,10 @@ func (r *leopardFF16) TotalShards() int {
return r.totalShards
}

func (r *leopardFF16) AllocAligned(each int) [][]byte {
return AllocAligned(r.totalShards, each)
}

type ffe uint16

const (
@@ -129,11 +133,11 @@ func (r *leopardFF16) encode(shards [][]byte) error {
if cap(work) >= m*2 {
work = work[:m*2]
} else {
work = make([][]byte, m*2)
work = AllocAligned(m*2, shardSize)
}
for i := range work {
if cap(work[i]) < shardSize {
work[i] = make([]byte, shardSize)
work[i] = AllocAligned(1, shardSize)[0]
} else {
work[i] = work[i][:shardSize]
}
@@ -278,13 +282,18 @@ func (r *leopardFF16) Split(data []byte) ([][]byte, error) {
}

// Only allocate memory if necessary
var padding []byte
var padding [][]byte
if len(data) < (r.totalShards * perShard) {
// calculate maximum number of full shards in `data` slice
fullShards := len(data) / perShard
padding = make([]byte, r.totalShards*perShard-perShard*fullShards)
copy(padding, data[perShard*fullShards:])
data = data[0 : perShard*fullShards]
padding = AllocAligned(r.totalShards-fullShards, perShard)
copyFrom := data[perShard*fullShards : dataLen]
for i := range padding {
if len(copyFrom) <= 0 {
break
}
copyFrom = copyFrom[copy(padding[i], copyFrom):]
}
} else {
for i := dataLen; i < dataLen+r.dataShards; i++ {
data[i] = 0
@@ -300,8 +309,8 @@ func (r *leopardFF16) Split(data []byte) ([][]byte, error) {
}

for j := 0; i+j < len(dst); j++ {
dst[i+j] = padding[:perShard:perShard]
padding = padding[perShard:]
dst[i+j] = padding[0]
padding = padding[1:]
}

return dst, nil
31 changes: 19 additions & 12 deletions leopard8.go
@@ -82,6 +82,10 @@ func (r *leopardFF8) TotalShards() int {
return r.totalShards
}

func (r *leopardFF8) AllocAligned(each int) [][]byte {
return AllocAligned(r.totalShards, each)
}

type ffe8 uint8

const (
@@ -137,24 +141,22 @@ func (r *leopardFF8) encode(shards [][]byte) error {
var work [][]byte
if w, ok := r.workPool.Get().([][]byte); ok {
work = w
} else {
work = AllocAligned(m*2, workSize8)
}
if cap(work) >= m*2 {
work = work[:m*2]
for i := range work {
if i >= r.parityShards {
if cap(work[i]) < workSize8 {
work[i] = make([]byte, workSize8)
work[i] = AllocAligned(1, workSize8)[0]
} else {
work[i] = work[i][:workSize8]
}
}
}
} else {
work = make([][]byte, m*2)
all := make([]byte, m*2*workSize8)
for i := range work {
work[i] = all[i*workSize8 : i*workSize8+workSize8]
}
work = AllocAligned(m*2, workSize8)
}

defer r.workPool.Put(work)
@@ -320,13 +322,18 @@ func (r *leopardFF8) Split(data []byte) ([][]byte, error) {
}

// Only allocate memory if necessary
var padding []byte
var padding [][]byte
if len(data) < (r.totalShards * perShard) {
// calculate maximum number of full shards in `data` slice
fullShards := len(data) / perShard
padding = make([]byte, r.totalShards*perShard-perShard*fullShards)
copy(padding, data[perShard*fullShards:])
data = data[0 : perShard*fullShards]
padding = AllocAligned(r.totalShards-fullShards, perShard)
copyFrom := data[perShard*fullShards : dataLen]
for i := range padding {
if len(copyFrom) <= 0 {
break
}
copyFrom = copyFrom[copy(padding[i], copyFrom):]
}
} else {
for i := dataLen; i < dataLen+r.dataShards; i++ {
data[i] = 0
@@ -342,8 +349,8 @@ func (r *leopardFF8) Split(data []byte) ([][]byte, error) {
}

for j := 0; i+j < len(dst); j++ {
dst[i+j] = padding[:perShard:perShard]
padding = padding[perShard:]
dst[i+j] = padding[0]
padding = padding[1:]
}

return dst, nil
34 changes: 23 additions & 11 deletions reedsolomon.go
@@ -139,6 +139,11 @@ type Extensions interface {

// TotalShards will return the total number of shards.
TotalShards() int

// AllocAligned will allocate TotalShards number of slices,
// aligned to reasonable memory sizes.
// Provide the size of each shard.
AllocAligned(each int) [][]byte
}

const (
Expand Down Expand Up @@ -183,6 +188,10 @@ func (r *reedSolomon) TotalShards() int {
return r.totalShards
}

func (r *reedSolomon) AllocAligned(each int) [][]byte {
return AllocAligned(r.totalShards, each)
}

// ErrInvShardNum will be returned by New, if you attempt to create
// an Encoder with less than one data shard or less than zero parity
// shards.
Expand Down Expand Up @@ -1260,10 +1269,7 @@ func (r *reedSolomon) checkSomeShards(matrixRows, inputs, toCheck [][]byte, byte
return true
}

outputs := make([][]byte, len(toCheck))
for i := range outputs {
outputs[i] = make([]byte, byteCount)
}
outputs := AllocAligned(len(toCheck), byteCount)
r.codeSomeShards(matrixRows, inputs, outputs, byteCount)

for i, calc := range outputs {
@@ -1488,7 +1494,7 @@ func (r *reedSolomon) reconstruct(shards [][]byte, dataOnly bool, required []boo
if cap(shards[iShard]) >= shardSize {
shards[iShard] = shards[iShard][0:shardSize]
} else {
shards[iShard] = make([]byte, shardSize)
shards[iShard] = AllocAligned(1, shardSize)[0]
}
outputs[outputCount] = shards[iShard]
matrixRows[outputCount] = dataDecodeMatrix[iShard]
@@ -1514,7 +1520,7 @@ func (r *reedSolomon) reconstruct(shards [][]byte, dataOnly bool, required []boo
if cap(shards[iShard]) >= shardSize {
shards[iShard] = shards[iShard][0:shardSize]
} else {
shards[iShard] = make([]byte, shardSize)
shards[iShard] = AllocAligned(1, shardSize)[0]
}
outputs[outputCount] = shards[iShard]
matrixRows[outputCount] = r.parity[iShard-r.dataShards]
@@ -1554,12 +1560,18 @@ func (r *reedSolomon) Split(data []byte) ([][]byte, error) {
}

// Only allocate memory if necessary
var padding []byte
var padding [][]byte
if len(data) < (r.totalShards * perShard) {
// calculate maximum number of full shards in `data` slice
fullShards := len(data) / perShard
padding = make([]byte, r.totalShards*perShard-perShard*fullShards)
copy(padding, data[perShard*fullShards:])
padding = AllocAligned(r.totalShards-fullShards, perShard)
copyFrom := data[perShard*fullShards : dataLen]
for i := range padding {
if len(copyFrom) <= 0 {
break
}
copyFrom = copyFrom[copy(padding[i], copyFrom):]
}
data = data[0 : perShard*fullShards]
} else {
for i := dataLen; i < dataLen+r.dataShards; i++ {
@@ -1576,8 +1588,8 @@ func (r *reedSolomon) Split(data []byte) ([][]byte, error) {
}

for j := 0; i+j < len(dst); j++ {
dst[i+j] = padding[:perShard:perShard]
padding = padding[perShard:]
dst[i+j] = padding[0]
padding = padding[1:]
}

return dst, nil
36 changes: 8 additions & 28 deletions reedsolomon_test.go
@@ -1110,11 +1110,8 @@ func benchmarkEncode(b *testing.B, dataShards, parityShards, shardSize int, opts
if err != nil {
b.Fatal(err)
}
shards := make([][]byte, dataShards+parityShards)
for s := range shards {
shards[s] = make([]byte, shardSize)
}

shards := r.(Extensions).AllocAligned(shardSize)
for s := 0; s < dataShards; s++ {
fillRandom(shards[s])
}
@@ -1141,11 +1138,8 @@ func benchmarkDecode(b *testing.B, dataShards, parityShards, shardSize, deleteSh
if err != nil {
b.Fatal(err)
}
shards := make([][]byte, dataShards+parityShards)
for s := range shards {
shards[s] = make([]byte, shardSize)
}

shards := r.(Extensions).AllocAligned(shardSize)
for s := 0; s < dataShards; s++ {
fillRandom(shards[s])
}
@@ -1321,10 +1315,7 @@ func benchmarkVerify(b *testing.B, dataShards, parityShards, shardSize int) {
if err != nil {
b.Fatal(err)
}
shards := make([][]byte, parityShards+dataShards)
for s := range shards {
shards[s] = make([]byte, shardSize)
}
shards := r.(Extensions).AllocAligned(shardSize)

for s := 0; s < dataShards; s++ {
fillRandom(shards[s])
@@ -1404,10 +1395,7 @@ func benchmarkReconstruct(b *testing.B, dataShards, parityShards, shardSize int,
if err != nil {
b.Fatal(err)
}
shards := make([][]byte, parityShards+dataShards)
for s := range shards {
shards[s] = make([]byte, shardSize)
}
shards := r.(Extensions).AllocAligned(shardSize)

for s := 0; s < dataShards; s++ {
fillRandom(shards[s])
@@ -1492,10 +1480,7 @@ func benchmarkReconstructData(b *testing.B, dataShards, parityShards, shardSize
if err != nil {
b.Fatal(err)
}
shards := make([][]byte, parityShards+dataShards)
for s := range shards {
shards[s] = make([]byte, shardSize)
}
shards := r.(Extensions).AllocAligned(shardSize)

for s := 0; s < dataShards; s++ {
fillRandom(shards[s])
@@ -1573,10 +1558,7 @@ func benchmarkReconstructP(b *testing.B, dataShards, parityShards, shardSize int
b.ReportAllocs()

b.RunParallel(func(pb *testing.PB) {
shards := make([][]byte, parityShards+dataShards)
for s := range shards {
shards[s] = make([]byte, shardSize)
}
shards := r.(Extensions).AllocAligned(shardSize)

for s := 0; s < dataShards; s++ {
fillRandom(shards[s])
@@ -2030,10 +2012,8 @@ func benchmarkParallel(b *testing.B, dataShards, parityShards, shardSize int) {
// Create independent shards
shardsCh := make(chan [][]byte, c)
for i := 0; i < c; i++ {
shards := make([][]byte, dataShards+parityShards)
for s := range shards {
shards[s] = make([]byte, shardSize)
}
shards := r.(Extensions).AllocAligned(shardSize)

for s := 0; s < dataShards; s++ {
fillRandom(shards[s])
}
6 changes: 1 addition & 5 deletions streaming.go
@@ -173,11 +173,7 @@ func NewStream(dataShards, parityShards int, o ...Option) (StreamEncoder, error)
r.r = enc.(*reedSolomon)

r.blockPool.New = func() interface{} {
out := make([][]byte, dataShards+parityShards)
for i := range out {
out[i] = make([]byte, r.o.streamBS)
}
return out
return AllocAligned(dataShards+parityShards, r.o.streamBS)
}
r.readShards = readShards
r.writeShards = writeShards
41 changes: 41 additions & 0 deletions unsafe.go
@@ -0,0 +1,41 @@
//go:build !noasm && !nounsafe && !gccgo && !appengine

/**
* Reed-Solomon Coding over 8-bit values.
*
* Copyright 2023, Klaus Post
*/

package reedsolomon

import (
"unsafe"
)

// AllocAligned allocates 'shards' slices, with 'each' bytes.
// Each slice will start on a 64 byte aligned boundary.
func AllocAligned(shards, each int) [][]byte {
if false {
res := make([][]byte, shards)
for i := range res {
res[i] = make([]byte, each)
}
return res
}
const (
alignEach = 64
alignStart = 64
)
eachAligned := ((each + alignEach - 1) / alignEach) * alignEach
total := make([]byte, eachAligned*shards+63)
align := uint(uintptr(unsafe.Pointer(&total[0]))) & (alignStart - 1)
if align > 0 {
total = total[alignStart-align:]
}
res := make([][]byte, shards)
for i := range res {
res[i] = total[:each:eachAligned]
total = total[eachAligned:]
}
return res
}
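
For context, a small sketch (not part of this diff) that prints the properties of the new `AllocAligned`: each shard starts on a 64-byte boundary, keeps the requested length, and has its capacity rounded up to the next multiple of 64. The 4 shards of 1000 bytes are arbitrary example values.

```
package main

import (
	"fmt"
	"unsafe"

	"github.com/klauspost/reedsolomon"
)

func main() {
	shards := reedsolomon.AllocAligned(4, 1000)
	for i, s := range shards {
		aligned := uintptr(unsafe.Pointer(&s[0]))%64 == 0
		// len stays at the requested 1000 bytes; cap is rounded up to the
		// next multiple of 64 (here 1024), so neighbouring shards do not
		// share 64-byte cache lines.
		fmt.Println(i, aligned, len(s), cap(s))
	}
}
```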