Skip to content

Commit

Permalink
[ADDED] UpdateKeyValue and CreateOrUpdateKeyValue methods (#1549)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio authored Feb 8, 2024
1 parent d7fda7d commit 547cafa
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 22 deletions.
6 changes: 5 additions & 1 deletion jetstream/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ const (
)

var (
// API errors
// JetStream API errors

// ErrJetStreamNotEnabled is an error returned when JetStream is not
// enabled.
Expand Down Expand Up @@ -284,6 +284,10 @@ var (
// name.
ErrInvalidKey JetStreamError = &jsError{message: "invalid key"}

// ErrBucketExists is returned when attempting to create a bucket that
// already exists and has a different configuration.
ErrBucketExists JetStreamError = &jsError{message: "bucket name already in use"}

// ErrBucketNotFound is returned when attempting to access a bucket that
// does not exist.
ErrBucketNotFound JetStreamError = &jsError{message: "bucket not found"}
Expand Down
113 changes: 95 additions & 18 deletions jetstream/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,35 @@ type (
// KeyValueManager is used to manage KeyValue stores. It provides methods to
// create, delete, and retrieve KeyValue stores.
KeyValueManager interface {
// KeyValue will lookup and bind to an existing KeyValue store. If the
// KeyValue store with given name does not exist, ErrBucketNotFound will
// be returned.
// KeyValue will lookup and bind to an existing KeyValue store.
//
// If the KeyValue store with given name does not exist,
// ErrBucketNotFound will be returned.
KeyValue(ctx context.Context, bucket string) (KeyValue, error)

// CreateKeyValue will create a KeyValue store with the given
// configuration.
//
// If a KeyValue store with the same name already exists and the
// configuration is different, ErrBucketExists will be returned.
CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error)

// UpdateKeyValue will update an existing KeyValue store with the given
// configuration.
//
// If a KeyValue store with the given name does not exist, ErrBucketNotFound
// will be returned.
UpdateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error)

// CreateOrUpdateKeyValue will create a KeyValue store if it does not
// exist or update an existing KeyValue store with the given
// configuration (if possible).
CreateOrUpdateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error)

// DeleteKeyValue will delete this KeyValue store.
//
// If the KeyValue store with given name does not exist,
// ErrBucketNotFound will be returned.
DeleteKeyValue(ctx context.Context, bucket string) error

// KeyValueStoreNames is used to retrieve a list of key value store
Expand Down Expand Up @@ -457,20 +476,81 @@ func (js *jetStream) KeyValue(ctx context.Context, bucket string) (KeyValue, err
return mapStreamToKVS(js, pushJS, stream), nil
}

// CreateKeyValue will create a KeyValue store with the following configuration.
func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error) {
scfg, err := js.prepareKeyValueConfig(ctx, cfg)
if err != nil {
return nil, err
}

stream, err := js.CreateStream(ctx, scfg)
if err != nil {
if errors.Is(err, ErrStreamNameAlreadyInUse) {
// errors are joined so that backwards compatibility is retained
// and previous checks for ErrStreamNameAlreadyInUse will still work.
err = errors.Join(fmt.Errorf("%w: %s", ErrBucketExists, cfg.Bucket), err)
}
return nil, err
}
pushJS, err := js.legacyJetStream()
if err != nil {
return nil, err
}

return mapStreamToKVS(js, pushJS, stream), nil
}

func (js *jetStream) UpdateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error) {
scfg, err := js.prepareKeyValueConfig(ctx, cfg)
if err != nil {
return nil, err
}

stream, err := js.UpdateStream(ctx, scfg)
if err != nil {
if errors.Is(err, ErrStreamNotFound) {
err = fmt.Errorf("%w: %s", ErrBucketNotFound, cfg.Bucket)
}
return nil, err
}
pushJS, err := js.legacyJetStream()
if err != nil {
return nil, err
}

return mapStreamToKVS(js, pushJS, stream), nil
}

func (js *jetStream) CreateOrUpdateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error) {
scfg, err := js.prepareKeyValueConfig(ctx, cfg)
if err != nil {
return nil, err
}

stream, err := js.CreateOrUpdateStream(ctx, scfg)
if err != nil {
return nil, err
}
pushJS, err := js.legacyJetStream()
if err != nil {
return nil, err
}

return mapStreamToKVS(js, pushJS, stream), nil
}

func (js *jetStream) prepareKeyValueConfig(ctx context.Context, cfg KeyValueConfig) (StreamConfig, error) {
if !validBucketRe.MatchString(cfg.Bucket) {
return nil, ErrInvalidBucketName
return StreamConfig{}, ErrInvalidBucketName
}
if _, err := js.AccountInfo(ctx); err != nil {
return nil, err
return StreamConfig{}, err
}

// Default to 1 for history. Max is 64 for now.
history := int64(1)
if cfg.History > 0 {
if cfg.History > KeyValueMaxHistory {
return nil, ErrHistoryTooLarge
return StreamConfig{}, ErrHistoryTooLarge
}
history = int64(cfg.History)
}
Expand Down Expand Up @@ -551,16 +631,7 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
}

stream, err := js.CreateStream(ctx, scfg)
if err != nil {
return nil, err
}
pushJS, err := js.legacyJetStream()
if err != nil {
return nil, err
}

return mapStreamToKVS(js, pushJS, stream), nil
return scfg, nil
}

// DeleteKeyValue will delete this KeyValue store (JetStream stream).
Expand All @@ -569,7 +640,13 @@ func (js *jetStream) DeleteKeyValue(ctx context.Context, bucket string) error {
return ErrInvalidBucketName
}
stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
return js.DeleteStream(ctx, stream)
if err := js.DeleteStream(ctx, stream); err != nil {
if errors.Is(err, ErrStreamNotFound) {
err = errors.Join(fmt.Errorf("%w: %s", ErrBucketNotFound, bucket), err)
}
return err
}
return nil
}

// KeyValueStoreNames is used to retrieve a list of key value store names
Expand Down
76 changes: 73 additions & 3 deletions jetstream/test/kv_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 The NATS Authors
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -113,6 +113,69 @@ func TestKeyValueBasics(t *testing.T) {
}
}

func TestCreateKeyValue(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx := context.Background()

// invalid bucket name
_, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST.", Description: "Test KV"})
expectErr(t, err, jetstream.ErrInvalidBucketName)

_, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST", Description: "Test KV"})
expectOk(t, err)

// Check that we can't overwrite existing bucket.
_, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST", Description: "New KV"})
expectErr(t, err, jetstream.ErrBucketExists)

// assert that we're backwards compatible
expectErr(t, err, jetstream.ErrStreamNameAlreadyInUse)
}

func TestUpdateKeyValue(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx := context.Background()

// cannot update a non-existing bucket
_, err := js.UpdateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST", Description: "Test KV"})
expectErr(t, err, jetstream.ErrBucketNotFound)

_, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST", Description: "Test KV"})
expectOk(t, err)

// update the bucket
_, err = js.UpdateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST", Description: "New KV"})
expectOk(t, err)
}

func TestCreateOrUpdateKeyValue(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx := context.Background()

// invalid bucket name
_, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST.", Description: "Test KV"})
expectErr(t, err, jetstream.ErrInvalidBucketName)

_, err = js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST", Description: "Test KV"})
expectOk(t, err)

// update the bucket
_, err = js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST", Description: "New KV"})
expectOk(t, err)
}

func TestKeyValueHistory(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down Expand Up @@ -565,8 +628,15 @@ func TestKeyValueDeleteStore(t *testing.T) {
err = js.DeleteKeyValue(ctx, "WATCH")
expectOk(t, err)

// delete again should fail
err = js.DeleteKeyValue(ctx, "WATCH")
expectErr(t, err, jetstream.ErrBucketNotFound)

// check that we're backwards compatible
expectErr(t, err, jetstream.ErrStreamNotFound)

_, err = js.KeyValue(ctx, "WATCH")
expectErr(t, err)
expectErr(t, err, jetstream.ErrBucketNotFound)
}

func TestKeyValueDeleteVsPurge(t *testing.T) {
Expand Down Expand Up @@ -1474,7 +1544,7 @@ func expectErr(t *testing.T, err error, expected ...error) {
return
}
for _, e := range expected {
if err == e || strings.Contains(e.Error(), err.Error()) {
if errors.Is(err, e) {
return
}
}
Expand Down

0 comments on commit 547cafa

Please sign in to comment.