Skip to content

Commit

Permalink
kv intro plus additional update api (#814)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Dec 18, 2022
1 parent a789e1f commit a766783
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 0 deletions.
110 changes: 110 additions & 0 deletions src/examples/java/io/nats/examples/natsbyexample/KeyValueIntro.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.examples.natsbyexample;

import io.nats.client.*;
import io.nats.client.api.*;
import io.nats.client.impl.Headers;
import io.nats.examples.ExampleUtils;

import java.util.List;

/**
* Key-Value Intro
* The key-value (KV) capability in NATS is an abstraction over a stream which models message subjects as keys. It uses a standard set of stream configuration to be optimized for KV workloads.
*/
public class KeyValueIntro {

public static void main(String[] args) {
try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions("nats://localhost:4222"))) {
KeyValueManagement kvm = nc.keyValueManagement();

// create the bucket
KeyValueConfiguration kvc = KeyValueConfiguration.builder()
.name("profiles")
.storageType(StorageType.Memory)
.build();

KeyValueStatus keyValueStatus = kvm.create(kvc);

KeyValue kv = nc.keyValue("profiles");

kv.put("sue.color", "blue".getBytes());
KeyValueEntry entry = kv.get("sue.color");
System.out.printf("%s %d -> %s\n", entry.getKey(), entry.getRevision(), entry.getValueAsString());

kv.put("sue.color", "green");
entry = kv.get("sue.color");
System.out.printf("%s %d -> %s\n", entry.getKey(), entry.getRevision(), entry.getValueAsString());

try {
kv.update("sue.color", "red".getBytes(), 1);
}
catch (JetStreamApiException e) {
System.out.println(e);
}

long lastRevision = entry.getRevision();
kv.update("sue.color", "red".getBytes(), lastRevision);
entry = kv.get("sue.color");
System.out.printf("%s %d -> %s\n", entry.getKey(), entry.getRevision(), entry.getValueAsString());

JetStreamManagement jsm = nc.jetStreamManagement();

List<String> streamNames = jsm.getStreamNames();
System.out.println(streamNames);

JetStream js = nc.jetStream();

PushSubscribeOptions pso = PushSubscribeOptions.builder()
.stream("KV_profiles").build();
JetStreamSubscription sub = js.subscribe(">", pso);

Message m = sub.nextMessage(100);
System.out.printf("%s %d -> %s\n", m.getSubject(), m.metaData().streamSequence(), new String(m.getData()));

kv.put("sue.color", "yellow".getBytes());
m = sub.nextMessage(100);
System.out.printf("%s %d -> %s\n", m.getSubject(), m.metaData().streamSequence(), new String(m.getData()));

kv.delete("sue.color");
m = sub.nextMessage(100);
System.out.printf("%s %d -> %s\n", m.getSubject(), m.metaData().streamSequence(), new String(m.getData()));
System.out.println("Headers:");
Headers headers = m.getHeaders();
for (String key : headers.keySet()) {
System.out.printf(" %s:%s\n", key, headers.getFirst(key));
}

KeyValueWatcher watcher = new KeyValueWatcher() {
@Override
public void watch(KeyValueEntry entry) {
System.out.printf("Watcher: %s %d -> %s\n", entry.getKey(), entry.getRevision(), entry.getValueAsString());
}

@Override
public void endOfData() {
System.out.println("Watcher: Received End Of Data Signal");
}
};

kv.watch("sue.*", watcher, KeyValueWatchOption.UPDATES_ONLY);

kv.put("sue.color", "purple");
}
catch (Exception exp) {
exp.printStackTrace();
}
}
}
13 changes: 13 additions & 0 deletions src/main/java/io/nats/client/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,19 @@ public interface KeyValue {
*/
long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException;

/**
* Put a string as the value for a key iff the key exists and its last revision matches the expected
* @param key the key
* @param value the UTF-8 string
* @param expectedRevision the expected last revision
* @return the revision number for the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
long update(String key, String value, long expectedRevision) throws IOException, JetStreamApiException;

/**
* Soft deletes the key by placing a delete marker.
* @param key the key
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/io/nats/client/impl/NatsKeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,14 @@ public long update(String key, byte[] value, long expectedRevision) throws IOExc
return _write(key, value, h).getSeqno();
}

/**
* {@inheritDoc}
*/
@Override
public long update(String key, String value, long expectedRevision) throws IOException, JetStreamApiException {
return update(key, value.getBytes(StandardCharsets.UTF_8), expectedRevision);
}

/**
* {@inheritDoc}
*/
Expand Down

0 comments on commit a766783

Please sign in to comment.