From a7667833db59221c634b5c7d7c86a8f7f1681a7e Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Sun, 18 Dec 2022 10:55:53 -0500 Subject: [PATCH] kv intro plus additional update api (#814) --- .../examples/natsbyexample/KeyValueIntro.java | 110 ++++++++++++++++++ src/main/java/io/nats/client/KeyValue.java | 13 +++ .../io/nats/client/impl/NatsKeyValue.java | 8 ++ 3 files changed, 131 insertions(+) create mode 100644 src/examples/java/io/nats/examples/natsbyexample/KeyValueIntro.java diff --git a/src/examples/java/io/nats/examples/natsbyexample/KeyValueIntro.java b/src/examples/java/io/nats/examples/natsbyexample/KeyValueIntro.java new file mode 100644 index 000000000..253e1359a --- /dev/null +++ b/src/examples/java/io/nats/examples/natsbyexample/KeyValueIntro.java @@ -0,0 +1,110 @@ +// Copyright 2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.natsbyexample; + +import io.nats.client.*; +import io.nats.client.api.*; +import io.nats.client.impl.Headers; +import io.nats.examples.ExampleUtils; + +import java.util.List; + +/** + * Key-Value Intro + * The key-value (KV) capability in NATS is an abstraction over a stream which models message subjects as keys. It uses a standard set of stream configuration to be optimized for KV workloads. + */ +public class KeyValueIntro { + + public static void main(String[] args) { + try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions("nats://localhost:4222"))) { + KeyValueManagement kvm = nc.keyValueManagement(); + + // create the bucket + KeyValueConfiguration kvc = KeyValueConfiguration.builder() + .name("profiles") + .storageType(StorageType.Memory) + .build(); + + KeyValueStatus keyValueStatus = kvm.create(kvc); + + KeyValue kv = nc.keyValue("profiles"); + + kv.put("sue.color", "blue".getBytes()); + KeyValueEntry entry = kv.get("sue.color"); + System.out.printf("%s %d -> %s\n", entry.getKey(), entry.getRevision(), entry.getValueAsString()); + + kv.put("sue.color", "green"); + entry = kv.get("sue.color"); + System.out.printf("%s %d -> %s\n", entry.getKey(), entry.getRevision(), entry.getValueAsString()); + + try { + kv.update("sue.color", "red".getBytes(), 1); + } + catch (JetStreamApiException e) { + System.out.println(e); + } + + long lastRevision = entry.getRevision(); + kv.update("sue.color", "red".getBytes(), lastRevision); + entry = kv.get("sue.color"); + System.out.printf("%s %d -> %s\n", entry.getKey(), entry.getRevision(), entry.getValueAsString()); + + JetStreamManagement jsm = nc.jetStreamManagement(); + + List streamNames = jsm.getStreamNames(); + System.out.println(streamNames); + + JetStream js = nc.jetStream(); + + PushSubscribeOptions pso = PushSubscribeOptions.builder() + .stream("KV_profiles").build(); + JetStreamSubscription sub = js.subscribe(">", pso); + + Message m = sub.nextMessage(100); + System.out.printf("%s %d -> %s\n", m.getSubject(), m.metaData().streamSequence(), new String(m.getData())); + + kv.put("sue.color", "yellow".getBytes()); + m = sub.nextMessage(100); + System.out.printf("%s %d -> %s\n", m.getSubject(), m.metaData().streamSequence(), new String(m.getData())); + + kv.delete("sue.color"); + m = sub.nextMessage(100); + System.out.printf("%s %d -> %s\n", m.getSubject(), m.metaData().streamSequence(), new String(m.getData())); + System.out.println("Headers:"); + Headers headers = m.getHeaders(); + for (String key : headers.keySet()) { + System.out.printf(" %s:%s\n", key, headers.getFirst(key)); + } + + KeyValueWatcher watcher = new KeyValueWatcher() { + @Override + public void watch(KeyValueEntry entry) { + System.out.printf("Watcher: %s %d -> %s\n", entry.getKey(), entry.getRevision(), entry.getValueAsString()); + } + + @Override + public void endOfData() { + System.out.println("Watcher: Received End Of Data Signal"); + } + }; + + kv.watch("sue.*", watcher, KeyValueWatchOption.UPDATES_ONLY); + + kv.put("sue.color", "purple"); + } + catch (Exception exp) { + exp.printStackTrace(); + } + } +} diff --git a/src/main/java/io/nats/client/KeyValue.java b/src/main/java/io/nats/client/KeyValue.java index 626b9f5ff..50de7ebd0 100644 --- a/src/main/java/io/nats/client/KeyValue.java +++ b/src/main/java/io/nats/client/KeyValue.java @@ -116,6 +116,19 @@ public interface KeyValue { */ long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException; + /** + * Put a string as the value for a key iff the key exists and its last revision matches the expected + * @param key the key + * @param value the UTF-8 string + * @param expectedRevision the expected last revision + * @return the revision number for the key + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws IllegalArgumentException the server is not JetStream enabled + */ + long update(String key, String value, long expectedRevision) throws IOException, JetStreamApiException; + /** * Soft deletes the key by placing a delete marker. * @param key the key diff --git a/src/main/java/io/nats/client/impl/NatsKeyValue.java b/src/main/java/io/nats/client/impl/NatsKeyValue.java index dad5e1fff..e9e974ec6 100644 --- a/src/main/java/io/nats/client/impl/NatsKeyValue.java +++ b/src/main/java/io/nats/client/impl/NatsKeyValue.java @@ -186,6 +186,14 @@ public long update(String key, byte[] value, long expectedRevision) throws IOExc return _write(key, value, h).getSeqno(); } + /** + * {@inheritDoc} + */ + @Override + public long update(String key, String value, long expectedRevision) throws IOException, JetStreamApiException { + return update(key, value.getBytes(StandardCharsets.UTF_8), expectedRevision); + } + /** * {@inheritDoc} */