Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
module github.com/celestiaorg/go-square/v3

go 1.23.6
go 1.24.0

require (
github.com/celestiaorg/nmt v0.24.1
github.com/celestiaorg/nmt v0.24.2
github.com/stretchr/testify v1.11.1
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb
golang.org/x/exp v0.0.0-20250911091902-df9299821621
golang.org/x/sync v0.17.0
google.golang.org/protobuf v1.36.9
)

Expand Down
10 changes: 8 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
github.com/celestiaorg/nmt v0.24.1 h1:MhGKqp257eq2EQQKcva1H/BSYFqIt0Trk8/t3IWfWSw=
github.com/celestiaorg/nmt v0.24.1/go.mod h1:IhLnJDgCdP70crZFpgihFmU6G+PGeXN37tnMRm+/4iU=
github.com/celestiaorg/nmt v0.24.2 h1:LlpJSPOd6/Lw1Ig6HUhZuqiINHLka/ZSRTBzlNJpchg=
github.com/celestiaorg/nmt v0.24.2/go.mod h1:vgLBpWBi8F5KLxTdXSwb7AU4NhiIQ1AQRGa+PzdcLEA=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
Expand All @@ -27,6 +28,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb h1:c0vyKkb6yr3KR7jEfJaOSv4lG7xPkbN6r52aJz1d8a8=
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/exp v0.0.0-20250911091902-df9299821621 h1:2id6c1/gto0kaHYyrixvknJ8tUK/Qs5IsmBtrc+FtgU=
golang.org/x/exp v0.0.0-20250911091902-df9299821621/go.mod h1:TwQYMMnGpvZyc+JpB/UAuTNIsVJifOlSkrZkhcvpVUk=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
Expand All @@ -35,7 +38,10 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
132 changes: 132 additions & 0 deletions inclusion/commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package inclusion

import (
"crypto/sha256"
"errors"
"fmt"

sh "github.com/celestiaorg/go-square/v3/share"
"github.com/celestiaorg/nmt"
"golang.org/x/sync/errgroup"
)

type MerkleRootFn func([][]byte) []byte
Expand Down Expand Up @@ -81,6 +84,135 @@ func GenerateSubtreeRoots(blob *sh.Blob, subtreeRootThreshold int) ([][]byte, er
return subTreeRoots, nil
}

// CreateParallelCommitments generates commitments for multiple blobs in parallel using a pool of NMT instances.
// See docs for CreateCommitment for more details.
//
// The computation proceeds in three phases:
//  1. split each blob into shares (parallel, bounded by numWorkers)
//  2. compute each merkle mountain range subtree root (parallel, bounded by numWorkers)
//  3. fold each blob's subtree roots into a commitment via merkleRootFn (sequential)
//
// It returns one commitment per blob, in input order. An empty input yields an
// empty (non-nil) slice; a non-positive numWorkers is an error.
func CreateParallelCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subtreeRootThreshold int, numWorkers int) ([][]byte, error) {
	if len(blobs) == 0 {
		return [][]byte{}, nil
	}
	if numWorkers <= 0 {
		return nil, errors.New("number of workers must be positive")
	}

	// split all blobs into shares in parallel
	blobSharesResults := make([][]sh.Share, len(blobs))
	g := new(errgroup.Group)
	g.SetLimit(numWorkers)

	// since go 1.22 loop variables are per-iteration, so no manual copy is needed
	for i := range blobs {
		g.Go(func() error {
			shares, err := splitBlobs(blobs[i])
			if err != nil {
				return err
			}
			blobSharesResults[i] = shares
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, fmt.Errorf("failed to split blob shares: %w", err)
	}

	// blobInfo carries the per-blob data needed for subtree root computation.
	type blobInfo struct {
		namespace sh.Namespace
		leafSets  [][][]byte
	}
	blobInfos := make([]blobInfo, len(blobs))
	maxSubtreeSize := 0

	// calculate the maximum subtree size across all blobs and prepare
	// leaf sets for parallel calculation using pooled nmts
	for i, blob := range blobs {
		shares := blobSharesResults[i]
		subTreeWidth := SubTreeWidth(len(shares), subtreeRootThreshold)
		treeSizes, err := MerkleMountainRangeSizes(uint64(len(shares)), uint64(subTreeWidth))
		if err != nil {
			return nil, err
		}
		for _, size := range treeSizes {
			if int(size) > maxSubtreeSize {
				maxSubtreeSize = int(size)
			}
		}

		// slice this blob's shares into one contiguous leaf set per subtree
		leafSets := make([][][]byte, len(treeSizes))
		cursor := uint64(0)
		for j, treeSize := range treeSizes {
			leafSets[j] = sh.ToBytes(shares[cursor : cursor+treeSize])
			cursor += treeSize
		}
		blobInfos[i] = blobInfo{
			namespace: blob.Namespace(),
			leafSets:  leafSets,
		}
	}

	pool, err := newNMTPool(numWorkers, maxSubtreeSize)
	if err != nil {
		return nil, err
	}

	// pre-allocate the result matrix; every goroutine below writes to a unique
	// (blobIdx, treeIdx) element, so direct writes are race-free and no
	// channel or locking is required
	subtreeRootsByBlob := make([][][]byte, len(blobs))
	for i, info := range blobInfos {
		subtreeRootsByBlob[i] = make([][]byte, len(info.leafSets))
	}

	// process all subtree roots in parallel
	g = new(errgroup.Group)
	g.SetLimit(numWorkers)
	for blobIdx, info := range blobInfos {
		for treeIdx, leafSet := range info.leafSets {
			g.Go(func() error {
				// NOTE(review): acquire has no visible matching release in this
				// view — confirm the pool recycles trees internally
				tree := pool.acquire()
				root, err := tree.computeRoot(info.namespace.Bytes(), leafSet)
				if err != nil {
					return err
				}
				subtreeRootsByBlob[blobIdx][treeIdx] = root
				return nil
			})
		}
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}

	// compute final commitments using the merkle root function
	commitments := make([][]byte, len(blobs))
	for i, subtreeRoots := range subtreeRootsByBlob {
		commitments[i] = merkleRootFn(subtreeRoots)
	}

	return commitments, nil
}

// CreateCommitments generates commitments sequentially for given blobs.
func CreateCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subtreeRootThreshold int) ([][]byte, error) {
commitments := make([][]byte, len(blobs))
for i, blob := range blobs {
Expand Down
158 changes: 158 additions & 0 deletions inclusion/commitment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ package inclusion_test
import (
"bytes"
"crypto/sha256"
"fmt"
"math/rand"
"runtime"
"testing"
"time"

"github.com/celestiaorg/go-square/v3/inclusion"
"github.com/celestiaorg/go-square/v3/internal/test"
"github.com/celestiaorg/go-square/v3/share"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -113,3 +118,156 @@ func twoLeafMerkleRoot(data [][]byte) []byte {
sum := sha256.Sum256(append(h1[:], h2[:]...))
return sum[:]
}

func hashConcatenatedData(data [][]byte) []byte {
var total []byte
for _, d := range data {
total = append(total, d...)
}
finalHash := sha256.Sum256(total)
return finalHash[:]
}

// TestCreateParallelCommitments compares results of
// parallel and non-parallel versions of the algorithm with different configurations.
func TestCreateParallelCommitments(t *testing.T) {
	t.Run("empty blob", func(t *testing.T) {
		commitments, err := inclusion.CreateParallelCommitments([]*share.Blob{}, hashConcatenatedData, defaultSubtreeRootThreshold, 4)
		require.NoError(t, err)
		assert.Empty(t, commitments)
	})

	t.Run("equivalence with sequential commitments (random)", func(t *testing.T) {
		const (
			minBlobSize = 512
			maxBlobSize = 1024 * 1024
			minBlobNum  = 2
			maxBlobNum  = 10
			testCount   = 100
		)
		workers := runtime.NumCPU()
		rng := rand.New(rand.NewSource(time.Now().UnixNano()))

		for i := 0; i < testCount; i++ {
			numBlobs := minBlobNum + rng.Intn(maxBlobNum-minBlobNum)
			blobSizes := make([]int, numBlobs)
			maxSize := 0
			for j := 0; j < numBlobs; j++ {
				size := minBlobSize + rng.Intn(maxBlobSize-minBlobSize)
				blobSizes[j] = size
				if size > maxSize {
					maxSize = size
				}
			}
			t.Run(fmt.Sprintf("test_%d_blobs_%d_max_size_%d", numBlobs, i, maxSize), func(t *testing.T) {
				blobs := test.GenerateBlobs(blobSizes...)

				sequential, err := inclusion.CreateCommitments(blobs, hashConcatenatedData, defaultSubtreeRootThreshold)
				require.NoError(t, err)

				parallel, err := inclusion.CreateParallelCommitments(blobs, hashConcatenatedData, defaultSubtreeRootThreshold, workers)
				require.NoError(t, err)

				assert.Equal(t, sequential, parallel,
					"Parallel results with %d workers should match sequential for %d blobs",
					workers, numBlobs)
			})
		}
	})

	t.Run("equivalence with sequential commitments (fixed)", func(t *testing.T) {
		// repeat builds a slice of count identical blob sizes.
		repeat := func(size, count int) []int {
			sizes := make([]int, count)
			for i := range sizes {
				sizes[i] = size
			}
			return sizes
		}
		testCases := []struct {
			name      string
			blobSizes []int
		}{
			{name: "16*512bytes", blobSizes: repeat(512, 16)},
			{name: "16x8MB", blobSizes: repeat(8*1024*1024, 16)},
			{name: "mixed_sizes", blobSizes: []int{512, 8192, 32768, 131072}},
		}
		workers := runtime.NumCPU()
		for _, tc := range testCases {
			t.Run(tc.name, func(t *testing.T) {
				blobs := test.GenerateBlobs(tc.blobSizes...)

				sequential, err := inclusion.CreateCommitments(blobs, hashConcatenatedData, defaultSubtreeRootThreshold)
				require.NoError(t, err)

				parallel, err := inclusion.CreateParallelCommitments(blobs, hashConcatenatedData, defaultSubtreeRootThreshold, workers)
				require.NoError(t, err)

				assert.Equal(t, sequential, parallel,
					"Parallel results should match sequential for %s", tc.name)
			})
		}
	})

	t.Run("invalid worker count", func(t *testing.T) {
		blobs := test.GenerateBlobs(1024)
		for _, workers := range []int{0, -1} {
			_, err := inclusion.CreateParallelCommitments(blobs, hashConcatenatedData, defaultSubtreeRootThreshold, workers)
			require.Error(t, err)
		}
	})
}

// BenchmarkCommitmentsComparison directly compares CreateCommitment vs CreateParallelCommitments.
func BenchmarkCommitmentsComparison(b *testing.B) {
	type scenario struct {
		numBlobs     int
		bytesPerBlob int
		description  string
	}
	scenarios := []scenario{
		{1, 1048576, "1x1MB"},     // 1 blob of 1MB
		{10, 104858, "10x100KB"},  // 10 blobs of ~100KB each
		{100, 10486, "100x10KB"},  // 100 blobs of ~10KB each
		{4, 1048576, "4x1MB"},     // 4 blobs of 1MB each
		{16, 262144, "16x256KB"},  // 16 blobs of 256KB each
		{64, 65536, "64x64KB"},    // 64 blobs of 64KB each
		{16, 8388608, "16x8MB"},   // 16 blobs of 8MB each (128MB total)
	}
	// discardRoot skips real hashing so the benchmark isolates share
	// splitting and subtree root computation.
	discardRoot := func([][]byte) []byte {
		return nil
	}
	for _, sc := range scenarios {
		sizes := make([]int, sc.numBlobs)
		for i := range sizes {
			sizes[i] = sc.bytesPerBlob
		}
		blobs := test.GenerateBlobs(sizes...)

		b.Run(fmt.Sprintf("%s_Sequential", sc.description), func(b *testing.B) {
			b.ResetTimer()
			b.ReportAllocs()
			for n := 0; n < b.N; n++ {
				for _, blob := range blobs {
					_, err := inclusion.CreateCommitment(blob, discardRoot, defaultSubtreeRootThreshold)
					require.NoError(b, err)
				}
			}
		})
		b.Run(fmt.Sprintf("%s_Parallel", sc.description), func(b *testing.B) {
			b.ResetTimer()
			b.ReportAllocs()
			for n := 0; n < b.N; n++ {
				_, err := inclusion.CreateParallelCommitments(blobs, discardRoot, defaultSubtreeRootThreshold, runtime.NumCPU())
				require.NoError(b, err)
			}
		})
	}
}
Loading
Loading