diff --git a/batch.go b/batch.go index 5b721d7..e34997c 100644 --- a/batch.go +++ b/batch.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "sync" "time" "github.com/aws/aws-sdk-go/service/dynamodb" @@ -51,86 +52,102 @@ func (b *batch) commitKeys(ctx context.Context, keys []datastore.Key) error { defer stop() log.Debugf("committing batch", "Batch", keys) - errs := make(chan error) + errs := make(chan error, len(keys)) chunks := chunk(len(keys), 25) + + var wg sync.WaitGroup + for _, chunk := range chunks { - var writeReqs []*dynamodb.WriteRequest - for _, keyIdx := range chunk { - k := keys[keyIdx] - v := b.reqs[k] - if v != nil { - // put - itemMap, err := b.ds.makePutItem(k, v, 0) - if err != nil { - return err - } - writeReqs = append(writeReqs, &dynamodb.WriteRequest{ - PutRequest: &dynamodb.PutRequest{Item: itemMap}, - }) - } else { - // delete - itemMap, err := b.ds.makeDeleteItemMap(k) - if err != nil { - return err + if err := b.ds.batchSem.Acquire(ctx, 1); err != nil { + return fmt.Errorf("failed to acquire semaphore: %w", err) + } + + wg.Add(1) + go func(chunk []int) { + defer wg.Done() + defer b.ds.batchSem.Release(1) + + var writeReqs []*dynamodb.WriteRequest + for _, keyIdx := range chunk { + k := keys[keyIdx] + v := b.reqs[k] + if v != nil { + // put + itemMap, err := b.ds.makePutItem(k, v, 0) + if err != nil { + errs <- err + return + } + writeReqs = append(writeReqs, &dynamodb.WriteRequest{ + PutRequest: &dynamodb.PutRequest{Item: itemMap}, + }) + } else { + // delete + itemMap, err := b.ds.makeDeleteItemMap(k) + if err != nil { + errs <- err + return + } + writeReqs = append(writeReqs, &dynamodb.WriteRequest{ + DeleteRequest: &dynamodb.DeleteRequest{Key: itemMap}, + }) } - writeReqs = append(writeReqs, &dynamodb.WriteRequest{ - DeleteRequest: &dynamodb.DeleteRequest{Key: itemMap}, - }) } - - } - go b.commitChunk(ctx, errs, writeReqs) + err := b.commitChunk(ctx, writeReqs) + if err != nil { + errs <- err + } + }(chunk) } - for i := 0; i < len(chunks); i++ { - err := <-errs + go func() { + wg.Wait() + close(errs) + }() + + for err := range errs { if err != nil { return err } } return nil - } -func (b *batch) commitChunk(ctx context.Context, errs chan<- error, chunk []*dynamodb.WriteRequest) { +func (b *batch) commitChunk(ctx context.Context, chunk []*dynamodb.WriteRequest) error { attempts := 0 - - var err error - - defer func() { - select { - case errs <- err: - case <-ctx.Done(): - } - }() - - var res *dynamodb.BatchWriteItemOutput for attempts < maxBatchChunkAttempts { attempts++ batchReq := dynamodb.BatchWriteItemInput{ RequestItems: map[string][]*dynamodb.WriteRequest{b.ds.table: chunk}, } - res, err = b.ds.ddbClient.BatchWriteItemWithContext(ctx, &batchReq) + res, err := b.ds.ddbClient.BatchWriteItemWithContext(ctx, &batchReq) if err != nil { - return + return err } if len(res.UnprocessedItems[b.ds.table]) == 0 { - return + return nil } chunk = res.UnprocessedItems[b.ds.table] - // sleep using exponential backoff w/ jitter - jitter := (b.ds.rand.Float64() * 0.2) + 0.9 // jitter factor is in interval [0.9:1.1] - delayMS := math.Exp2(float64(attempts)) * 250 * jitter // delays are approx 500, 1000, 2000, 4000, ... + b.ds.randMu.Lock() + jitter := (b.ds.rand.Float64() * 0.2) + 0.9 + b.ds.randMu.Unlock() - delay := time.Duration(time.Duration(delayMS) * time.Millisecond) - time.Sleep(delay) + delayMS := math.Exp2(float64(attempts)) * 250 * jitter + delay := time.Duration(delayMS) * time.Millisecond + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + // Continue with retry + } } - err = fmt.Errorf("reached max attempts (%d) trying to commit batch to DynamoDB, last error: %w", maxBatchChunkAttempts, err) + return fmt.Errorf("reached max attempts (%d) trying to commit batch to DynamoDB", maxBatchChunkAttempts) } // chunk returns a list of chunks, each consisting of a list of array indexes. diff --git a/ddbds.go b/ddbds.go index 0c21a66..e4bf68b 100644 --- a/ddbds.go +++ b/ddbds.go @@ -7,6 +7,7 @@ import ( "math/rand" "strconv" "strings" + "sync" "time" "github.com/aws/aws-sdk-go/aws" @@ -16,6 +17,7 @@ import ( ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" golog "github.com/ipfs/go-log/v2" + "golang.org/x/sync/semaphore" ) const ( @@ -96,6 +98,7 @@ func New(ddbClient *dynamodb.DynamoDB, table string, optFns ...func(o *Options)) disableQueries: opts.disableQueries, disableScans: opts.disableScans, rand: rand.New(rand.NewSource(time.Now().UnixNano())), + batchSem: semaphore.NewWeighted(25), } if ddbDS.scanParallelism == 0 { @@ -129,7 +132,10 @@ type DDBDatastore struct { disableQueries bool disableScans bool - rand *rand.Rand + randMu sync.Mutex + rand *rand.Rand + + batchSem *semaphore.Weighted } var _ ds.Datastore = (*DDBDatastore)(nil)