diff --git a/Makefile b/Makefile index aeb568ee..a4c9a817 100644 --- a/Makefile +++ b/Makefile @@ -49,14 +49,13 @@ precommit: lint test .PHONY: test test: - ginkgo momento/ auth/ + ginkgo momento/ auth/ batchutils/ .PHONY: vendor vendor: go mod vendor .PHONY: build-examples -build: +build-examples: cd examples go build ./... - \ No newline at end of file diff --git a/batchutils/batch_operations.go b/batchutils/batch_operations.go new file mode 100644 index 00000000..1acbfdff --- /dev/null +++ b/batchutils/batch_operations.go @@ -0,0 +1,125 @@ +package batchutils + +import ( + "context" + "sync" + + "github.com/momentohq/client-sdk-go/momento" +) + +const maxConcurrentDeletes = 5 + +func keyDistributor(ctx context.Context, keys []momento.Key, keyChan chan momento.Key) { + for _, k := range keys { + keyChan <- k + } + + for { + select { + case <-ctx.Done(): + return + default: + keyChan <- nil + } + } +} + +type errKeyVal struct { + key momento.Value + error error +} + +func deleteWorker( + ctx context.Context, + client momento.CacheClient, + cacheName string, + keyChan chan momento.Key, + errChan chan *errKeyVal, +) { + for { + myKey := <-keyChan + if myKey == nil { + return + } + _, err := client.Delete(ctx, &momento.DeleteRequest{ + CacheName: cacheName, + Key: myKey, + }) + if err != nil { + errChan <- &errKeyVal{ + key: myKey, + error: err, + } + } else { + errChan <- nil + } + } +} + +type BatchDeleteRequest struct { + Client momento.CacheClient + CacheName string + Keys []momento.Key + MaxConcurrentDeletes int +} + +// BatchDeleteError contains a map associating failing cache keys with their specific errors. +// It may be necessary to use a type assertion to access the errors: +// +// errors := err.(*BatchDeleteError).Errors() +type BatchDeleteError struct { + errors map[momento.Value]error +} + +func (e *BatchDeleteError) Error() string { + return "errors occurred during batch delete" +} + +func (e *BatchDeleteError) Errors() map[momento.Value]error { + return e.errors +} + +// BatchDelete deletes a slice of keys from the cache, returning a map from failing cache keys to their specific errors. +func BatchDelete(ctx context.Context, props *BatchDeleteRequest) *BatchDeleteError { + // initialize return value + cancelCtx, cancelFunc := context.WithCancel(ctx) + // stop the key distributor when we return + defer cancelFunc() + var wg sync.WaitGroup + + if props.MaxConcurrentDeletes == 0 { + props.MaxConcurrentDeletes = maxConcurrentDeletes + } + if len(props.Keys) < props.MaxConcurrentDeletes { + props.MaxConcurrentDeletes = len(props.Keys) + } + keyChan := make(chan momento.Key, props.MaxConcurrentDeletes) + errChan := make(chan *errKeyVal, len(props.Keys)) + + for i := 0; i < props.MaxConcurrentDeletes; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + deleteWorker(ctx, props.Client, props.CacheName, keyChan, errChan) + }() + } + + go keyDistributor(cancelCtx, props.Keys, keyChan) + + // wait for the workers to return + wg.Wait() + + var errors = make(map[momento.Value]error, 0) + for i := 0; i < len(props.Keys); i++ { + msg := <-errChan + if msg != nil { + errors[msg.key] = msg.error + } + } + + if len(errors) == 0 { + return nil + } + return &BatchDeleteError{errors: errors} +} diff --git a/batchutils/batch_operations_test.go b/batchutils/batch_operations_test.go new file mode 100644 index 00000000..e11a8e36 --- /dev/null +++ b/batchutils/batch_operations_test.go @@ -0,0 +1,95 @@ +package batchutils_test + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/momentohq/client-sdk-go/auth" + "github.com/momentohq/client-sdk-go/batchutils" + "github.com/momentohq/client-sdk-go/config" + "github.com/momentohq/client-sdk-go/config/logger" + . "github.com/momentohq/client-sdk-go/momento" + "github.com/momentohq/client-sdk-go/responses" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Batch operations", func() { + + var ( + ctx context.Context + client CacheClient + cacheName string + keys []Value + ) + + BeforeEach(func() { + ctx = context.Background() + cacheName = fmt.Sprintf("golang-%s", uuid.NewString()) + credentialProvider, err := auth.FromEnvironmentVariable("TEST_AUTH_TOKEN") + if err != nil { + panic(err) + } + client, err = NewCacheClient( + config.LaptopLatestWithLogger(logger.NewNoopMomentoLoggerFactory()), + credentialProvider, + time.Second*60, + ) + if err != nil { + panic(err) + } + + _, err = client.CreateCache(ctx, &CreateCacheRequest{CacheName: cacheName}) + if err != nil { + panic(err) + } + + for i := 0; i < 50; i++ { + key := String(fmt.Sprintf("key%d", i)) + keys = append(keys, key) + _, err := client.Set(ctx, &SetRequest{ + CacheName: cacheName, + Key: key, + Value: String(fmt.Sprintf("val%d", i)), + }) + if err != nil { + panic(err) + } + } + }) + + AfterEach(func() { + _, err := client.DeleteCache(ctx, &DeleteCacheRequest{CacheName: cacheName}) + if err != nil { + panic(err) + } + }) + + It("batch deletes", func() { + errors := batchutils.BatchDelete(ctx, &batchutils.BatchDeleteRequest{ + Client: client, + CacheName: cacheName, + Keys: keys[5:21], + }) + Expect(errors).To(BeNil()) + for i := 0; i < 50; i++ { + resp, err := client.Get(ctx, &GetRequest{ + CacheName: cacheName, + Key: keys[i], + }) + Expect(err).To(BeNil()) + switch resp.(type) { + case *responses.GetHit: + if i >= 5 && i <= 20 { + Fail("got a hit for #%d that should be a miss", i) + } + case *responses.GetMiss: + if !(i >= 5 && i <= 20) { + Fail("got a miss for #%d that should be a hit", i) + } + } + } + }) +}) diff --git a/batchutils/batchutils_suite_test.go b/batchutils/batchutils_suite_test.go new file mode 100644 index 00000000..72cdd303 --- /dev/null +++ b/batchutils/batchutils_suite_test.go @@ -0,0 +1,13 @@ +package batchutils_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestAuth(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Batch utils Suite") +}