Skip to content

Commit

Permalink
feat: add batch delete util (#293)
Browse files Browse the repository at this point in the history
* feat: add batch delete util

* fix: a little bit of simplification

* fix: improve test

* fix: remove irrelevant ctx check

* fix: no longer need the select

* fix: fix test to clean up after itself

* fix: improve error handling

* fix: fix test

* fix: rename error map property

* fix: comment fix
  • Loading branch information
pgautier404 authored Apr 6, 2023
1 parent db53bc0 commit 327415a
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 3 deletions.
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,13 @@ precommit: lint test

.PHONY: test
test:
ginkgo momento/ auth/
ginkgo momento/ auth/ batchutils/

.PHONY: vendor
vendor:
go mod vendor

.PHONY: build-examples
build:
build-examples:
cd examples
go build ./...

125 changes: 125 additions & 0 deletions batchutils/batch_operations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package batchutils

import (
"context"
"sync"

"github.com/momentohq/client-sdk-go/momento"
)

const maxConcurrentDeletes = 5

func keyDistributor(ctx context.Context, keys []momento.Key, keyChan chan momento.Key) {
for _, k := range keys {
keyChan <- k
}

for {
select {
case <-ctx.Done():
return
default:
keyChan <- nil
}
}
}

type errKeyVal struct {
key momento.Value
error error
}

func deleteWorker(
ctx context.Context,
client momento.CacheClient,
cacheName string,
keyChan chan momento.Key,
errChan chan *errKeyVal,
) {
for {
myKey := <-keyChan
if myKey == nil {
return
}
_, err := client.Delete(ctx, &momento.DeleteRequest{
CacheName: cacheName,
Key: myKey,
})
if err != nil {
errChan <- &errKeyVal{
key: myKey,
error: err,
}
} else {
errChan <- nil
}
}
}

type BatchDeleteRequest struct {
Client momento.CacheClient
CacheName string
Keys []momento.Key
MaxConcurrentDeletes int
}

// BatchDeleteError contains a map associating failing cache keys with their specific errors.
// It may be necessary to use a type assertion to access the errors:
//
// errors := err.(*BatchDeleteError).Errors()
type BatchDeleteError struct {
errors map[momento.Value]error
}

func (e *BatchDeleteError) Error() string {
return "errors occurred during batch delete"
}

func (e *BatchDeleteError) Errors() map[momento.Value]error {
return e.errors
}

// BatchDelete deletes a slice of keys from the cache, returning a map from failing cache keys to their specific errors.
func BatchDelete(ctx context.Context, props *BatchDeleteRequest) *BatchDeleteError {
// initialize return value
cancelCtx, cancelFunc := context.WithCancel(ctx)
// stop the key distributor when we return
defer cancelFunc()
var wg sync.WaitGroup

if props.MaxConcurrentDeletes == 0 {
props.MaxConcurrentDeletes = maxConcurrentDeletes
}
if len(props.Keys) < props.MaxConcurrentDeletes {
props.MaxConcurrentDeletes = len(props.Keys)
}
keyChan := make(chan momento.Key, props.MaxConcurrentDeletes)
errChan := make(chan *errKeyVal, len(props.Keys))

for i := 0; i < props.MaxConcurrentDeletes; i++ {
wg.Add(1)

go func() {
defer wg.Done()
deleteWorker(ctx, props.Client, props.CacheName, keyChan, errChan)
}()
}

go keyDistributor(cancelCtx, props.Keys, keyChan)

// wait for the workers to return
wg.Wait()

var errors = make(map[momento.Value]error, 0)
for i := 0; i < len(props.Keys); i++ {
msg := <-errChan
if msg != nil {
errors[msg.key] = msg.error
}
}

if len(errors) == 0 {
return nil
}
return &BatchDeleteError{errors: errors}
}
95 changes: 95 additions & 0 deletions batchutils/batch_operations_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package batchutils_test

import (
"context"
"fmt"
"time"

"github.com/google/uuid"
"github.com/momentohq/client-sdk-go/auth"
"github.com/momentohq/client-sdk-go/batchutils"
"github.com/momentohq/client-sdk-go/config"
"github.com/momentohq/client-sdk-go/config/logger"
. "github.com/momentohq/client-sdk-go/momento"
"github.com/momentohq/client-sdk-go/responses"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Batch operations", func() {

var (
ctx context.Context
client CacheClient
cacheName string
keys []Value
)

BeforeEach(func() {
ctx = context.Background()
cacheName = fmt.Sprintf("golang-%s", uuid.NewString())
credentialProvider, err := auth.FromEnvironmentVariable("TEST_AUTH_TOKEN")
if err != nil {
panic(err)
}
client, err = NewCacheClient(
config.LaptopLatestWithLogger(logger.NewNoopMomentoLoggerFactory()),
credentialProvider,
time.Second*60,
)
if err != nil {
panic(err)
}

_, err = client.CreateCache(ctx, &CreateCacheRequest{CacheName: cacheName})
if err != nil {
panic(err)
}

for i := 0; i < 50; i++ {
key := String(fmt.Sprintf("key%d", i))
keys = append(keys, key)
_, err := client.Set(ctx, &SetRequest{
CacheName: cacheName,
Key: key,
Value: String(fmt.Sprintf("val%d", i)),
})
if err != nil {
panic(err)
}
}
})

AfterEach(func() {
_, err := client.DeleteCache(ctx, &DeleteCacheRequest{CacheName: cacheName})
if err != nil {
panic(err)
}
})

It("batch deletes", func() {
errors := batchutils.BatchDelete(ctx, &batchutils.BatchDeleteRequest{
Client: client,
CacheName: cacheName,
Keys: keys[5:21],
})
Expect(errors).To(BeNil())
for i := 0; i < 50; i++ {
resp, err := client.Get(ctx, &GetRequest{
CacheName: cacheName,
Key: keys[i],
})
Expect(err).To(BeNil())
switch resp.(type) {
case *responses.GetHit:
if i >= 5 && i <= 20 {
Fail("got a hit for #%d that should be a miss", i)
}
case *responses.GetMiss:
if !(i >= 5 && i <= 20) {
Fail("got a miss for #%d that should be a hit", i)
}
}
}
})
})
13 changes: 13 additions & 0 deletions batchutils/batchutils_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package batchutils_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestAuth(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Batch utils Suite")
}

0 comments on commit 327415a

Please sign in to comment.