Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object store tuning #731

Merged
merged 5 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ Subscription creation has many checks to make sure that a valid, operable subscr
| OsGetChunksMismatch | OS | 90206 | Number of chunks does not match meta data. |
| OsGetSizeMismatch | OS | 90207 | Total size does not match meta data. |
| OsGetLinkToBucket | OS | 90208 | Cannot get object, it is a link to a bucket. |
| OsLinkNotAllowOnPut | OS | 90209 | Link not allowed in metadata when putting an object. |
| JsConsumerCreate290NotAvailable | CON | 90301 | Name field not valid when v2.9.0 consumer create api is not available. |
| JsConsumerNameDurableMismatch | CON | 90302 | Name must match durable if both are supplied. |

Expand Down
13 changes: 12 additions & 1 deletion src/main/java/io/nats/client/ObjectStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public interface ObjectStore {
ObjectInfo get(String objectName, OutputStream outputStream) throws IOException, JetStreamApiException, InterruptedException, NoSuchAlgorithmException;

/**
* Get the info for an object if the object exists or did exist and is now deleted.
* Get the info for an object if the object exists / is not deleted.
* OBJECT STORE IMPLEMENTATION IS EXPERIMENTAL AND SUBJECT TO CHANGE.
* @param objectName The name of the object
* @return the ObjectInfo for the object name or throw an exception if it does not exist.
Expand All @@ -106,6 +106,17 @@ public interface ObjectStore {
*/
ObjectInfo getInfo(String objectName) throws IOException, JetStreamApiException;

/**
* Get the info for an object if the object exists, optionally including deleted.
* OBJECT STORE IMPLEMENTATION IS EXPERIMENTAL AND SUBJECT TO CHANGE.
* @param objectName The name of the object
* @param includingDeleted whether to return info for deleted objects
* @return the ObjectInfo for the object name or throw an exception if it does not exist.
* @throws IOException covers various communication issues with the NATS server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
ObjectInfo getInfo(String objectName, boolean includingDeleted) throws IOException, JetStreamApiException;

/**
* Update the metadata of name, description or headers. All other changes are ignored.
* OBJECT STORE IMPLEMENTATION IS EXPERIMENTAL AND SUBJECT TO CHANGE.
Expand Down
36 changes: 25 additions & 11 deletions src/main/java/io/nats/client/impl/NatsObjectStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public ObjectInfo put(ObjectMeta meta, InputStream inputStream) throws IOExcepti
Validator.validateNotNull(meta, "ObjectMeta");
Validator.validateNotNull(meta.getObjectName(), "ObjectMeta name");
Validator.validateNotNull(inputStream, "InputStream");
if (meta.getObjectMetaOptions().getLink() != null) {
throw OsLinkNotAllowOnPut.instance();
}

String nuid = NUID.nextGlobal();
String chunkSubject = pubSubChunkSubject(nuid);
Expand Down Expand Up @@ -191,8 +194,8 @@ public ObjectInfo put(File file) throws IOException, JetStreamApiException, NoSu
*/
@Override
public ObjectInfo get(String objectName, OutputStream out) throws IOException, JetStreamApiException, InterruptedException, NoSuchAlgorithmException {
ObjectInfo oi = getInfo(objectName);
if (oi == null || oi.isDeleted()) {
ObjectInfo oi = getInfo(objectName, false);
if (oi == null) {
throw OsObjectNotFound.instance();
}

Expand Down Expand Up @@ -266,8 +269,20 @@ public ObjectInfo get(String objectName, OutputStream out) throws IOException, J
*/
@Override
public ObjectInfo getInfo(String objectName) throws IOException, JetStreamApiException {
return getInfo(objectName, false);
}

/**
* {@inheritDoc}
*/
@Override
public ObjectInfo getInfo(String objectName, boolean includingDeleted) throws IOException, JetStreamApiException {
MessageInfo mi = _getLast(rawMetaSubject(objectName));
return mi == null ? null : new ObjectInfo(mi);
if (mi == null) {
return null;
}
ObjectInfo info = new ObjectInfo(mi);
return includingDeleted || !info.isDeleted() ? info : null;
}

/**
Expand All @@ -279,7 +294,7 @@ public ObjectInfo updateMeta(String objectName, ObjectMeta meta) throws IOExcept
Validator.validateNotNull(meta, "ObjectMeta");
Validator.validateNotNull(meta.getObjectName(), "ObjectMeta name");

ObjectInfo currentInfo = getInfo(objectName);
ObjectInfo currentInfo = getInfo(objectName, true);
if (currentInfo == null) {
throw OsObjectNotFound.instance();
}
Expand All @@ -289,8 +304,7 @@ public ObjectInfo updateMeta(String objectName, ObjectMeta meta) throws IOExcept

boolean nameChange = !objectName.equals(meta.getObjectName());
if (nameChange) {
ObjectInfo infoForNewName = getInfo(meta.getObjectName());
if (infoForNewName != null && !infoForNewName.isDeleted()) {
if (getInfo(meta.getObjectName(), false) != null) {
throw OsObjectAlreadyExists.instance();
}
}
Expand All @@ -314,7 +328,7 @@ public ObjectInfo updateMeta(String objectName, ObjectMeta meta) throws IOExcept
*/
@Override
public ObjectInfo delete(String objectName) throws IOException, JetStreamApiException {
ObjectInfo info = getInfo(objectName);
ObjectInfo info = getInfo(objectName, true);
if (info == null) {
throw OsObjectNotFound.instance();
}
Expand Down Expand Up @@ -351,8 +365,8 @@ public ObjectInfo addLink(String objectName, ObjectInfo toInfo) throws IOExcepti
throw OsCantLinkToLink.instance();
}

ObjectInfo info = getInfo(objectName);
if (info != null && !info.isDeleted() && !info.isLink()) {
ObjectInfo info = getInfo(objectName, false);
if (info != null && !info.isLink()) {
throw OsObjectAlreadyExists.instance();
}

Expand All @@ -370,8 +384,8 @@ public ObjectInfo addBucketLink(String objectName, ObjectStore toStore) throws I
Validator.validateNotNull(objectName, "object name");
Validator.validateNotNull(toStore, "Link-To ObjectStore");

ObjectInfo info = getInfo(objectName);
if (info != null && !info.isDeleted() && !info.isLink()) {
ObjectInfo info = getInfo(objectName, false);
if (info != null && !info.isLink()) {
throw OsObjectAlreadyExists.instance();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ public class NatsJetStreamClientError {
public static final NatsJetStreamClientError OsGetChunksMismatch = new NatsJetStreamClientError(OS, 90206, "Number of chunks does not match meta data.");
public static final NatsJetStreamClientError OsGetSizeMismatch = new NatsJetStreamClientError(OS, 90207, "Total size does not match meta data.");
public static final NatsJetStreamClientError OsGetLinkToBucket = new NatsJetStreamClientError(OS, 90208, "Cannot get object, it is a link to a bucket.");
public static final NatsJetStreamClientError OsLinkNotAllowOnPut = new NatsJetStreamClientError(OS, 90209, "Link not allowed in metadata when putting an object.");

public static final NatsJetStreamClientError JsConsumerCreate290NotAvailable = new NatsJetStreamClientError(CON, 90301, "Name field not valid when v2.9.0 consumer create api is not available.");
public static final NatsJetStreamClientError JsConsumerNameDurableMismatch = new NatsJetStreamClientError(CON, 90302, "Name must match durable if both are supplied.");


@Deprecated // Fixed spelling error
public static final NatsJetStreamClientError JsSubFcHbHbNotValidQueue = new NatsJetStreamClientError(SUB, 90006, "Flow Control and/or heartbeat is not valid in queue mode.");

Expand Down
11 changes: 8 additions & 3 deletions src/test/java/io/nats/client/impl/ObjectStoreTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,10 @@ public void testObjectLinks() throws Exception {
ObjectInfo targetWillBeDeleted = os2.addLink("targetWillBeDeleted", info21);
validateLink(targetWillBeDeleted, "targetWillBeDeleted", info21, null);
os2.delete(info21.getObjectName());
ObjectInfo oiDeleted = os2.getInfo(info21.getObjectName());
assertTrue(oiDeleted.isDeleted()); // object is deleted
ObjectInfo oiDeleted = os2.getInfo(info21.getObjectName(), true);
assertTrue(oiDeleted.isDeleted()); // object is deleted but includingDeleted = true
assertNull(os2.getInfo(info21.getObjectName())); // does includingDeleted = false
assertNull(os2.getInfo(info21.getObjectName(), false)); // explicit includingDeleted = false
assertClientError(OsObjectIsDeleted, () -> os2.addLink("willException", oiDeleted));
ObjectInfo oiLink = os2.getInfo("targetWillBeDeleted");
assertNotNull(oiLink); // link is still there but link target is deleted
Expand All @@ -364,6 +366,9 @@ public void testObjectLinks() throws Exception {
assertEquals(info12, crossInfo12);
assertEquals(2, baos.size());
assertEquals("12", baos.toString());

ObjectMeta linkInPut = ObjectMeta.builder("linkInPut").link(ObjectLink.object("na", "na")).build();
assertClientError(OsLinkNotAllowOnPut, () -> os2.put(linkInPut, new ByteArrayInputStream("na".getBytes())));
});
}

Expand All @@ -389,7 +394,7 @@ private void validateLink(ObjectInfo oiLink, String linkName, ObjectInfo targetI
}

@Test
public void testObjectList() throws Exception {
public void testList() throws Exception {
runInJsServer(nc -> {
ObjectStoreManagement osm = nc.objectStoreManagement();
osm.create(ObjectStoreConfiguration.builder(BUCKET).storageType(StorageType.Memory).build());
Expand Down