From d54ecb2425d6f12cdf336c26a83704b23f740ba7 Mon Sep 17 00:00:00 2001 From: Mikhail Rakhmanov Date: Wed, 24 Sep 2025 11:05:42 +0200 Subject: [PATCH 1/6] perf: use buffer for commitment --- go.mod | 5 +- go.sum | 6 +- inclusion/commitment.go | 377 ++++++++++++++++++++++++++++++++++- inclusion/commitment_test.go | 345 ++++++++++++++++++++++++++++++++ inclusion/nmt_pool.go | 98 +++++++++ 5 files changed, 826 insertions(+), 5 deletions(-) create mode 100644 inclusion/nmt_pool.go diff --git a/go.mod b/go.mod index c7ee740..debdf33 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,9 @@ module github.com/celestiaorg/go-square/v3 -go 1.23.6 +go 1.24.0 require ( - github.com/celestiaorg/nmt v0.24.1 + github.com/celestiaorg/nmt v0.24.2-0.20250918161004-dcaa76b80708 github.com/stretchr/testify v1.11.1 golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb google.golang.org/protobuf v1.36.9 @@ -13,5 +13,6 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/sync v0.17.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index ea6aee6..e70f76a 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/celestiaorg/nmt v0.24.1 h1:MhGKqp257eq2EQQKcva1H/BSYFqIt0Trk8/t3IWfWSw= -github.com/celestiaorg/nmt v0.24.1/go.mod h1:IhLnJDgCdP70crZFpgihFmU6G+PGeXN37tnMRm+/4iU= +github.com/celestiaorg/nmt v0.24.2-0.20250918161004-dcaa76b80708 h1:V+Ak590bIn9g3dqYaKDsno8lpvEmjtiFHXDTdguIFoE= +github.com/celestiaorg/nmt v0.24.2-0.20250918161004-dcaa76b80708/go.mod h1:vgLBpWBi8F5KLxTdXSwb7AU4NhiIQ1AQRGa+PzdcLEA= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -36,6 +36,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY 
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/inclusion/commitment.go b/inclusion/commitment.go index 2e5f02f..7f57484 100644 --- a/inclusion/commitment.go +++ b/inclusion/commitment.go @@ -2,9 +2,11 @@ package inclusion import ( "crypto/sha256" + "sync" sh "github.com/celestiaorg/go-square/v3/share" "github.com/celestiaorg/nmt" + "golang.org/x/sync/errgroup" ) type MerkleRootFn func([][]byte) []byte @@ -81,7 +83,380 @@ func GenerateSubtreeRoots(blob *sh.Blob, subtreeRootThreshold int) ([][]byte, er return subTreeRoots, nil } -func CreateCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subtreeRootThreshold int) ([][]byte, error) { +// GenerateSubtreeRootsReusedNMT generates the subtree roots of a blob using a reusable NMT. +// This is an optimized version that reuses the same NMT instance across multiple subtrees +// to reduce memory allocations. +func GenerateSubtreeRootsReusedNMT(blob *sh.Blob, subtreeRootThreshold int) ([][]byte, error) { + shares, err := splitBlobs(blob) + if err != nil { + return nil, err + } + + // the commitment is the root of a merkle mountain range with max tree size + // determined by the number of roots required to create a share commitment + // over that blob. 
The size of the tree is only increased if the number of + // subtree roots surpasses a constant threshold. + subTreeWidth := SubTreeWidth(len(shares), subtreeRootThreshold) + treeSizes, err := MerkleMountainRangeSizes(uint64(len(shares)), uint64(subTreeWidth)) + if err != nil { + return nil, err + } + leafSets := make([][][]byte, len(treeSizes)) + cursor := uint64(0) + for i, treeSize := range treeSizes { + leafSets[i] = sh.ToBytes(shares[cursor : cursor+treeSize]) + cursor += treeSize + } + + namespace := blob.Namespace() + // Create a single NMT instance with ReuseBuffer option + tree := nmt.New(sha256.New(), + nmt.NamespaceIDSize(sh.NamespaceSize), + nmt.IgnoreMaxNamespace(true), + nmt.ReuseBuffer(true)) + + // Pre-allocate a single large buffer for all shares with namespace prepended + // This allows NMT to use the buffer directly without copying + leafSize := sh.NamespaceSize + sh.ShareSize + nsLeafBuf := make([]byte, len(shares)*leafSize) + + // Pre-fill all namespace prefixes in the buffer + for i := 0; i < len(shares); i++ { + copy(nsLeafBuf[i*leafSize:i*leafSize+sh.NamespaceSize], namespace.Bytes()) + } + + // create the commitments by pushing each leaf set onto the reused NMT + subTreeRoots := make([][]byte, len(leafSets)) + shareIdx := 0 + for i, set := range leafSets { + // Reset the tree for reuse + tree.Reset() + + for _, leaf := range set { + // Calculate offset in the large buffer + offset := shareIdx * leafSize + // Copy share data after the namespace + copy(nsLeafBuf[offset+sh.NamespaceSize:offset+leafSize], leaf) + // Push slice from the large buffer - NMT will use it directly + nsLeaf := nsLeafBuf[offset : offset+sh.NamespaceSize+len(leaf)] + + err = tree.Push(nsLeaf) + if err != nil { + return nil, err + } + shareIdx++ + } + // add the root + root, err := tree.Root() + if err != nil { + return nil, err + } + // Make a copy of the root since the tree buffer will be reused + subTreeRoots[i] = append([]byte(nil), root...) 
+ } + return subTreeRoots, nil +} + +// GenerateSubtreeRootsParallel generates the subtree roots of a blob using parallel processing. +// This version uses goroutines to process multiple leaf sets concurrently with reusable NMTs. +func GenerateSubtreeRootsParallel(blob *sh.Blob, subtreeRootThreshold int) ([][]byte, error) { + return GenerateSubtreeRootsParallelWithWorkers(blob, subtreeRootThreshold, 16) +} + +// GenerateSubtreeRootsParallelWithWorkers generates the subtree roots of a blob using parallel processing +// with a configurable number of worker goroutines. +// +// Work Distribution Strategy: +// The function splits the work into "leaf sets" where each leaf set represents a subtree in the +// Merkle Mountain Range. The work is distributed as follows: +// +// 1. Each leaf set is an independent unit of work that produces one subtree root +// 2. Leaf sets are processed via a work queue (channel) that workers pull from +// 3. Each worker goroutine: +// - Has its own reusable NMT instance to avoid lock contention +// - Pulls leaf sets from the work queue until it's empty +// - Processes each leaf set by pushing all its leaves into the NMT and computing the root +// +// For example, with 128 shares and subtreeRootThreshold=64: +// - This creates 2 leaf sets of 64 shares each +// - With 2 workers, each worker would process 1 leaf set +// - With 4 workers, 2 workers would each process 1 leaf set, and 2 would be idle +// +// The work distribution is dynamic (work-stealing pattern) rather than static partitioning, +// which provides better load balancing when leaf sets have different sizes. 
+func GenerateSubtreeRootsParallelWithWorkers(blob *sh.Blob, subtreeRootThreshold int, numWorkers int) ([][]byte, error) { + shares, err := splitBlobs(blob) + if err != nil { + return nil, err + } + + // Calculate Merkle Mountain Range structure + subTreeWidth := SubTreeWidth(len(shares), subtreeRootThreshold) + treeSizes, err := MerkleMountainRangeSizes(uint64(len(shares)), uint64(subTreeWidth)) + if err != nil { + return nil, err + } + + // Create leaf sets - each will become one subtree root + leafSets := make([][][]byte, len(treeSizes)) + cursor := uint64(0) + for i, treeSize := range treeSizes { + leafSets[i] = sh.ToBytes(shares[cursor : cursor+treeSize]) + cursor += treeSize + } + + namespace := blob.Namespace() + + // Pre-allocate a single large buffer for all shares with namespace prepended + // This avoids allocations during NMT operations + leafSize := sh.NamespaceSize + sh.ShareSize + nsLeafBuf := make([]byte, len(shares)*leafSize) + + // Pre-fill all namespace prefixes and share data in the buffer + for i := 0; i < len(shares); i++ { + copy(nsLeafBuf[i*leafSize:i*leafSize+sh.NamespaceSize], namespace.Bytes()) + } + + shareIdx := 0 + for _, set := range leafSets { + for _, leaf := range set { + offset := shareIdx * leafSize + copy(nsLeafBuf[offset+sh.NamespaceSize:offset+leafSize], leaf) + shareIdx++ + } + } + + // Result slice to hold computed roots + subTreeRoots := make([][]byte, len(leafSets)) + + // Adjust number of workers based on available work + if numWorkers < 1 { + numWorkers = 1 + } + if len(leafSets) < numWorkers { + numWorkers = len(leafSets) + } + + // Use errgroup for clean error handling and goroutine management + g := new(errgroup.Group) + workChan := make(chan int, len(leafSets)) + + // Queue all work items (leaf set indices) + for i := range leafSets { + workChan <- i + } + close(workChan) + + // Mutex to protect subTreeRoots writes (though each index is written only once) + var mu sync.Mutex + + // Start worker goroutines + for w := 
0; w < numWorkers; w++ { + g.Go(func() error { + // Each worker has its own reusable NMT to avoid contention + tree := nmt.New(sha256.New(), + nmt.NamespaceIDSize(sh.NamespaceSize), + nmt.IgnoreMaxNamespace(true), + nmt.ReuseBuffer(true)) + + // Process work items from the queue + for leafSetIdx := range workChan { + tree.Reset() + set := leafSets[leafSetIdx] + + // Calculate starting position in the pre-filled buffer + startIdx := 0 + for j := 0; j < leafSetIdx; j++ { + startIdx += len(leafSets[j]) + } + + // Push all leaves for this leaf set into the NMT + for j, leaf := range set { + offset := (startIdx + j) * leafSize + nsLeaf := nsLeafBuf[offset : offset+sh.NamespaceSize+len(leaf)] + + if err := tree.Push(nsLeaf); err != nil { + return err + } + } + + // Compute the root for this subtree + root, err := tree.Root() + if err != nil { + return err + } + + // Store the root (making a copy since tree buffer will be reused) + mu.Lock() + subTreeRoots[leafSetIdx] = append([]byte(nil), root...) + mu.Unlock() + } + return nil + }) + } + + // Wait for all workers to complete + if err := g.Wait(); err != nil { + return nil, err + } + + return subTreeRoots, nil +} + +// CreateCommitments generates commitments for multiple blobs in parallel using a pool of NMT instances. +// This implementation: +// 1. Splits blobs into shares in parallel using X goroutines +// 2. Uses a pool of buffered NMT wrappers for efficient memory reuse +// 3. Processes all subtree roots across all blobs concurrently +// 4. 
Returns commitments in the same order as input blobs +func CreateCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subtreeRootThreshold int, numWorkers int) ([][]byte, error) { + if len(blobs) == 0 { + return [][]byte{}, nil + } + + // Step 1: Split all blobs into shares in parallel + type blobShares struct { + shares []sh.Share + err error + } + + blobSharesResults := make([]blobShares, len(blobs)) + g := new(errgroup.Group) + g.SetLimit(numWorkers) // Limit concurrent blob splitting + + for i := range blobs { + idx := i + g.Go(func() error { + shares, err := splitBlobs(blobs[idx]) + blobSharesResults[idx] = blobShares{shares: shares, err: err} + return err + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + + // Step 2: Calculate the maximum subtree size across all blobs + maxSubtreeSize := 0 + type blobInfo struct { + shares []sh.Share + namespace sh.Namespace + treeSizes []uint64 + leafSets [][][]byte + } + blobInfos := make([]blobInfo, len(blobs)) + + for i, blob := range blobs { + shares := blobSharesResults[i].shares + subTreeWidth := SubTreeWidth(len(shares), subtreeRootThreshold) + treeSizes, err := MerkleMountainRangeSizes(uint64(len(shares)), uint64(subTreeWidth)) + if err != nil { + return nil, err + } + + // Track maximum subtree size for buffer allocation + for _, size := range treeSizes { + if int(size) > maxSubtreeSize { + maxSubtreeSize = int(size) + } + } + + // Prepare leaf sets for this blob + leafSets := make([][][]byte, len(treeSizes)) + cursor := uint64(0) + for j, treeSize := range treeSizes { + leafSets[j] = sh.ToBytes(shares[cursor : cursor+treeSize]) + cursor += treeSize + } + + blobInfos[i] = blobInfo{ + shares: shares, + namespace: blob.Namespace(), + treeSizes: treeSizes, + leafSets: leafSets, + } + } + + // Step 3: Create NMT pool with appropriate buffer size + poolSize := numWorkers * 2 // Allow some buffer for concurrent operations + pool := newNMTPool(poolSize, maxSubtreeSize) + + // Step 4: Process all 
subtree roots in parallel + type subtreeResult struct { + blobIdx int + treeIdx int + root []byte + err error + } + + // Calculate total number of subtrees + totalSubtrees := 0 + for _, info := range blobInfos { + totalSubtrees += len(info.leafSets) + } + + resultChan := make(chan subtreeResult, totalSubtrees) + g = new(errgroup.Group) + g.SetLimit(numWorkers) + + // Queue all subtree computations + for blobIdx, info := range blobInfos { + bIdx := blobIdx + bInfo := info + for treeIdx, leafSet := range info.leafSets { + tIdx := treeIdx + leaves := leafSet + + g.Go(func() error { + // Acquire a buffered NMT from the pool + tree := pool.acquire() + + // Compute the root for this subtree + root, err := tree.computeRoot(bInfo.namespace.Bytes(), leaves) + + resultChan <- subtreeResult{ + blobIdx: bIdx, + treeIdx: tIdx, + root: root, + err: err, + } + return err + }) + } + } + + // Wait for all computations to complete + if err := g.Wait(); err != nil { + close(resultChan) + return nil, err + } + close(resultChan) + + // Step 5: Collect results and organize by blob + subtreeRootsByBlob := make([][][]byte, len(blobs)) + for i, info := range blobInfos { + subtreeRootsByBlob[i] = make([][]byte, len(info.leafSets)) + } + + for result := range resultChan { + if result.err != nil { + return nil, result.err + } + subtreeRootsByBlob[result.blobIdx][result.treeIdx] = result.root + } + + // Step 6: Compute final commitments using the merkle root function + commitments := make([][]byte, len(blobs)) + for i, subtreeRoots := range subtreeRootsByBlob { + commitments[i] = merkleRootFn(subtreeRoots) + } + + return commitments, nil +} + +// CreateCommitmentsSequential is the original sequential implementation for comparison. 
+func CreateCommitmentsSequential(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subtreeRootThreshold int) ([][]byte, error) { commitments := make([][]byte, len(blobs)) for i, blob := range blobs { commitment, err := CreateCommitment(blob, merkleRootFn, subtreeRootThreshold) diff --git a/inclusion/commitment_test.go b/inclusion/commitment_test.go index 89f6e2e..6aaabc9 100644 --- a/inclusion/commitment_test.go +++ b/inclusion/commitment_test.go @@ -3,9 +3,12 @@ package inclusion_test import ( "bytes" "crypto/sha256" + "fmt" + "runtime" "testing" "github.com/celestiaorg/go-square/v3/inclusion" + "github.com/celestiaorg/go-square/v3/internal/test" "github.com/celestiaorg/go-square/v3/share" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -60,6 +63,32 @@ func TestMerkleMountainRangeSizes(t *testing.T) { // TestCreateCommitment will fail if a change is made to share encoding or how // the commitment is calculated. If this is the case, the expected commitment // bytes will need to be updated. 
+func TestGenerateSubtreeRootsEquivalence(t *testing.T) { + blobSizes := []int{ + share.AvailableBytesFromSparseShares(2), + share.AvailableBytesFromSparseShares(16), + share.AvailableBytesFromSparseShares(64), + } + + for _, size := range blobSizes { + blobs := test.GenerateBlobs(size) + require.Len(t, blobs, 1) + blob := blobs[0] + + original, err := inclusion.GenerateSubtreeRoots(blob, defaultSubtreeRootThreshold) + require.NoError(t, err) + + optimized, err := inclusion.GenerateSubtreeRootsReusedNMT(blob, defaultSubtreeRootThreshold) + require.NoError(t, err) + + parallel, err := inclusion.GenerateSubtreeRootsParallel(blob, defaultSubtreeRootThreshold) + require.NoError(t, err) + + assert.Equal(t, original, optimized, "Optimized results should be identical for blob size %d", size) + assert.Equal(t, original, parallel, "Parallel results should be identical for blob size %d", size) + } +} + func TestCreateCommitment(t *testing.T) { ns1 := share.MustNewV0Namespace(bytes.Repeat([]byte{0x1}, share.NamespaceVersionZeroIDSize)) @@ -113,3 +142,319 @@ func twoLeafMerkleRoot(data [][]byte) []byte { sum := sha256.Sum256(append(h1[:], h2[:]...)) return sum[:] } + +func simpleMerkleRoot(data [][]byte) []byte { + // Return a dummy 32-byte value to avoid affecting benchmark + return make([]byte, 32) +} + +func BenchmarkGenerateSubtreeRoots(b *testing.B) { + benchmarks := []struct { + name string + blobSize int + }{ + { + name: "2 shares", + blobSize: share.AvailableBytesFromSparseShares(2), + }, + { + name: "16 shares", + blobSize: share.AvailableBytesFromSparseShares(16), + }, + { + name: "64 shares", + blobSize: share.AvailableBytesFromSparseShares(64), + }, + { + name: "128 shares", + blobSize: share.AvailableBytesFromSparseShares(128), + }, + { + name: "256 shares", + blobSize: share.AvailableBytesFromSparseShares(256), + }, + { + name: "16384 shares", + blobSize: share.AvailableBytesFromSparseShares(16384), + }, + } + + for _, bm := range benchmarks { + blobs := 
test.GenerateBlobs(bm.blobSize) + if len(blobs) != 1 { + b.Fatal("expected exactly one blob") + } + blob := blobs[0] + + b.Run(bm.name+"/original", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := inclusion.GenerateSubtreeRoots(blob, defaultSubtreeRootThreshold) + if err != nil { + b.Fatal(err) + } + } + }) + + b.Run(bm.name+"/reused_nmt", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := inclusion.GenerateSubtreeRootsReusedNMT(blob, defaultSubtreeRootThreshold) + if err != nil { + b.Fatal(err) + } + } + }) + + b.Run(bm.name+"/parallel", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := inclusion.GenerateSubtreeRootsParallel(blob, defaultSubtreeRootThreshold) + if err != nil { + b.Fatal(err) + } + } + }) + } +} + +func BenchmarkGenerateSubtreeRootsParallelWorkers(b *testing.B) { + blobSizes := []struct { + name string + blobSize int + }{ + { + name: "64 shares", + blobSize: share.AvailableBytesFromSparseShares(64), + }, + { + name: "128 shares", + blobSize: share.AvailableBytesFromSparseShares(128), + }, + { + name: "256 shares", + blobSize: share.AvailableBytesFromSparseShares(256), + }, + { + name: "512 shares", + blobSize: share.AvailableBytesFromSparseShares(512), + }, + } + + workerCounts := []int{1, 2, 4, 8, 16} + + for _, bs := range blobSizes { + blobs := test.GenerateBlobs(bs.blobSize) + if len(blobs) != 1 { + b.Fatal("expected exactly one blob") + } + blob := blobs[0] + + for _, workers := range workerCounts { + b.Run(fmt.Sprintf("%s/%d_workers", bs.name, workers), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := inclusion.GenerateSubtreeRootsParallelWithWorkers(blob, defaultSubtreeRootThreshold, workers) + if err != nil { + b.Fatal(err) + } + } + }) + } + + // Also benchmark the sequential reused version for comparison + b.Run(bs.name+"/sequential_reused", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := 
inclusion.GenerateSubtreeRootsReusedNMT(blob, defaultSubtreeRootThreshold) + if err != nil { + b.Fatal(err) + } + } + }) + } +} + +func BenchmarkCreateCommitment(b *testing.B) { + benchmarks := []struct { + name string + blobSize int + }{ + { + name: "2 shares", + blobSize: share.AvailableBytesFromSparseShares(2), + }, + { + name: "4 shares", + blobSize: share.AvailableBytesFromSparseShares(4), + }, + { + name: "8 shares", + blobSize: share.AvailableBytesFromSparseShares(8), + }, + { + name: "16 shares", + blobSize: share.AvailableBytesFromSparseShares(16), + }, + { + name: "32 shares", + blobSize: share.AvailableBytesFromSparseShares(32), + }, + { + name: "64 shares", + blobSize: share.AvailableBytesFromSparseShares(64), + }, + { + name: "128 shares", + blobSize: share.AvailableBytesFromSparseShares(128), + }, + { + name: "256 shares", + blobSize: share.AvailableBytesFromSparseShares(256), + }, + { + name: "512 shares", + blobSize: share.AvailableBytesFromSparseShares(512), + }, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + blobs := test.GenerateBlobs(bm.blobSize) + if len(blobs) != 1 { + b.Fatal("expected exactly one blob") + } + blob := blobs[0] + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := inclusion.CreateCommitment(blob, simpleMerkleRoot, defaultSubtreeRootThreshold) + if err != nil { + b.Fatal(err) + } + } + }) + } +} + +func TestCreateCommitmentsEquivalence(t *testing.T) { + // Test with various combinations of blob counts and sizes + testCases := []struct { + name string + blobSizes []int + }{ + { + name: "single small blob", + blobSizes: []int{1024}, // ~2 shares + }, + { + name: "multiple small blobs", + blobSizes: []int{1024, 2048, 1536}, + }, + { + name: "mixed sizes", + blobSizes: []int{512, 8192, 32768, 1024}, + }, + { + name: "large blobs", + blobSizes: []int{65536, 131072}, // 128 and 256 shares + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + blobs := 
test.GenerateBlobs(tc.blobSizes...) + + // Sequential version + sequential, err := inclusion.CreateCommitmentsSequential(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold) + require.NoError(t, err) + + // Parallel version with different worker counts + for _, workers := range []int{1, 2, 4, 8} { + parallel, err := inclusion.CreateCommitments(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold, workers) + require.NoError(t, err) + + assert.Equal(t, sequential, parallel, + "Parallel results with %d workers should match sequential for %s", workers, tc.name) + } + }) + } +} + +func TestCreateCommitmentsEmpty(t *testing.T) { + // Test with empty blob slice + result, err := inclusion.CreateCommitments([]*share.Blob{}, simpleMerkleRoot, defaultSubtreeRootThreshold, 4) + require.NoError(t, err) + assert.Empty(t, result) +} + +// BenchmarkCommitmentsComparison directly compares CreateCommitment vs CreateCommitments +func BenchmarkCommitmentsComparison(b *testing.B) { + // Test scenarios with different blob configurations + scenarios := []struct { + numBlobs int + bytesPerBlob int + description string + }{ + {1, 1048576, "1x1MB"}, // 1 blob of 1MB + {10, 104858, "10x100KB"}, // 10 blobs of ~100KB each + {100, 10486, "100x10KB"}, // 100 blobs of ~10KB each + {4, 1048576, "4x1MB"}, // 4 blobs of 1MB each + {16, 262144, "16x256KB"}, // 16 blobs of 256KB each + {64, 65536, "64x64KB"}, // 64 blobs of 64KB each + {16, 8388608, "16x8MB"}, // 16 blobs of 8MB each (128MB total) + } + + for _, scenario := range scenarios { + // Generate blobs for this scenario + blobSizes := make([]int, scenario.numBlobs) + for i := range blobSizes { + blobSizes[i] = scenario.bytesPerBlob + } + blobs := test.GenerateBlobs(blobSizes...) 
+ + //totalMB := float64(scenario.numBlobs * scenario.bytesPerBlob) / (1024 * 1024) + + // Sequential: CreateCommitment for each blob + b.Run(fmt.Sprintf("%s_Sequential", scenario.description), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, blob := range blobs { + _, err := inclusion.CreateCommitment(blob, simpleMerkleRoot, defaultSubtreeRootThreshold) + if err != nil { + b.Fatal(err) + } + } + } + //b.ReportMetric(totalMB*1000/b.Elapsed().Seconds(), "MB/s") + }) + + // Parallel: CreateCommitments with 8 workers + b.Run(fmt.Sprintf("%s_Parallel8", scenario.description), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := inclusion.CreateCommitments(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold, 8) + if err != nil { + b.Fatal(err) + } + } + //b.ReportMetric(totalMB*1000/b.Elapsed().Seconds(), "MB/s") + }) + + // For the large 16x8MB scenario, also test with different worker counts + if scenario.description == "16x8MB" { + for _, workers := range []int{4, 16, 32, 4 * runtime.NumCPU()} { + b.Run(fmt.Sprintf("%s_Parallel%d", scenario.description, workers), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := inclusion.CreateCommitments(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold, workers) + if err != nil { + b.Fatal(err) + } + } + //b.ReportMetric(totalMB*1000/b.Elapsed().Seconds(), "MB/s") + }) + } + } + } +} diff --git a/inclusion/nmt_pool.go b/inclusion/nmt_pool.go new file mode 100644 index 0000000..965a4ee --- /dev/null +++ b/inclusion/nmt_pool.go @@ -0,0 +1,98 @@ +package inclusion + +import ( + "crypto/sha256" + + sh "github.com/celestiaorg/go-square/v3/share" + "github.com/celestiaorg/nmt" +) + +// nmtPool provides a fixed-size pool of bufferedNMT instances for efficient reuse. +type nmtPool struct { + trees chan *bufferedNMT + poolSize int + opts []nmt.Option +} + +// newNMTPool creates a new pool of buffered NMT instances. 
+func newNMTPool(poolSize int, maxSubtreeSize int) *nmtPool { + pool := &nmtPool{ + trees: make(chan *bufferedNMT, poolSize), + poolSize: poolSize, + opts: []nmt.Option{ + nmt.NamespaceIDSize(sh.NamespaceSize), + nmt.IgnoreMaxNamespace(true), + nmt.ReuseBuffer(true), + }, + } + + // Pre-populate the pool with buffered NMT instances + for i := 0; i < poolSize; i++ { + pool.trees <- newBufferedNMT(maxSubtreeSize, pool) + } + + return pool +} + +// acquire gets a buffered NMT from the pool, blocking if none available. +func (p *nmtPool) acquire() *bufferedNMT { + return <-p.trees +} + +// release returns a buffered NMT to the pool for reuse. +func (p *nmtPool) release(tree *bufferedNMT) { + tree.reset() + p.trees <- tree +} + +// bufferedNMT wraps an NMT with a pre-allocated buffer for efficient operations. +type bufferedNMT struct { + tree *nmt.NamespacedMerkleTree + buffer []byte // Pre-allocated buffer for namespace+share data + pool *nmtPool // Reference to the pool for auto-release + leafSize int // Size of namespace + share + maxLeaves int // Maximum number of leaves this buffer can handle +} + +// newBufferedNMT creates a new buffered NMT wrapper. +func newBufferedNMT(maxLeaves int, pool *nmtPool) *bufferedNMT { + leafSize := sh.NamespaceSize + sh.ShareSize + return &bufferedNMT{ + tree: nmt.New(sha256.New(), pool.opts...), + buffer: make([]byte, maxLeaves*leafSize), + pool: pool, + leafSize: leafSize, + maxLeaves: maxLeaves, + } +} + +// reset prepares the buffered NMT for reuse. +func (t *bufferedNMT) reset() { + t.tree.Reset() +} + +// computeRoot processes a set of leaves with a given namespace and returns the root. +// It automatically releases itself back to the pool after computing the root. 
+func (t *bufferedNMT) computeRoot(namespace []byte, leaves [][]byte) ([]byte, error) { + defer t.pool.release(t) + + // Pre-fill namespace in buffer for all leaves + for i := 0; i < len(leaves); i++ { + offset := i * t.leafSize + copy(t.buffer[offset:offset+sh.NamespaceSize], namespace) + } + + // Copy leaf data and push to tree + for i, leaf := range leaves { + offset := i * t.leafSize + copy(t.buffer[offset+sh.NamespaceSize:offset+t.leafSize], leaf) + + // Create slice from buffer and push to NMT + nsLeaf := t.buffer[offset : offset+sh.NamespaceSize+len(leaf)] + if err := t.tree.Push(nsLeaf); err != nil { + return nil, err + } + } + + return t.tree.Root() +} From f7615639ff3f5ed7edf6e3814533d843e542bfb3 Mon Sep 17 00:00:00 2001 From: Mikhail Rakhmanov Date: Thu, 25 Sep 2025 17:14:29 +0200 Subject: [PATCH 2/6] refactor: update nmt to v0.24.2 --- go.mod | 4 ++-- go.sum | 4 ++-- inclusion/commitment.go | 4 ++-- inclusion/nmt_pool.go | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index debdf33..b256e31 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,10 @@ module github.com/celestiaorg/go-square/v3 go 1.24.0 require ( - github.com/celestiaorg/nmt v0.24.2-0.20250918161004-dcaa76b80708 + github.com/celestiaorg/nmt v0.24.2 github.com/stretchr/testify v1.11.1 golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb + golang.org/x/sync v0.17.0 google.golang.org/protobuf v1.36.9 ) @@ -13,6 +14,5 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - golang.org/x/sync v0.17.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index e70f76a..95c24c9 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/celestiaorg/nmt v0.24.2-0.20250918161004-dcaa76b80708 h1:V+Ak590bIn9g3dqYaKDsno8lpvEmjtiFHXDTdguIFoE= -github.com/celestiaorg/nmt v0.24.2-0.20250918161004-dcaa76b80708/go.mod 
h1:vgLBpWBi8F5KLxTdXSwb7AU4NhiIQ1AQRGa+PzdcLEA= +github.com/celestiaorg/nmt v0.24.2 h1:LlpJSPOd6/Lw1Ig6HUhZuqiINHLka/ZSRTBzlNJpchg= +github.com/celestiaorg/nmt v0.24.2/go.mod h1:vgLBpWBi8F5KLxTdXSwb7AU4NhiIQ1AQRGa+PzdcLEA= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= diff --git a/inclusion/commitment.go b/inclusion/commitment.go index 7f57484..1c267f0 100644 --- a/inclusion/commitment.go +++ b/inclusion/commitment.go @@ -113,7 +113,7 @@ func GenerateSubtreeRootsReusedNMT(blob *sh.Blob, subtreeRootThreshold int) ([][ tree := nmt.New(sha256.New(), nmt.NamespaceIDSize(sh.NamespaceSize), nmt.IgnoreMaxNamespace(true), - nmt.ReuseBuffer(true)) + nmt.ReuseBuffers(true)) // Pre-allocate a single large buffer for all shares with namespace prepended // This allows NMT to use the buffer directly without copying @@ -257,7 +257,7 @@ func GenerateSubtreeRootsParallelWithWorkers(blob *sh.Blob, subtreeRootThreshold tree := nmt.New(sha256.New(), nmt.NamespaceIDSize(sh.NamespaceSize), nmt.IgnoreMaxNamespace(true), - nmt.ReuseBuffer(true)) + nmt.ReuseBuffers(true)) // Process work items from the queue for leafSetIdx := range workChan { diff --git a/inclusion/nmt_pool.go b/inclusion/nmt_pool.go index 965a4ee..66f0a4e 100644 --- a/inclusion/nmt_pool.go +++ b/inclusion/nmt_pool.go @@ -22,7 +22,7 @@ func newNMTPool(poolSize int, maxSubtreeSize int) *nmtPool { opts: []nmt.Option{ nmt.NamespaceIDSize(sh.NamespaceSize), nmt.IgnoreMaxNamespace(true), - nmt.ReuseBuffer(true), + nmt.ReuseBuffers(true), }, } From 2a754d2dea274918f1c56d1fba5041ebbcfac974 Mon Sep 17 00:00:00 2001 From: Mikhail Rakhmanov Date: Thu, 25 Sep 2025 17:25:42 +0200 Subject: [PATCH 3/6] refactor: rename to parallel commitments --- inclusion/commitment.go | 8 ++++---- inclusion/commitment_test.go | 14 +++++++------- 2 
files changed, 11 insertions(+), 11 deletions(-) diff --git a/inclusion/commitment.go b/inclusion/commitment.go index 1c267f0..11f85ef 100644 --- a/inclusion/commitment.go +++ b/inclusion/commitment.go @@ -303,13 +303,13 @@ func GenerateSubtreeRootsParallelWithWorkers(blob *sh.Blob, subtreeRootThreshold return subTreeRoots, nil } -// CreateCommitments generates commitments for multiple blobs in parallel using a pool of NMT instances. +// CreateParallelCommitments generates commitments for multiple blobs in parallel using a pool of NMT instances. // This implementation: // 1. Splits blobs into shares in parallel using X goroutines // 2. Uses a pool of buffered NMT wrappers for efficient memory reuse // 3. Processes all subtree roots across all blobs concurrently // 4. Returns commitments in the same order as input blobs -func CreateCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subtreeRootThreshold int, numWorkers int) ([][]byte, error) { +func CreateParallelCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subtreeRootThreshold int, numWorkers int) ([][]byte, error) { if len(blobs) == 0 { return [][]byte{}, nil } @@ -455,8 +455,8 @@ func CreateCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subtreeRootT return commitments, nil } -// CreateCommitmentsSequential is the original sequential implementation for comparison. -func CreateCommitmentsSequential(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subtreeRootThreshold int) ([][]byte, error) { +// CreateCommitments is the original sequential implementation for comparison. 
+func CreateCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subtreeRootThreshold int) ([][]byte, error) { commitments := make([][]byte, len(blobs)) for i, blob := range blobs { commitment, err := CreateCommitment(blob, merkleRootFn, subtreeRootThreshold) diff --git a/inclusion/commitment_test.go b/inclusion/commitment_test.go index 6aaabc9..95d49d6 100644 --- a/inclusion/commitment_test.go +++ b/inclusion/commitment_test.go @@ -366,12 +366,12 @@ func TestCreateCommitmentsEquivalence(t *testing.T) { blobs := test.GenerateBlobs(tc.blobSizes...) // Sequential version - sequential, err := inclusion.CreateCommitmentsSequential(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold) + sequential, err := inclusion.CreateCommitments(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold) require.NoError(t, err) // Parallel version with different worker counts for _, workers := range []int{1, 2, 4, 8} { - parallel, err := inclusion.CreateCommitments(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold, workers) + parallel, err := inclusion.CreateParallelCommitments(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold, workers) require.NoError(t, err) assert.Equal(t, sequential, parallel, @@ -383,12 +383,12 @@ func TestCreateCommitmentsEquivalence(t *testing.T) { func TestCreateCommitmentsEmpty(t *testing.T) { // Test with empty blob slice - result, err := inclusion.CreateCommitments([]*share.Blob{}, simpleMerkleRoot, defaultSubtreeRootThreshold, 4) + result, err := inclusion.CreateParallelCommitments([]*share.Blob{}, simpleMerkleRoot, defaultSubtreeRootThreshold, 4) require.NoError(t, err) assert.Empty(t, result) } -// BenchmarkCommitmentsComparison directly compares CreateCommitment vs CreateCommitments +// BenchmarkCommitmentsComparison directly compares CreateCommitment vs CreateParallelCommitments func BenchmarkCommitmentsComparison(b *testing.B) { // Test scenarios with different blob configurations scenarios := []struct { @@ -429,11 +429,11 @@ func 
BenchmarkCommitmentsComparison(b *testing.B) { //b.ReportMetric(totalMB*1000/b.Elapsed().Seconds(), "MB/s") }) - // Parallel: CreateCommitments with 8 workers + // Parallel: CreateParallelCommitments with 8 workers b.Run(fmt.Sprintf("%s_Parallel8", scenario.description), func(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := inclusion.CreateCommitments(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold, 8) + _, err := inclusion.CreateParallelCommitments(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold, 8) if err != nil { b.Fatal(err) } @@ -447,7 +447,7 @@ func BenchmarkCommitmentsComparison(b *testing.B) { b.Run(fmt.Sprintf("%s_Parallel%d", scenario.description, workers), func(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := inclusion.CreateCommitments(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold, workers) + _, err := inclusion.CreateParallelCommitments(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold, workers) if err != nil { b.Fatal(err) } From 8b9f771921c003864987d128b5b83ac801a5763a Mon Sep 17 00:00:00 2001 From: Mikhail Rakhmanov Date: Fri, 26 Sep 2025 20:23:09 +0200 Subject: [PATCH 4/6] refactor: simplify code, add comments, improve tests --- go.mod | 2 +- go.sum | 4 + inclusion/commitment.go | 287 +++---------------------- inclusion/commitment_test.go | 391 +++++++++-------------------------- inclusion/nmt_pool.go | 67 +++--- 5 files changed, 169 insertions(+), 582 deletions(-) diff --git a/go.mod b/go.mod index b256e31..c399cfd 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.24.0 require ( github.com/celestiaorg/nmt v0.24.2 github.com/stretchr/testify v1.11.1 - golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb + golang.org/x/exp v0.0.0-20250911091902-df9299821621 golang.org/x/sync v0.17.0 google.golang.org/protobuf v1.36.9 ) diff --git a/go.sum b/go.sum index 95c24c9..42dfc17 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,7 @@ github.com/gogo/protobuf v1.3.2 
h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -27,6 +28,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb h1:c0vyKkb6yr3KR7jEfJaOSv4lG7xPkbN6r52aJz1d8a8= golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= +golang.org/x/exp v0.0.0-20250911091902-df9299821621 h1:2id6c1/gto0kaHYyrixvknJ8tUK/Qs5IsmBtrc+FtgU= +golang.org/x/exp v0.0.0-20250911091902-df9299821621/go.mod h1:TwQYMMnGpvZyc+JpB/UAuTNIsVJifOlSkrZkhcvpVUk= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -35,6 +38,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync 
v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= diff --git a/inclusion/commitment.go b/inclusion/commitment.go index 11f85ef..271d669 100644 --- a/inclusion/commitment.go +++ b/inclusion/commitment.go @@ -2,7 +2,8 @@ package inclusion import ( "crypto/sha256" - "sync" + "errors" + "fmt" sh "github.com/celestiaorg/go-square/v3/share" "github.com/celestiaorg/nmt" @@ -83,246 +84,24 @@ func GenerateSubtreeRoots(blob *sh.Blob, subtreeRootThreshold int) ([][]byte, er return subTreeRoots, nil } -// GenerateSubtreeRootsReusedNMT generates the subtree roots of a blob using a reusable NMT. -// This is an optimized version that reuses the same NMT instance across multiple subtrees -// to reduce memory allocations. -func GenerateSubtreeRootsReusedNMT(blob *sh.Blob, subtreeRootThreshold int) ([][]byte, error) { - shares, err := splitBlobs(blob) - if err != nil { - return nil, err - } - - // the commitment is the root of a merkle mountain range with max tree size - // determined by the number of roots required to create a share commitment - // over that blob. The size of the tree is only increased if the number of - // subtree roots surpasses a constant threshold. 
- subTreeWidth := SubTreeWidth(len(shares), subtreeRootThreshold) - treeSizes, err := MerkleMountainRangeSizes(uint64(len(shares)), uint64(subTreeWidth)) - if err != nil { - return nil, err - } - leafSets := make([][][]byte, len(treeSizes)) - cursor := uint64(0) - for i, treeSize := range treeSizes { - leafSets[i] = sh.ToBytes(shares[cursor : cursor+treeSize]) - cursor += treeSize - } - - namespace := blob.Namespace() - // Create a single NMT instance with ReuseBuffer option - tree := nmt.New(sha256.New(), - nmt.NamespaceIDSize(sh.NamespaceSize), - nmt.IgnoreMaxNamespace(true), - nmt.ReuseBuffers(true)) - - // Pre-allocate a single large buffer for all shares with namespace prepended - // This allows NMT to use the buffer directly without copying - leafSize := sh.NamespaceSize + sh.ShareSize - nsLeafBuf := make([]byte, len(shares)*leafSize) - - // Pre-fill all namespace prefixes in the buffer - for i := 0; i < len(shares); i++ { - copy(nsLeafBuf[i*leafSize:i*leafSize+sh.NamespaceSize], namespace.Bytes()) - } - - // create the commitments by pushing each leaf set onto the reused NMT - subTreeRoots := make([][]byte, len(leafSets)) - shareIdx := 0 - for i, set := range leafSets { - // Reset the tree for reuse - tree.Reset() - - for _, leaf := range set { - // Calculate offset in the large buffer - offset := shareIdx * leafSize - // Copy share data after the namespace - copy(nsLeafBuf[offset+sh.NamespaceSize:offset+leafSize], leaf) - // Push slice from the large buffer - NMT will use it directly - nsLeaf := nsLeafBuf[offset : offset+sh.NamespaceSize+len(leaf)] - - err = tree.Push(nsLeaf) - if err != nil { - return nil, err - } - shareIdx++ - } - // add the root - root, err := tree.Root() - if err != nil { - return nil, err - } - // Make a copy of the root since the tree buffer will be reused - subTreeRoots[i] = append([]byte(nil), root...) 
- } - return subTreeRoots, nil -} - -// GenerateSubtreeRootsParallel generates the subtree roots of a blob using parallel processing. -// This version uses goroutines to process multiple leaf sets concurrently with reusable NMTs. -func GenerateSubtreeRootsParallel(blob *sh.Blob, subtreeRootThreshold int) ([][]byte, error) { - return GenerateSubtreeRootsParallelWithWorkers(blob, subtreeRootThreshold, 16) -} - -// GenerateSubtreeRootsParallelWithWorkers generates the subtree roots of a blob using parallel processing -// with a configurable number of worker goroutines. -// -// Work Distribution Strategy: -// The function splits the work into "leaf sets" where each leaf set represents a subtree in the -// Merkle Mountain Range. The work is distributed as follows: -// -// 1. Each leaf set is an independent unit of work that produces one subtree root -// 2. Leaf sets are processed via a work queue (channel) that workers pull from -// 3. Each worker goroutine: -// - Has its own reusable NMT instance to avoid lock contention -// - Pulls leaf sets from the work queue until it's empty -// - Processes each leaf set by pushing all its leaves into the NMT and computing the root -// -// For example, with 128 shares and subtreeRootThreshold=64: -// - This creates 2 leaf sets of 64 shares each -// - With 2 workers, each worker would process 1 leaf set -// - With 4 workers, 2 workers would each process 1 leaf set, and 2 would be idle -// -// The work distribution is dynamic (work-stealing pattern) rather than static partitioning, -// which provides better load balancing when leaf sets have different sizes. 
-func GenerateSubtreeRootsParallelWithWorkers(blob *sh.Blob, subtreeRootThreshold int, numWorkers int) ([][]byte, error) { - shares, err := splitBlobs(blob) - if err != nil { - return nil, err - } - - // Calculate Merkle Mountain Range structure - subTreeWidth := SubTreeWidth(len(shares), subtreeRootThreshold) - treeSizes, err := MerkleMountainRangeSizes(uint64(len(shares)), uint64(subTreeWidth)) - if err != nil { - return nil, err - } - - // Create leaf sets - each will become one subtree root - leafSets := make([][][]byte, len(treeSizes)) - cursor := uint64(0) - for i, treeSize := range treeSizes { - leafSets[i] = sh.ToBytes(shares[cursor : cursor+treeSize]) - cursor += treeSize - } - - namespace := blob.Namespace() - - // Pre-allocate a single large buffer for all shares with namespace prepended - // This avoids allocations during NMT operations - leafSize := sh.NamespaceSize + sh.ShareSize - nsLeafBuf := make([]byte, len(shares)*leafSize) - - // Pre-fill all namespace prefixes and share data in the buffer - for i := 0; i < len(shares); i++ { - copy(nsLeafBuf[i*leafSize:i*leafSize+sh.NamespaceSize], namespace.Bytes()) - } - - shareIdx := 0 - for _, set := range leafSets { - for _, leaf := range set { - offset := shareIdx * leafSize - copy(nsLeafBuf[offset+sh.NamespaceSize:offset+leafSize], leaf) - shareIdx++ - } - } - - // Result slice to hold computed roots - subTreeRoots := make([][]byte, len(leafSets)) - - // Adjust number of workers based on available work - if numWorkers < 1 { - numWorkers = 1 - } - if len(leafSets) < numWorkers { - numWorkers = len(leafSets) - } - - // Use errgroup for clean error handling and goroutine management - g := new(errgroup.Group) - workChan := make(chan int, len(leafSets)) - - // Queue all work items (leaf set indices) - for i := range leafSets { - workChan <- i - } - close(workChan) - - // Mutex to protect subTreeRoots writes (though each index is written only once) - var mu sync.Mutex - - // Start worker goroutines - for w := 
0; w < numWorkers; w++ { - g.Go(func() error { - // Each worker has its own reusable NMT to avoid contention - tree := nmt.New(sha256.New(), - nmt.NamespaceIDSize(sh.NamespaceSize), - nmt.IgnoreMaxNamespace(true), - nmt.ReuseBuffers(true)) - - // Process work items from the queue - for leafSetIdx := range workChan { - tree.Reset() - set := leafSets[leafSetIdx] - - // Calculate starting position in the pre-filled buffer - startIdx := 0 - for j := 0; j < leafSetIdx; j++ { - startIdx += len(leafSets[j]) - } - - // Push all leaves for this leaf set into the NMT - for j, leaf := range set { - offset := (startIdx + j) * leafSize - nsLeaf := nsLeafBuf[offset : offset+sh.NamespaceSize+len(leaf)] - - if err := tree.Push(nsLeaf); err != nil { - return err - } - } - - // Compute the root for this subtree - root, err := tree.Root() - if err != nil { - return err - } - - // Store the root (making a copy since tree buffer will be reused) - mu.Lock() - subTreeRoots[leafSetIdx] = append([]byte(nil), root...) - mu.Unlock() - } - return nil - }) - } - - // Wait for all workers to complete - if err := g.Wait(); err != nil { - return nil, err - } - - return subTreeRoots, nil -} - // CreateParallelCommitments generates commitments for multiple blobs in parallel using a pool of NMT instances. -// This implementation: -// 1. Splits blobs into shares in parallel using X goroutines -// 2. Uses a pool of buffered NMT wrappers for efficient memory reuse -// 3. Processes all subtree roots across all blobs concurrently -// 4. Returns commitments in the same order as input blobs +// See docs for CreateCommitment for more details. 
func CreateParallelCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subtreeRootThreshold int, numWorkers int) ([][]byte, error) { if len(blobs) == 0 { return [][]byte{}, nil } + if numWorkers <= 0 { + return nil, errors.New("number of workers must be positive") + } - // Step 1: Split all blobs into shares in parallel + // split all blobs into shares in parallel type blobShares struct { shares []sh.Share err error } - blobSharesResults := make([]blobShares, len(blobs)) g := new(errgroup.Group) - g.SetLimit(numWorkers) // Limit concurrent blob splitting + g.SetLimit(numWorkers) for i := range blobs { idx := i @@ -332,21 +111,20 @@ func CreateParallelCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subt return err }) } - if err := g.Wait(); err != nil { - return nil, err + return nil, fmt.Errorf("failed to split blob shares: %w", err) } - // Step 2: Calculate the maximum subtree size across all blobs maxSubtreeSize := 0 type blobInfo struct { shares []sh.Share namespace sh.Namespace - treeSizes []uint64 leafSets [][][]byte } blobInfos := make([]blobInfo, len(blobs)) + // calculate the maximum subtree size across all blobs and prepare + // subtree for parallel calculation using pooled nmts for i, blob := range blobs { shares := blobSharesResults[i].shares subTreeWidth := SubTreeWidth(len(shares), subtreeRootThreshold) @@ -354,70 +132,56 @@ func CreateParallelCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subt if err != nil { return nil, err } - - // Track maximum subtree size for buffer allocation for _, size := range treeSizes { if int(size) > maxSubtreeSize { maxSubtreeSize = int(size) } } - // Prepare leaf sets for this blob + // prepare leaf sets for this blob leafSets := make([][][]byte, len(treeSizes)) cursor := uint64(0) for j, treeSize := range treeSizes { leafSets[j] = sh.ToBytes(shares[cursor : cursor+treeSize]) cursor += treeSize } - blobInfos[i] = blobInfo{ shares: shares, namespace: blob.Namespace(), - treeSizes: treeSizes, 
leafSets: leafSets, } } - // Step 3: Create NMT pool with appropriate buffer size - poolSize := numWorkers * 2 // Allow some buffer for concurrent operations - pool := newNMTPool(poolSize, maxSubtreeSize) + pool, err := newNMTPool(numWorkers, maxSubtreeSize) + if err != nil { + return nil, err + } - // Step 4: Process all subtree roots in parallel + // process all subtree roots in parallel type subtreeResult struct { blobIdx int treeIdx int root []byte err error } - - // Calculate total number of subtrees totalSubtrees := 0 for _, info := range blobInfos { totalSubtrees += len(info.leafSets) } - resultChan := make(chan subtreeResult, totalSubtrees) g = new(errgroup.Group) g.SetLimit(numWorkers) - // Queue all subtree computations + // queue all subtree computations + // since go 1.22 there is no need to copy the variables used in loop for blobIdx, info := range blobInfos { - bIdx := blobIdx - bInfo := info for treeIdx, leafSet := range info.leafSets { - tIdx := treeIdx - leaves := leafSet - g.Go(func() error { - // Acquire a buffered NMT from the pool tree := pool.acquire() - - // Compute the root for this subtree - root, err := tree.computeRoot(bInfo.namespace.Bytes(), leaves) - + root, err := tree.computeRoot(info.namespace.Bytes(), leafSet) resultChan <- subtreeResult{ - blobIdx: bIdx, - treeIdx: tIdx, + blobIdx: blobIdx, + treeIdx: treeIdx, root: root, err: err, } @@ -426,14 +190,13 @@ func CreateParallelCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subt } } - // Wait for all computations to complete if err := g.Wait(); err != nil { close(resultChan) return nil, err } close(resultChan) - // Step 5: Collect results and organize by blob + // collect results and organize by blob subtreeRootsByBlob := make([][][]byte, len(blobs)) for i, info := range blobInfos { subtreeRootsByBlob[i] = make([][]byte, len(info.leafSets)) @@ -446,7 +209,7 @@ func CreateParallelCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subt 
subtreeRootsByBlob[result.blobIdx][result.treeIdx] = result.root } - // Step 6: Compute final commitments using the merkle root function + // compute final commitments using the merkle root function commitments := make([][]byte, len(blobs)) for i, subtreeRoots := range subtreeRootsByBlob { commitments[i] = merkleRootFn(subtreeRoots) @@ -455,7 +218,7 @@ func CreateParallelCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subt return commitments, nil } -// CreateCommitments is the original sequential implementation for comparison. +// CreateCommitments generates commitments sequentially for given blobs. func CreateCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subtreeRootThreshold int) ([][]byte, error) { commitments := make([][]byte, len(blobs)) for i, blob := range blobs { diff --git a/inclusion/commitment_test.go b/inclusion/commitment_test.go index 95d49d6..c3e7dd5 100644 --- a/inclusion/commitment_test.go +++ b/inclusion/commitment_test.go @@ -4,8 +4,10 @@ import ( "bytes" "crypto/sha256" "fmt" + "math/rand" "runtime" "testing" + "time" "github.com/celestiaorg/go-square/v3/inclusion" "github.com/celestiaorg/go-square/v3/internal/test" @@ -63,32 +65,6 @@ func TestMerkleMountainRangeSizes(t *testing.T) { // TestCreateCommitment will fail if a change is made to share encoding or how // the commitment is calculated. If this is the case, the expected commitment // bytes will need to be updated. 
-func TestGenerateSubtreeRootsEquivalence(t *testing.T) { - blobSizes := []int{ - share.AvailableBytesFromSparseShares(2), - share.AvailableBytesFromSparseShares(16), - share.AvailableBytesFromSparseShares(64), - } - - for _, size := range blobSizes { - blobs := test.GenerateBlobs(size) - require.Len(t, blobs, 1) - blob := blobs[0] - - original, err := inclusion.GenerateSubtreeRoots(blob, defaultSubtreeRootThreshold) - require.NoError(t, err) - - optimized, err := inclusion.GenerateSubtreeRootsReusedNMT(blob, defaultSubtreeRootThreshold) - require.NoError(t, err) - - parallel, err := inclusion.GenerateSubtreeRootsParallel(blob, defaultSubtreeRootThreshold) - require.NoError(t, err) - - assert.Equal(t, original, optimized, "Optimized results should be identical for blob size %d", size) - assert.Equal(t, original, parallel, "Parallel results should be identical for blob size %d", size) - } -} - func TestCreateCommitment(t *testing.T) { ns1 := share.MustNewV0Namespace(bytes.Repeat([]byte{0x1}, share.NamespaceVersionZeroIDSize)) @@ -143,254 +119,116 @@ func twoLeafMerkleRoot(data [][]byte) []byte { return sum[:] } -func simpleMerkleRoot(data [][]byte) []byte { - // Return a dummy 32-byte value to avoid affecting benchmark - return make([]byte, 32) -} - -func BenchmarkGenerateSubtreeRoots(b *testing.B) { - benchmarks := []struct { - name string - blobSize int - }{ - { - name: "2 shares", - blobSize: share.AvailableBytesFromSparseShares(2), - }, - { - name: "16 shares", - blobSize: share.AvailableBytesFromSparseShares(16), - }, - { - name: "64 shares", - blobSize: share.AvailableBytesFromSparseShares(64), - }, - { - name: "128 shares", - blobSize: share.AvailableBytesFromSparseShares(128), - }, - { - name: "256 shares", - blobSize: share.AvailableBytesFromSparseShares(256), - }, - { - name: "16384 shares", - blobSize: share.AvailableBytesFromSparseShares(16384), - }, +func hashConcatenatedData(data [][]byte) []byte { + var total []byte + for _, d := range data { + total 
= append(total, d...) } + finalHash := sha256.Sum256(total) + return finalHash[:] +} - for _, bm := range benchmarks { - blobs := test.GenerateBlobs(bm.blobSize) - if len(blobs) != 1 { - b.Fatal("expected exactly one blob") - } - blob := blobs[0] - - b.Run(bm.name+"/original", func(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := inclusion.GenerateSubtreeRoots(blob, defaultSubtreeRootThreshold) - if err != nil { - b.Fatal(err) - } - } - }) - - b.Run(bm.name+"/reused_nmt", func(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := inclusion.GenerateSubtreeRootsReusedNMT(blob, defaultSubtreeRootThreshold) - if err != nil { - b.Fatal(err) - } - } - }) - - b.Run(bm.name+"/parallel", func(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := inclusion.GenerateSubtreeRootsParallel(blob, defaultSubtreeRootThreshold) - if err != nil { - b.Fatal(err) +// TestCreateParallelCommitments compares results of +// parallel and non-parallel versions of the algorithm with different configurations. 
+func TestCreateParallelCommitments(t *testing.T) { + t.Run("empty blob", func(t *testing.T) { + result, err := inclusion.CreateParallelCommitments([]*share.Blob{}, hashConcatenatedData, defaultSubtreeRootThreshold, 4) + require.NoError(t, err) + assert.Empty(t, result) + }) + + t.Run("equivalence with sequential commitments (random)", func(t *testing.T) { + var ( + workers = runtime.NumCPU() + rng = rand.New(rand.NewSource(time.Now().UnixNano())) + minBlobSize = 512 + maxBlobSize = 1024 * 1024 + maxBlobNum = 10 + minBlobNum = 2 + testCount = 100 + ) + + for i := 0; i < testCount; i++ { + numBlobs := rng.Intn(maxBlobNum-minBlobNum) + minBlobNum + blobSizes := make([]int, numBlobs) + maxSize := 0 + for j := range blobSizes { + blobSizes[j] = rng.Intn(maxBlobSize-minBlobSize) + minBlobSize + if blobSizes[j] > maxSize { + maxSize = blobSizes[j] } } - }) - } -} + t.Run(fmt.Sprintf("test_%d_blobs_%d_max_size_%d", numBlobs, i, maxSize), func(t *testing.T) { + blobs := test.GenerateBlobs(blobSizes...) 
-func BenchmarkGenerateSubtreeRootsParallelWorkers(b *testing.B) { - blobSizes := []struct { - name string - blobSize int - }{ - { - name: "64 shares", - blobSize: share.AvailableBytesFromSparseShares(64), - }, - { - name: "128 shares", - blobSize: share.AvailableBytesFromSparseShares(128), - }, - { - name: "256 shares", - blobSize: share.AvailableBytesFromSparseShares(256), - }, - { - name: "512 shares", - blobSize: share.AvailableBytesFromSparseShares(512), - }, - } - - workerCounts := []int{1, 2, 4, 8, 16} + sequential, err := inclusion.CreateCommitments(blobs, hashConcatenatedData, defaultSubtreeRootThreshold) + require.NoError(t, err) - for _, bs := range blobSizes { - blobs := test.GenerateBlobs(bs.blobSize) - if len(blobs) != 1 { - b.Fatal("expected exactly one blob") - } - blob := blobs[0] + parallel, err := inclusion.CreateParallelCommitments(blobs, hashConcatenatedData, defaultSubtreeRootThreshold, workers) + require.NoError(t, err) - for _, workers := range workerCounts { - b.Run(fmt.Sprintf("%s/%d_workers", bs.name, workers), func(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := inclusion.GenerateSubtreeRootsParallelWithWorkers(blob, defaultSubtreeRootThreshold, workers) - if err != nil { - b.Fatal(err) - } - } + assert.Equal(t, sequential, parallel, + "Parallel results with %d workers should match sequential for %d blobs", + workers, numBlobs) }) } + }) - // Also benchmark the sequential reused version for comparison - b.Run(bs.name+"/sequential_reused", func(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := inclusion.GenerateSubtreeRootsReusedNMT(blob, defaultSubtreeRootThreshold) - if err != nil { - b.Fatal(err) - } - } - }) - } -} - -func BenchmarkCreateCommitment(b *testing.B) { - benchmarks := []struct { - name string - blobSize int - }{ - { - name: "2 shares", - blobSize: share.AvailableBytesFromSparseShares(2), - }, - { - name: "4 shares", - blobSize: share.AvailableBytesFromSparseShares(4), - }, 
- { - name: "8 shares", - blobSize: share.AvailableBytesFromSparseShares(8), - }, - { - name: "16 shares", - blobSize: share.AvailableBytesFromSparseShares(16), - }, - { - name: "32 shares", - blobSize: share.AvailableBytesFromSparseShares(32), - }, - { - name: "64 shares", - blobSize: share.AvailableBytesFromSparseShares(64), - }, - { - name: "128 shares", - blobSize: share.AvailableBytesFromSparseShares(128), - }, - { - name: "256 shares", - blobSize: share.AvailableBytesFromSparseShares(256), - }, - { - name: "512 shares", - blobSize: share.AvailableBytesFromSparseShares(512), - }, - } - - for _, bm := range benchmarks { - b.Run(bm.name, func(b *testing.B) { - blobs := test.GenerateBlobs(bm.blobSize) - if len(blobs) != 1 { - b.Fatal("expected exactly one blob") + t.Run("equivalence with sequential commitments (fixed)", func(t *testing.T) { + genBlobSizes := func(size, count int) []int { + blobSizes := make([]int, count) + for i := range blobSizes { + blobSizes[i] = size } - blob := blobs[0] - - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := inclusion.CreateCommitment(blob, simpleMerkleRoot, defaultSubtreeRootThreshold) - if err != nil { - b.Fatal(err) - } - } - }) - } -} - -func TestCreateCommitmentsEquivalence(t *testing.T) { - // Test with various combinations of blob counts and sizes - testCases := []struct { - name string - blobSizes []int - }{ - { - name: "single small blob", - blobSizes: []int{1024}, // ~2 shares - }, - { - name: "multiple small blobs", - blobSizes: []int{1024, 2048, 1536}, - }, - { - name: "mixed sizes", - blobSizes: []int{512, 8192, 32768, 1024}, - }, - { - name: "large blobs", - blobSizes: []int{65536, 131072}, // 128 and 256 shares - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - blobs := test.GenerateBlobs(tc.blobSizes...) 
+ return blobSizes + } + testCases := []struct { + name string + blobSizes []int + }{ + { + name: "16*512bytes", + blobSizes: genBlobSizes(512, 16), + }, + { + name: "16x8MB", + blobSizes: genBlobSizes(8*1024*1024, 16), + }, + { + name: "mixed_sizes", + blobSizes: []int{512, 8192, 32768, 131072}, + }, + } + workers := runtime.NumCPU() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + blobs := test.GenerateBlobs(tc.blobSizes...) - // Sequential version - sequential, err := inclusion.CreateCommitments(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold) - require.NoError(t, err) + sequential, err := inclusion.CreateCommitments(blobs, hashConcatenatedData, defaultSubtreeRootThreshold) + require.NoError(t, err) - // Parallel version with different worker counts - for _, workers := range []int{1, 2, 4, 8} { - parallel, err := inclusion.CreateParallelCommitments(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold, workers) + parallel, err := inclusion.CreateParallelCommitments(blobs, hashConcatenatedData, defaultSubtreeRootThreshold, workers) require.NoError(t, err) assert.Equal(t, sequential, parallel, - "Parallel results with %d workers should match sequential for %s", workers, tc.name) - } - }) - } -} + "Parallel results should match sequential for %s", tc.name) + }) + } + }) + + t.Run("invalid worker count", func(t *testing.T) { + blobs := test.GenerateBlobs(1024) -func TestCreateCommitmentsEmpty(t *testing.T) { - // Test with empty blob slice - result, err := inclusion.CreateParallelCommitments([]*share.Blob{}, simpleMerkleRoot, defaultSubtreeRootThreshold, 4) - require.NoError(t, err) - assert.Empty(t, result) + _, err := inclusion.CreateParallelCommitments(blobs, hashConcatenatedData, defaultSubtreeRootThreshold, 0) + require.Error(t, err) + + _, err = inclusion.CreateParallelCommitments(blobs, hashConcatenatedData, defaultSubtreeRootThreshold, -1) + require.Error(t, err) + }) } -// BenchmarkCommitmentsComparison directly compares 
CreateCommitment vs CreateParallelCommitments +// BenchmarkCommitmentsComparison directly compares CreateCommitment vs CreateParallelCommitments. func BenchmarkCommitmentsComparison(b *testing.B) { - // Test scenarios with different blob configurations scenarios := []struct { numBlobs int bytesPerBlob int @@ -404,57 +242,32 @@ func BenchmarkCommitmentsComparison(b *testing.B) { {64, 65536, "64x64KB"}, // 64 blobs of 64KB each {16, 8388608, "16x8MB"}, // 16 blobs of 8MB each (128MB total) } - + emptyHash := func([][]byte) []byte { + return nil + } for _, scenario := range scenarios { - // Generate blobs for this scenario blobSizes := make([]int, scenario.numBlobs) for i := range blobSizes { blobSizes[i] = scenario.bytesPerBlob } blobs := test.GenerateBlobs(blobSizes...) - - //totalMB := float64(scenario.numBlobs * scenario.bytesPerBlob) / (1024 * 1024) - - // Sequential: CreateCommitment for each blob b.Run(fmt.Sprintf("%s_Sequential", scenario.description), func(b *testing.B) { b.ResetTimer() + b.ReportAllocs() for i := 0; i < b.N; i++ { for _, blob := range blobs { - _, err := inclusion.CreateCommitment(blob, simpleMerkleRoot, defaultSubtreeRootThreshold) - if err != nil { - b.Fatal(err) - } + _, err := inclusion.CreateCommitment(blob, emptyHash, defaultSubtreeRootThreshold) + require.NoError(b, err) } } - //b.ReportMetric(totalMB*1000/b.Elapsed().Seconds(), "MB/s") }) - - // Parallel: CreateParallelCommitments with 8 workers - b.Run(fmt.Sprintf("%s_Parallel8", scenario.description), func(b *testing.B) { + b.Run(fmt.Sprintf("%s_Parallel", scenario.description), func(b *testing.B) { b.ResetTimer() + b.ReportAllocs() for i := 0; i < b.N; i++ { - _, err := inclusion.CreateParallelCommitments(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold, 8) - if err != nil { - b.Fatal(err) - } + _, err := inclusion.CreateParallelCommitments(blobs, emptyHash, defaultSubtreeRootThreshold, runtime.NumCPU()) + require.NoError(b, err) } - 
//b.ReportMetric(totalMB*1000/b.Elapsed().Seconds(), "MB/s") }) - - // For the large 16x8MB scenario, also test with different worker counts - if scenario.description == "16x8MB" { - for _, workers := range []int{4, 16, 32, 4 * runtime.NumCPU()} { - b.Run(fmt.Sprintf("%s_Parallel%d", scenario.description, workers), func(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := inclusion.CreateParallelCommitments(blobs, simpleMerkleRoot, defaultSubtreeRootThreshold, workers) - if err != nil { - b.Fatal(err) - } - } - //b.ReportMetric(totalMB*1000/b.Elapsed().Seconds(), "MB/s") - }) - } - } } } diff --git a/inclusion/nmt_pool.go b/inclusion/nmt_pool.go index 66f0a4e..b4b9bf6 100644 --- a/inclusion/nmt_pool.go +++ b/inclusion/nmt_pool.go @@ -2,22 +2,29 @@ package inclusion import ( "crypto/sha256" + "errors" sh "github.com/celestiaorg/go-square/v3/share" "github.com/celestiaorg/nmt" ) -// nmtPool provides a fixed-size pool of bufferedNMT instances for efficient reuse. +// nmtPool provides a fixed-size pool of bufferTree instances for efficient reuse. type nmtPool struct { - trees chan *bufferedNMT + trees chan *bufferTree poolSize int opts []nmt.Option } // newNMTPool creates a new pool of buffered NMT instances. 
-func newNMTPool(poolSize int, maxSubtreeSize int) *nmtPool { +func newNMTPool(poolSize int, maxLeaves int) (*nmtPool, error) { + if poolSize <= 0 { + return nil, errors.New("pool size must be positive") + } + if maxLeaves <= 0 { + return nil, errors.New("max leaves must be positive") + } pool := &nmtPool{ - trees: make(chan *bufferedNMT, poolSize), + trees: make(chan *bufferTree, poolSize), poolSize: poolSize, opts: []nmt.Option{ nmt.NamespaceIDSize(sh.NamespaceSize), @@ -26,73 +33,73 @@ func newNMTPool(poolSize int, maxSubtreeSize int) *nmtPool { }, } - // Pre-populate the pool with buffered NMT instances + // pre-populate the pool with buffered NMT instances for i := 0; i < poolSize; i++ { - pool.trees <- newBufferedNMT(maxSubtreeSize, pool) + pool.trees <- newBufferTree(maxLeaves, pool) } - return pool + return pool, nil } // acquire gets a buffered NMT from the pool, blocking if none available. -func (p *nmtPool) acquire() *bufferedNMT { +func (p *nmtPool) acquire() *bufferTree { return <-p.trees } // release returns a buffered NMT to the pool for reuse. -func (p *nmtPool) release(tree *bufferedNMT) { +func (p *nmtPool) release(tree *bufferTree) { tree.reset() p.trees <- tree } -// bufferedNMT wraps an NMT with a pre-allocated buffer for efficient operations. -type bufferedNMT struct { - tree *nmt.NamespacedMerkleTree - buffer []byte // Pre-allocated buffer for namespace+share data - pool *nmtPool // Reference to the pool for auto-release - leafSize int // Size of namespace + share - maxLeaves int // Maximum number of leaves this buffer can handle +// bufferTree wraps an NMT with a pre-allocated buffer for efficient operations. 
+type bufferTree struct {
+	// tree is an instance of NMT for root calculation
+	tree *nmt.NamespacedMerkleTree
+	// buffer is a pre-allocated buffer for namespace and share data
+	buffer []byte
+	// pool is a reference to the pool for release
+	pool *nmtPool
+	// leafSize is the size of namespace + share
+	leafSize int
 }
 
-// newBufferedNMT creates a new buffered NMT wrapper.
-func newBufferedNMT(maxLeaves int, pool *nmtPool) *bufferedNMT {
+// newBufferTree creates a new buffered NMT wrapper.
+func newBufferTree(maxLeaves int, pool *nmtPool) *bufferTree {
 	leafSize := sh.NamespaceSize + sh.ShareSize
-	return &bufferedNMT{
-		tree:      nmt.New(sha256.New(), pool.opts...),
-		buffer:    make([]byte, maxLeaves*leafSize),
-		pool:      pool,
-		leafSize:  leafSize,
-		maxLeaves: maxLeaves,
+	return &bufferTree{
+		tree:     nmt.New(sha256.New(), pool.opts...),
+		buffer:   make([]byte, maxLeaves*leafSize),
+		pool:     pool,
+		leafSize: leafSize,
 	}
 }
 
 // reset prepares the buffered NMT for reuse.
-func (t *bufferedNMT) reset() {
+func (t *bufferTree) reset() {
 	t.tree.Reset()
 }
 
 // computeRoot processes a set of leaves with a given namespace and returns the root.
 // It automatically releases itself back to the pool after computing the root.
-func (t *bufferedNMT) computeRoot(namespace []byte, leaves [][]byte) ([]byte, error) { +func (t *bufferTree) computeRoot(namespace []byte, leaves [][]byte) ([]byte, error) { defer t.pool.release(t) - // Pre-fill namespace in buffer for all leaves + // pre-fill namespace in buffer for all leaves for i := 0; i < len(leaves); i++ { offset := i * t.leafSize copy(t.buffer[offset:offset+sh.NamespaceSize], namespace) } - // Copy leaf data and push to tree + // copy leaf data and push to tree for i, leaf := range leaves { offset := i * t.leafSize copy(t.buffer[offset+sh.NamespaceSize:offset+t.leafSize], leaf) - // Create slice from buffer and push to NMT nsLeaf := t.buffer[offset : offset+sh.NamespaceSize+len(leaf)] if err := t.tree.Push(nsLeaf); err != nil { return nil, err } } - return t.tree.Root() } From c59ef05c084a0a323612ba9d1d3c1d9e40a5fe5b Mon Sep 17 00:00:00 2001 From: Mikhail Rakhmanov Date: Sat, 27 Sep 2025 10:38:39 +0200 Subject: [PATCH 5/6] refactor: remove unnecessary err in resultchan --- inclusion/commitment.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/inclusion/commitment.go b/inclusion/commitment.go index 271d669..4487edb 100644 --- a/inclusion/commitment.go +++ b/inclusion/commitment.go @@ -162,7 +162,6 @@ func CreateParallelCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subt blobIdx int treeIdx int root []byte - err error } totalSubtrees := 0 for _, info := range blobInfos { @@ -183,7 +182,6 @@ func CreateParallelCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subt blobIdx: blobIdx, treeIdx: treeIdx, root: root, - err: err, } return err }) @@ -203,9 +201,6 @@ func CreateParallelCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subt } for result := range resultChan { - if result.err != nil { - return nil, result.err - } subtreeRootsByBlob[result.blobIdx][result.treeIdx] = result.root } From 266c680da9a6b058cbbbfcdb2e54a03104664d6c Mon Sep 17 00:00:00 2001 From: Mikhail Rakhmanov Date: Wed, 1 Oct 2025 18:01:38 +0200 
Subject: [PATCH 6/6] refactor: remove unnecessary struct --- inclusion/commitment.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/inclusion/commitment.go b/inclusion/commitment.go index 4487edb..d83fa30 100644 --- a/inclusion/commitment.go +++ b/inclusion/commitment.go @@ -95,11 +95,7 @@ func CreateParallelCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subt } // split all blobs into shares in parallel - type blobShares struct { - shares []sh.Share - err error - } - blobSharesResults := make([]blobShares, len(blobs)) + blobSharesResults := make([][]sh.Share, len(blobs)) g := new(errgroup.Group) g.SetLimit(numWorkers) @@ -107,8 +103,11 @@ func CreateParallelCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subt idx := i g.Go(func() error { shares, err := splitBlobs(blobs[idx]) - blobSharesResults[idx] = blobShares{shares: shares, err: err} - return err + if err != nil { + return err + } + blobSharesResults[idx] = shares + return nil }) } if err := g.Wait(); err != nil { @@ -126,7 +125,7 @@ func CreateParallelCommitments(blobs []*sh.Blob, merkleRootFn MerkleRootFn, subt // calculate the maximum subtree size across all blobs and prepare // subtree for parallel calculation using pooled nmts for i, blob := range blobs { - shares := blobSharesResults[i].shares + shares := blobSharesResults[i] subTreeWidth := SubTreeWidth(len(shares), subtreeRootThreshold) treeSizes, err := MerkleMountainRangeSizes(uint64(len(shares)), uint64(subTreeWidth)) if err != nil {