From 178337cb803b97825a7dd19d6e1fdab5e1e47ee3 Mon Sep 17 00:00:00 2001 From: nicktorwald Date: Wed, 19 Jun 2019 02:58:58 +0700 Subject: [PATCH] Soft automatic schema reload Now client keeps actual schema metadata and sends schemaId header to be checked against current Tarantool schema version. If client version mismatches DB version client does schema reloading in the background. Client operation interface was reworked in scope of support not only number identifiers for spaces and indexes but also their string names. Closes: #7, #137 --- README.md | 12 + .../org/tarantool/AbstractTarantoolOps.java | 201 ++++++++++-- .../java/org/tarantool/TarantoolBase.java | 12 +- .../java/org/tarantool/TarantoolClient.java | 8 +- .../org/tarantool/TarantoolClientImpl.java | 290 ++++++++++++++---- .../org/tarantool/TarantoolClientOps.java | 30 +- .../org/tarantool/TarantoolClusterClient.java | 42 +-- .../org/tarantool/TarantoolConnection.java | 18 +- .../org/tarantool/TarantoolException.java | 12 +- ...antoolClusterStoredFunctionDiscoverer.java | 9 +- .../tarantool/protocol/ProtoConstants.java | 19 ++ .../org/tarantool/protocol/ProtoUtils.java | 26 +- .../tarantool/protocol/TarantoolPacket.java | 15 +- .../tarantool/schema/TarantoolIndexMeta.java | 147 +++++++++ .../TarantoolIndexNotFoundException.java | 16 + .../schema/TarantoolSchemaException.java | 15 + .../tarantool/schema/TarantoolSchemaMeta.java | 86 ++++++ .../tarantool/schema/TarantoolSpaceMeta.java | 94 ++++++ .../TarantoolSpaceNotFoundException.java | 9 + .../tarantool/ClientAsyncOperationsIT.java | 248 ++++++++++++++- .../tarantool/ClientReconnectClusterIT.java | 28 +- .../FireAndForgetClientOperationsIT.java | 41 ++- src/test/java/org/tarantool/IteratorTest.java | 14 +- .../org/tarantool/TarantoolClientOpsIT.java | 14 +- ...sterServiceStoredFunctionDiscovererIT.java | 3 +- .../jdbc/JdbcExceptionHandlingTest.java | 6 +- .../org/tarantool/schema/ClientSchemaIT.java | 226 ++++++++++++++ 27 files changed, 1436 insertions(+), 205 deletions(-) create mode 100644 src/main/java/org/tarantool/protocol/ProtoConstants.java create mode 100644 src/main/java/org/tarantool/schema/TarantoolIndexMeta.java create mode 100644 src/main/java/org/tarantool/schema/TarantoolIndexNotFoundException.java create mode 100644 src/main/java/org/tarantool/schema/TarantoolSchemaException.java create mode 100644 src/main/java/org/tarantool/schema/TarantoolSchemaMeta.java create mode 100644 src/main/java/org/tarantool/schema/TarantoolSpaceMeta.java create mode 100644 src/main/java/org/tarantool/schema/TarantoolSpaceNotFoundException.java create mode 100644 src/test/java/org/tarantool/schema/ClientSchemaIT.java diff --git a/README.md b/README.md index 025726e9..5201f071 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,18 @@ makes possible for the client to configure a socket provider. * `ComposableAsyncOps` - return the operation result as a `CompletionStage` * `FireAndForgetOps` - returns the query ID +Each operation that requires space or index to be executed, can work with +number ID as well as string name of a space or an index. +Assume, we have `my_space` space with space ID `512` and its primary index +`primary` with index ID `0`. Then, for instance, `select` operations can be +performed as the following: + +```java +client.syncOps().select(512, 0, Collections.singletonList(1), 0, 1, Iterator.EQ); +// or using more convenient way +client.syncOps().select("my_space", "primary", Collections.singletonList(1), 0, 1, Iterator.EQ); +``` + Feel free to override any method of `TarantoolClientImpl`. For example, to hook all the results, you could override this: diff --git a/src/main/java/org/tarantool/AbstractTarantoolOps.java b/src/main/java/org/tarantool/AbstractTarantoolOps.java index 8cefb379..8dea1038 100644 --- a/src/main/java/org/tarantool/AbstractTarantoolOps.java +++ b/src/main/java/org/tarantool/AbstractTarantoolOps.java @@ -1,57 +1,163 @@ package org.tarantool; +import static org.tarantool.AbstractTarantoolOps.ResolvableArgument.ofResolved; +import static org.tarantool.AbstractTarantoolOps.ResolvableArgument.ofUnresolved; -public abstract class AbstractTarantoolOps - implements TarantoolClientOps { +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Stream; + +public abstract class AbstractTarantoolOps + implements TarantoolClientOps { + + private final Function defaultSpaceResolver = space -> resolveSpace((String) space); private Code callCode = Code.CALL; - protected abstract Result exec(Code code, Object... args); + protected Result exec(Code code, Object... args) { + return exec(code, ResolvableArgument.ofAllResolved(args)); + } + + protected abstract Result exec(Code code, ResolvableArgument... args); + + protected abstract int resolveSpace(String space); + + protected abstract int resolveSpaceIndex(String space, String index); + + protected static class ResolvableArgument { + + private final Object value; + private final Function resolver; + + private ResolvableArgument(Object value, Function resolver) { + Objects.requireNonNull(value); + this.value = value; + this.resolver = resolver; + } + + public static ResolvableArgument ofResolved(Object resolvedValue) { + return new ResolvableArgument(resolvedValue, null); + } + + public static ResolvableArgument[] ofAllResolved(Object[] resolvedValue) { + return Stream.of(resolvedValue).map(ResolvableArgument::ofResolved).toArray(ResolvableArgument[]::new); + } + + public static ResolvableArgument ofUnresolved(Object unresolvedValue, Function resolver) { + return new ResolvableArgument(unresolvedValue, resolver); + } + + public boolean isUnresolved() { + try { + return isResolvable() && resolver.apply(value) == null; + } catch (Exception ignored) { + return true; + } + } + + public boolean isResolvable() { + return resolver != null; + } - public Result select(Space space, Space index, Tuple key, int offset, int limit, Iterator iterator) { + public Object getValue() { + return isResolvable() ? resolver.apply(value) : value; + } + + @Override + public String toString() { + if (isUnresolved()) { + return "{unresolved: " + value + "}"; + } + return value.toString(); + } + + } + + @Override + public Result select(Integer space, Integer index, Tuple key, int offset, int limit, Iterator iterator) { return select(space, index, key, offset, limit, iterator.getValue()); } - public Result select(Space space, Space index, Tuple key, int offset, int limit, int iterator) { - return exec( - Code.SELECT, - Key.SPACE, space, - Key.INDEX, index, - Key.KEY, key, - Key.ITERATOR, iterator, - Key.LIMIT, limit, - Key.OFFSET, offset + @Override + public Result select(String space, String index, Tuple key, int offset, int limit, Iterator iterator) { + return select(space, index, key, offset, limit, iterator.getValue()); + } + + @Override + public Result select(Integer space, Integer index, Tuple key, int offset, int limit, int iterator) { + return doSelect(ofResolved(space), ofResolved(index), key, offset, limit, iterator); + } + + @Override + public Result select(String space, String index, Tuple key, int offset, int limit, int iterator) { + return doSelect( + ofUnresolved(space, defaultSpaceResolver), + ofUnresolved(index, indexName -> resolveSpaceIndex(space, (String) indexName)), + key, offset, limit, iterator ); } - public Result insert(Space space, Tuple tuple) { - return exec(Code.INSERT, Key.SPACE, space, Key.TUPLE, tuple); + @Override + public Result insert(Integer space, Tuple tuple) { + return doInsert(ofResolved(space), tuple); } - public Result replace(Space space, Tuple tuple) { - return exec(Code.REPLACE, Key.SPACE, space, Key.TUPLE, tuple); + @Override + public Result insert(String space, Tuple tuple) { + return doInsert(ofUnresolved(space, defaultSpaceResolver), tuple); } - public Result update(Space space, Tuple key, Operation... args) { - return exec(Code.UPDATE, Key.SPACE, space, Key.KEY, key, Key.TUPLE, args); + @Override + public Result replace(Integer space, Tuple tuple) { + return doReplace(ofResolved(space), tuple); } - public Result upsert(Space space, Tuple key, Tuple def, Operation... args) { - return exec(Code.UPSERT, Key.SPACE, space, Key.KEY, key, Key.TUPLE, def, Key.UPSERT_OPS, args); + @Override + public Result replace(String space, Tuple tuple) { + return doReplace(ofUnresolved(space, defaultSpaceResolver), tuple); } - public Result delete(Space space, Tuple key) { - return exec(Code.DELETE, Key.SPACE, space, Key.KEY, key); + @Override + public Result update(Integer space, Tuple key, Operation... args) { + return doUpdate(ofResolved(space), key, args); } + @Override + public Result update(String space, Tuple key, Operation... tuple) { + return doUpdate(ofUnresolved(space, defaultSpaceResolver), key, tuple); + } + + @Override + public Result upsert(Integer space, Tuple key, Tuple defTuple, Operation... ops) { + return doUpsert(ofResolved(space), key, defTuple, ops); + } + + @Override + public Result upsert(String space, Tuple key, Tuple defTuple, Operation... ops) { + return doUpsert(ofUnresolved(space, defaultSpaceResolver), key, defTuple, ops); + } + + @Override + public Result delete(Integer space, Tuple key) { + return doDelete(ofResolved(space), key); + } + + @Override + public Result delete(String space, Tuple key) { + return doDelete(ofUnresolved(space, defaultSpaceResolver), key); + } + + @Override public Result call(String function, Object... args) { return exec(callCode, Key.FUNCTION, function, Key.TUPLE, args); } + @Override public Result eval(String expression, Object... args) { return exec(Code.EVAL, Key.EXPRESSION, expression, Key.TUPLE, args); } + @Override public void ping() { exec(Code.PING); } @@ -59,4 +165,53 @@ public void ping() { public void setCallCode(Code callCode) { this.callCode = callCode; } + + private Result doDelete(ResolvableArgument space, Tuple key) { + return exec(Code.DELETE, ofResolved(Key.SPACE), space, ofResolved(Key.KEY), ofResolved(key)); + } + + private Result doUpsert(ResolvableArgument space, Tuple key, Tuple defTuple, Operation... ops) { + return exec( + Code.UPSERT, + ofResolved(Key.SPACE), space, + ofResolved(Key.KEY), ofResolved(key), + ofResolved(Key.TUPLE), ofResolved(defTuple), + ofResolved(Key.UPSERT_OPS), ofResolved(ops) + ); + } + + private Result doUpdate(ResolvableArgument space, Tuple key, Operation... ops) { + return exec( + Code.UPDATE, + ofResolved(Key.SPACE), space, + ofResolved(Key.KEY), ofResolved(key), + ofResolved(Key.TUPLE), ofResolved(ops) + ); + } + + private Result doReplace(ResolvableArgument space, Tuple tuple) { + return exec(Code.REPLACE, ofResolved(Key.SPACE), space, ofResolved(Key.TUPLE), ofResolved(tuple)); + } + + private Result doInsert(ResolvableArgument space, Tuple tuple) { + return exec(Code.INSERT, ofResolved(Key.SPACE), space, ofResolved(Key.TUPLE), ofResolved(tuple)); + } + + private Result doSelect(ResolvableArgument space, + ResolvableArgument index, + Tuple key, + int offset, + int limit, + int iterator) { + return exec( + Code.SELECT, + ofResolved(Key.SPACE), space, + ofResolved(Key.INDEX), index, + ofResolved(Key.KEY), ofResolved(key), + ofResolved(Key.ITERATOR), ofResolved(iterator), + ofResolved(Key.LIMIT), ofResolved(limit), + ofResolved(Key.OFFSET), ofResolved(offset) + ); + } + } diff --git a/src/main/java/org/tarantool/TarantoolBase.java b/src/main/java/org/tarantool/TarantoolBase.java index c74647ae..5f89f1fc 100644 --- a/src/main/java/org/tarantool/TarantoolBase.java +++ b/src/main/java/org/tarantool/TarantoolBase.java @@ -9,7 +9,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; -public abstract class TarantoolBase extends AbstractTarantoolOps, Object, Result> { +public abstract class TarantoolBase extends AbstractTarantoolOps, Object, Result> { protected String serverVersion; protected MsgPackLite msgPackLite = MsgPackLite.INSTANCE; protected AtomicLong syncId = new AtomicLong(); @@ -42,16 +42,6 @@ protected void closeChannel(SocketChannel channel) { } } - protected void validateArgs(Object[] args) { - if (args != null) { - for (int i = 0; i < args.length; i += 2) { - if (args[i + 1] == null) { - throw new NullPointerException(((Key) args[i]).name() + " should not be null"); - } - } - } - } - public void setInitialRequestSize(int initialRequestSize) { this.initialRequestSize = initialRequestSize; } diff --git a/src/main/java/org/tarantool/TarantoolClient.java b/src/main/java/org/tarantool/TarantoolClient.java index 2ad0c84c..0848fb56 100644 --- a/src/main/java/org/tarantool/TarantoolClient.java +++ b/src/main/java/org/tarantool/TarantoolClient.java @@ -7,13 +7,13 @@ import java.util.concurrent.TimeUnit; public interface TarantoolClient { - TarantoolClientOps, Object, List> syncOps(); + TarantoolClientOps, Object, List> syncOps(); - TarantoolClientOps, Object, Future>> asyncOps(); + TarantoolClientOps, Object, Future>> asyncOps(); - TarantoolClientOps, Object, CompletionStage>> composableAsyncOps(); + TarantoolClientOps, Object, CompletionStage>> composableAsyncOps(); - TarantoolClientOps, Object, Long> fireAndForgetOps(); + TarantoolClientOps, Object, Long> fireAndForgetOps(); TarantoolSQLOps>> sqlSyncOps(); diff --git a/src/main/java/org/tarantool/TarantoolClientImpl.java b/src/main/java/org/tarantool/TarantoolClientImpl.java index eed09a45..31f929a0 100644 --- a/src/main/java/org/tarantool/TarantoolClientImpl.java +++ b/src/main/java/org/tarantool/TarantoolClientImpl.java @@ -1,9 +1,11 @@ package org.tarantool; +import org.tarantool.protocol.ProtoConstants; import org.tarantool.protocol.ProtoUtils; import org.tarantool.protocol.ReadableViaSelectorChannel; import org.tarantool.protocol.TarantoolGreeting; import org.tarantool.protocol.TarantoolPacket; +import org.tarantool.schema.TarantoolSchemaMeta; import java.io.IOException; import java.nio.ByteBuffer; @@ -11,12 +13,16 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -25,6 +31,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.StampedLock; +import java.util.stream.Stream; public class TarantoolClientImpl extends TarantoolBase> implements TarantoolClient { @@ -43,6 +51,11 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar protected volatile Exception thumbstone; + protected ScheduledExecutorService workExecutor; + + protected StampedLock schemaLock = new StampedLock(); + protected BlockingQueue> delayedOpsQueue; + protected Map> futures; protected AtomicInteger pendingResponsesCount = new AtomicInteger(); @@ -63,6 +76,7 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar protected SyncOps syncOps; protected FireAndForgetOps fireAndForgetOps; protected ComposableAsyncOps composableAsyncOps; + protected UnsafeSchemaOps unsafeSchemaOps; /** * Inner. @@ -72,6 +86,9 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar protected Thread reader; protected Thread writer; + protected volatile long currentSchemaId = 1L; + protected TarantoolSchemaMeta schemaMeta = new TarantoolSchemaMeta(this); + protected Thread connector = new Thread(new Runnable() { @Override public void run() { @@ -108,6 +125,9 @@ private void initClient(SocketChannelProvider socketProvider, TarantoolClientCon this.socketProvider = socketProvider; this.stats = new TarantoolClientStats(); this.futures = new ConcurrentHashMap<>(config.predictedFutures); + this.delayedOpsQueue = new PriorityBlockingQueue<>(128); + this.workExecutor = + Executors.newSingleThreadScheduledExecutor(new TarantoolThreadDaemonFactory("tarantool-worker")); this.sharedBuffer = ByteBuffer.allocateDirect(config.sharedBufferSize); this.writerBuffer = ByteBuffer.allocateDirect(sharedBuffer.capacity()); this.connector.setDaemon(true); @@ -115,6 +135,7 @@ private void initClient(SocketChannelProvider socketProvider, TarantoolClientCon this.syncOps = new SyncOps(); this.composableAsyncOps = new ComposableAsyncOps(); this.fireAndForgetOps = new FireAndForgetOps(); + this.unsafeSchemaOps = new UnsafeSchemaOps(); if (!config.useNewCall) { setCallCode(Code.OLD_CALL); this.syncOps.setCallCode(Code.OLD_CALL); @@ -136,6 +157,7 @@ private void startConnector(long initTimeoutMillis) { close(e); throw e; } + updateSchema(currentSchemaId); } catch (InterruptedException e) { close(e); throw new IllegalStateException(e); @@ -175,6 +197,7 @@ protected void connect(final SocketChannel channel) throws Exception { try { TarantoolGreeting greeting = ProtoUtils.connect(channel, config.username, config.password, msgPackLite); this.serverVersion = greeting.getServerVersion(); + currentSchemaId = ProtoUtils.getSchemaId(channel, msgPackLite); } catch (IOException e) { closeChannel(channel); throw new CommunicationException("Couldn't connect to tarantool", e); @@ -203,7 +226,7 @@ protected void startThreads(String threadName) throws InterruptedException { try { readThread(); } finally { - state.release(StateHelper.READING); + state.release(StateHelper.READING | StateHelper.SCHEMA_UPDATING); // only last of two IO-threads can signal for reconnection if (leftIoThreads.decrementAndGet() == 0) { state.trySignalForReconnection(); @@ -217,7 +240,7 @@ protected void startThreads(String threadName) throws InterruptedException { try { writeThread(); } finally { - state.release(StateHelper.WRITING); + state.release(StateHelper.WRITING | StateHelper.SCHEMA_UPDATING); // only last of two IO-threads can signal for reconnection if (leftIoThreads.decrementAndGet() == 0) { state.trySignalForReconnection(); @@ -250,7 +273,8 @@ protected void configureThreads(String threadName) { * * @see #setOperationTimeout(long) */ - protected Future exec(Code code, Object... args) { + @Override + protected Future exec(Code code, ResolvableArgument... args) { return doExec(operationTimeout, code, args); } @@ -265,33 +289,63 @@ protected Future exec(Code code, Object... args) { * @return deferred result */ protected Future exec(long timeoutMillis, Code code, Object... args) { - return doExec(timeoutMillis, code, args); + return doExec(timeoutMillis, code, ResolvableArgument.ofAllResolved(args)); } - protected TarantoolOp doExec(long timeoutMillis, Code code, Object[] args) { - validateArgs(args); - long sid = syncId.incrementAndGet(); + @Override + protected int resolveSpace(String space) { + return schemaMeta.getSpace(space).getId(); + } + + @Override + protected int resolveSpaceIndex(String space, String index) { + return schemaMeta.getSpaceIndex(space, index).getId(); + } + protected TarantoolOp doExec(long timeoutMillis, Code code, ResolvableArgument[] args) { + long sid = syncId.incrementAndGet(); TarantoolOp future = makeNewOperation(timeoutMillis, sid, code, args); + // space or index name weren't resolved + // it's possible the client keeps the outdated schema + // refresh the schema and delay operation execution + if (future.hasUnresolvedArguments()) { + delayedOpsQueue.add(future); + updateSchema(currentSchemaId); + return future; + } + long stamp = schemaLock.readLock(); + try { + // postpone operation if the schema is being reloaded + if (state.isStateSet(StateHelper.SCHEMA_UPDATING)) { + delayedOpsQueue.add(future); + return future; + } + return registerOperation(future, currentSchemaId); + } finally { + schemaLock.unlockRead(stamp); + } + } + + protected TarantoolOp registerOperation(TarantoolOp future, long schemaId) { if (isDead(future)) { return future; } - futures.put(sid, future); + futures.put(future.getId(), future); if (isDead(future)) { - futures.remove(sid); + futures.remove(future.getId()); return future; } try { - write(code, sid, null, args); + write(future.getCode(), future.getId(), schemaId, future.getResolvedArgs()); } catch (Exception e) { - futures.remove(sid); + futures.remove(future.getId()); fail(future, e); } return future; } - protected TarantoolOp makeNewOperation(long timeoutMillis, long sid, Code code, Object[] args) { + protected TarantoolOp makeNewOperation(long timeoutMillis, long sid, Code code, ResolvableArgument[] args) { return new TarantoolOp<>(sid, code, args) .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); } @@ -313,6 +367,12 @@ protected synchronized void die(String message, Exception cause) { iterator.remove(); } } + + TarantoolOp op; + while ((op = delayedOpsQueue.poll()) != null) { + fail(op, error); + } + pendingResponsesCount.set(0); bufferLock.lock(); @@ -466,17 +526,66 @@ protected void fail(TarantoolOp future, Exception e) { } protected void complete(TarantoolPacket packet, TarantoolOp future) { - if (future != null) { - long code = packet.getCode(); - if (code == 0) { - if (future.getCode() == Code.EXECUTE) { - completeSql(future, packet); - } else { - ((TarantoolOp) future).complete(packet.getBody().get(Key.DATA.getId())); - } + if (future == null || future.isDone()) { + return; + } + + long code = packet.getCode(); + long schemaId = packet.getSchemaId(); + + if (code == ProtoConstants.SUCCESS) { + if (future.getCode() == Code.EXECUTE) { + completeSql(future, packet); } else { - Object error = packet.getBody().get(Key.ERROR.getId()); - fail(future, serverError(code, error)); + ((TarantoolOp) future).complete(packet.getData()); + } + } else if (code == ProtoConstants.ERR_WRONG_SCHEMA_VERSION) { + if (future.hasResolvableArguments()) { + delayedOpsQueue.add(future); + } else { + registerOperation(future, schemaId); + } + } else { + Object error = packet.getError(); + fail(future, serverError(code, error)); + } + // it's possible to receive bigger version than current + // i.e. after DML operation or wrong schema version response + if (schemaId > currentSchemaId) { + updateSchema(schemaId); + } + } + + private void updateSchema(long schemaId) { + long stamp = schemaLock.writeLock(); + try { + if (state.acquire(StateHelper.SCHEMA_UPDATING)) { + workExecutor.execute(createUpdateSchemaTask(schemaId)); + } + } finally { + schemaLock.unlockWrite(stamp); + } + } + + private Runnable createUpdateSchemaTask(long schemaId) { + return () -> { + schemaMeta.refresh(); + long stamp = schemaLock.writeLock(); + try { + currentSchemaId = schemaId; + rescheduleDelayedOperations(); + state.release(StateHelper.SCHEMA_UPDATING); + } finally { + schemaLock.unlock(stamp); + } + }; + } + + private void rescheduleDelayedOperations() { + while (delayedOpsQueue.peek() != null) { + TarantoolOp op = delayedOpsQueue.poll(); + if (!op.isDone()) { + registerOperation(op, currentSchemaId); } } } @@ -523,6 +632,9 @@ public void close() { protected void close(Exception e) { if (state.close()) { + if (workExecutor != null) { + workExecutor.shutdownNow(); + } connector.interrupt(); die(e.getMessage(), e); } @@ -565,12 +677,12 @@ public void setOperationTimeout(long operationTimeout) { @Override public boolean isAlive() { - return state.getState() == StateHelper.ALIVE && thumbstone == null; + return state.isStateSet(StateHelper.ALIVE) && thumbstone == null; } @Override public boolean isClosed() { - return state.getState() == StateHelper.CLOSED; + return state.isStateSet(StateHelper.CLOSED); } @Override @@ -584,25 +696,29 @@ public boolean waitAlive(long timeout, TimeUnit unit) throws InterruptedExceptio } @Override - public TarantoolClientOps, Object, List> syncOps() { + public TarantoolClientOps, Object, List> syncOps() { return syncOps; } @Override - public TarantoolClientOps, Object, Future>> asyncOps() { + public TarantoolClientOps, Object, Future>> asyncOps() { return (TarantoolClientOps) this; } @Override - public TarantoolClientOps, Object, CompletionStage>> composableAsyncOps() { + public TarantoolClientOps, Object, CompletionStage>> composableAsyncOps() { return composableAsyncOps; } @Override - public TarantoolClientOps, Object, Long> fireAndForgetOps() { + public TarantoolClientOps, Object, Long> fireAndForgetOps() { return fireAndForgetOps; } + public TarantoolClientOps, Object, List> unsafeSchemaOps() { + return unsafeSchemaOps; + } + @Override public TarantoolSQLOps>> sqlSyncOps() { return new TarantoolSQLOps>>() { @@ -633,29 +749,23 @@ public Future>> query(String sql, Object... bind) { }; } - protected class SyncOps extends AbstractTarantoolOps, Object, List> { + protected class SyncOps extends ClientDelegatingOps> { @Override - public List exec(Code code, Object... args) { + public List exec(Code code, ResolvableArgument... args) { return (List) syncGet(TarantoolClientImpl.this.exec(code, args)); } - @Override - public void close() { - throw new IllegalStateException("You should close TarantoolClient instead."); - } - } - protected class FireAndForgetOps extends AbstractTarantoolOps, Object, Long> { + protected class FireAndForgetOps extends ClientDelegatingOps { @Override - public Long exec(Code code, Object... args) { + public Long exec(Code code, ResolvableArgument... args) { if (thumbstone == null) { try { - long syncId = TarantoolClientImpl.this.syncId.incrementAndGet(); - write(code, syncId, null, args); - return syncId; + TarantoolOp operation = doExec(operationTimeout, code, args); + return operation.getId(); } catch (Exception e) { throw new CommunicationException("Execute failed", e); } @@ -664,11 +774,6 @@ public Long exec(Code code, Object... args) { } } - @Override - public void close() { - throw new IllegalStateException("You should close TarantoolClient instead."); - } - } protected boolean isDead(TarantoolOp future) { @@ -679,7 +784,7 @@ protected boolean isDead(TarantoolOp future) { return false; } - protected static class TarantoolOp extends CompletableFuture { + protected static class TarantoolOp extends CompletableFuture implements Comparable { /** * A task identifier used in {@link TarantoolClientImpl#futures}. @@ -694,9 +799,9 @@ protected static class TarantoolOp extends CompletableFuture { /** * Arguments of operation. */ - private final Object[] args; + private final ResolvableArgument[] args; - public TarantoolOp(long id, Code code, Object[] args) { + public TarantoolOp(long id, Code code, ResolvableArgument[] args) { this.id = id; this.code = code; this.args = args; @@ -710,10 +815,22 @@ public Code getCode() { return code; } - public Object[] getArgs() { + public ResolvableArgument[] getArgs() { return args; } + public Object[] getResolvedArgs() { + return Stream.of(args).map(ResolvableArgument::getValue).toArray(); + } + + public boolean hasUnresolvedArguments() { + return Stream.of(args).anyMatch(ResolvableArgument::isUnresolved); + } + + public boolean hasResolvableArguments() { + return Stream.of(args).anyMatch(ResolvableArgument::isResolvable); + } + /** * Missed in jdk8 CompletableFuture operator to limit execution * by time. @@ -746,6 +863,11 @@ public TarantoolOp orTimeout(long timeout, TimeUnit unit) { return this; } + @Override + public int compareTo(TarantoolOp otherOp) { + return Long.compareUnsigned(this.id, otherOp.id); + } + /** * Runs timeout operation as a delayed task. */ @@ -786,9 +908,12 @@ protected final class StateHelper { static final int UNINITIALIZED = 0; static final int READING = 1; static final int WRITING = 2; + static final int ALIVE = READING | WRITING; - static final int RECONNECT = 4; - static final int CLOSED = 8; + static final int SCHEMA_UPDATING = 1 << 2; + + static final int RECONNECT = 1 << 3; + static final int CLOSED = 1 << 4; private final AtomicInteger state; @@ -817,6 +942,10 @@ protected int getState() { return state.get(); } + boolean isStateSet(int mask) { + return (getState() & mask) == mask; + } + /** * Set CLOSED state, drop RECONNECT state. */ @@ -825,12 +954,12 @@ protected boolean close() { int currentState = getState(); /* CLOSED is the terminal state. */ - if ((currentState & CLOSED) == CLOSED) { + if (isStateSet(CLOSED)) { return false; } - /* Drop RECONNECT, set CLOSED. */ - if (compareAndSet(currentState, (currentState & ~RECONNECT) | CLOSED)) { + /* Clear all states and set CLOSED. */ + if (compareAndSet(currentState, CLOSED)) { return true; } } @@ -846,7 +975,7 @@ protected boolean acquire(int mask) { int currentState = getState(); /* CLOSED is the terminal state. */ - if ((currentState & CLOSED) == CLOSED) { + if ((isStateSet(CLOSED))) { return false; } @@ -856,8 +985,8 @@ protected boolean acquire(int mask) { } /* Cannot move from a state to the same state. */ - if ((currentState & mask) != 0) { - throw new IllegalStateException("State is already " + mask); + if (isStateSet(mask)) { + return false; } /* Set acquired state. */ @@ -881,7 +1010,8 @@ protected boolean compareAndSet(int expect, int update) { return false; } - if (update == ALIVE) { + boolean wasAlreadyAlive = (expect & ALIVE) == ALIVE; + if (!wasAlreadyAlive && update == ALIVE) { CountDownLatch latch = nextAliveLatch.getAndSet(new CountDownLatch(1)); latch.countDown(); onReconnect(); @@ -916,13 +1046,13 @@ private CountDownLatch getStateLatch(int state) { return closedLatch; } if (state == ALIVE) { - if (getState() == CLOSED) { + if (isStateSet(CLOSED)) { throw new IllegalStateException("State is CLOSED."); } CountDownLatch latch = nextAliveLatch.get(); /* It may happen so that an error is detected but the state is still alive. Wait for the 'next' alive state in such cases. */ - return (getState() == ALIVE && thumbstone == null) ? null : latch; + return (isStateSet(ALIVE) && thumbstone == null) ? null : latch; } return null; } @@ -935,7 +1065,7 @@ private CountDownLatch getStateLatch(int state) { private void awaitReconnection() throws InterruptedException { connectorLock.lock(); try { - while (getState() != StateHelper.RECONNECT) { + while (!isStateSet(RECONNECT)) { reconnectRequired.await(); } } finally { @@ -961,11 +1091,10 @@ private void trySignalForReconnection() { } - protected class ComposableAsyncOps - extends AbstractTarantoolOps, Object, CompletionStage>> { + protected class ComposableAsyncOps extends ClientDelegatingOps>> { @Override - public CompletionStage> exec(Code code, Object... args) { + public CompletionStage> exec(Code code, ResolvableArgument... args) { return (CompletionStage>) TarantoolClientImpl.this.exec(code, args); } @@ -976,4 +1105,37 @@ public void close() { } + /** + * Used by internal services to ignore schema ID issues. + */ + protected class UnsafeSchemaOps extends ClientDelegatingOps> { + + @Override + protected List exec(Code code, ResolvableArgument... args) { + long syncId = TarantoolClientImpl.this.syncId.incrementAndGet(); + TarantoolOp op = makeNewOperation(operationTimeout, syncId, code, args); + return (List) syncGet(registerOperation(op, 0L)); + } + + } + + protected abstract class ClientDelegatingOps extends AbstractTarantoolOps, Object, R> { + + @Override + protected int resolveSpace(String space) { + return TarantoolClientImpl.this.resolveSpace(space); + } + + @Override + protected int resolveSpaceIndex(String space, String index) { + return TarantoolClientImpl.this.resolveSpaceIndex(space, index); + } + + @Override + public void close() { + throw new IllegalStateException("You should close TarantoolClient instead."); + } + + } + } diff --git a/src/main/java/org/tarantool/TarantoolClientOps.java b/src/main/java/org/tarantool/TarantoolClientOps.java index 69ab3a9e..1d7878e1 100644 --- a/src/main/java/org/tarantool/TarantoolClientOps.java +++ b/src/main/java/org/tarantool/TarantoolClientOps.java @@ -1,20 +1,34 @@ package org.tarantool; -public interface TarantoolClientOps { - R select(T space, T index, O key, int offset, int limit, int iterator); +public interface TarantoolClientOps { + R select(Integer space, Integer index, O key, int offset, int limit, int iterator); - R select(T space, T index, O key, int offset, int limit, Iterator iterator); + R select(String space, String index, O key, int offset, int limit, int iterator); - R insert(T space, O tuple); + R select(Integer space, Integer index, O key, int offset, int limit, Iterator iterator); - R replace(T space, O tuple); + R select(String space, String index, O key, int offset, int limit, Iterator iterator); - R update(T space, O key, P... tuple); + R insert(Integer space, O tuple); - R upsert(T space, O key, O defTuple, P... ops); + R insert(String space, O tuple); - R delete(T space, O key); + R replace(Integer space, O tuple); + + R replace(String space, O tuple); + + R update(Integer space, O key, P... tuple); + + R update(String space, O key, P... tuple); + + R upsert(Integer space, O key, O defTuple, P... ops); + + R upsert(String space, O key, O defTuple, P... ops); + + R delete(Integer space, O key); + + R delete(String space, O key); R call(String function, Object... args); diff --git a/src/main/java/org/tarantool/TarantoolClusterClient.java b/src/main/java/org/tarantool/TarantoolClusterClient.java index b0c4711b..f7dabbdf 100644 --- a/src/main/java/org/tarantool/TarantoolClusterClient.java +++ b/src/main/java/org/tarantool/TarantoolClusterClient.java @@ -14,7 +14,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.StampedLock; @@ -35,7 +34,6 @@ public class TarantoolClusterClient extends TarantoolClientImpl { /** * Discovery activity. */ - private ScheduledExecutorService instancesDiscoveryExecutor; private Runnable instancesDiscovererTask; private StampedLock discoveryLock = new StampedLock(); @@ -70,14 +68,12 @@ public TarantoolClusterClient(TarantoolClusterClientConfig config, SocketChannel if (StringUtils.isNotBlank(config.clusterDiscoveryEntryFunction)) { this.instancesDiscovererTask = createDiscoveryTask(new TarantoolClusterStoredFunctionDiscoverer(config, this)); - this.instancesDiscoveryExecutor - = Executors.newSingleThreadScheduledExecutor(new TarantoolThreadDaemonFactory("tarantoolDiscoverer")); int delay = config.clusterDiscoveryDelayMillis > 0 ? config.clusterDiscoveryDelayMillis : TarantoolClusterClientConfig.DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS; // todo: it's better to start a job later (out of ctor) - this.instancesDiscoveryExecutor.scheduleWithFixedDelay( + this.workExecutor.scheduleWithFixedDelay( this.instancesDiscovererTask, 0, delay, @@ -99,14 +95,6 @@ protected boolean isDead(TarantoolOp future) { return false; } - @Override - protected TarantoolOp doExec(long timeoutMillis, Code code, Object[] args) { - validateArgs(args); - long sid = syncId.incrementAndGet(); - TarantoolOp future = makeNewOperation(timeoutMillis, sid, code, args); - return registerOperation(future); - } - /** * Registers a new async operation which will be resolved later. * Registration is discovery-aware in term of synchronization and @@ -116,26 +104,11 @@ protected TarantoolOp doExec(long timeoutMillis, Code code, Object[] args) { * * @return registered operation */ - private TarantoolOp registerOperation(TarantoolOp future) { + @Override + protected TarantoolOp registerOperation(TarantoolOp future, long schemaId) { long stamp = discoveryLock.readLock(); try { - if (isDead(future)) { - return future; - } - futures.put(future.getId(), future); - if (isDead(future)) { - futures.remove(future.getId()); - return future; - } - - try { - write(future.getCode(), future.getId(), null, future.getArgs()); - } catch (Exception e) { - futures.remove(future.getId()); - fail(future, e); - } - - return future; + return super.registerOperation(future, schemaId); } finally { discoveryLock.unlock(stamp); } @@ -161,10 +134,6 @@ protected boolean checkFail(TarantoolOp future, Exception e) { protected void close(Exception e) { super.close(e); - if (instancesDiscoveryExecutor != null) { - instancesDiscoveryExecutor.shutdownNow(); - } - if (retries == null) { // May happen within constructor. return; @@ -198,7 +167,7 @@ protected void onReconnect() { retries.clear(); for (final TarantoolOp future : futuresToRetry) { if (!future.isDone()) { - executor.execute(() -> registerOperation(future)); + executor.execute(() -> registerOperation(future, currentSchemaId)); } } } @@ -278,7 +247,6 @@ public synchronized void run() { onInstancesRefreshed(lastInstances); } } catch (Exception ignored) { - ignored.getCause(); // no-op } } diff --git a/src/main/java/org/tarantool/TarantoolConnection.java b/src/main/java/org/tarantool/TarantoolConnection.java index cf9c553f..3d658153 100644 --- a/src/main/java/org/tarantool/TarantoolConnection.java +++ b/src/main/java/org/tarantool/TarantoolConnection.java @@ -11,6 +11,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.stream.Stream; public class TarantoolConnection extends TarantoolBase> implements TarantoolSQLOps>> { @@ -27,11 +28,22 @@ public TarantoolConnection(String username, String password, Socket socket) thro } @Override - protected List exec(Code code, Object... args) { - TarantoolPacket responsePacket = writeAndRead(code, args); + protected List exec(Code code, ResolvableArgument... args) { + Object[] resolvedArgs = Stream.of(args).map(ResolvableArgument::getValue).toArray(); + TarantoolPacket responsePacket = writeAndRead(code, resolvedArgs); return (List) responsePacket.getBody().get(Key.DATA.getId()); } + @Override + protected int resolveSpace(String space) { + throw new UnsupportedOperationException(); + } + + @Override + protected int resolveSpaceIndex(String space, String index) { + throw new UnsupportedOperationException(); + } + protected TarantoolPacket writeAndRead(Code code, Object... args) { try { ByteBuffer packet = ProtoUtils.createPacket(initialRequestSize, msgPackLite, @@ -44,7 +56,7 @@ protected TarantoolPacket writeAndRead(Code code, Object... args) { Long c = responsePacket.getCode(); if (c != 0) { - throw serverError(c, responsePacket.getBody().get(Key.ERROR.getId())); + throw serverError(c, responsePacket.getError()); } return responsePacket; diff --git a/src/main/java/org/tarantool/TarantoolException.java b/src/main/java/org/tarantool/TarantoolException.java index aed93b14..cbc377fb 100644 --- a/src/main/java/org/tarantool/TarantoolException.java +++ b/src/main/java/org/tarantool/TarantoolException.java @@ -1,5 +1,11 @@ package org.tarantool; +import static org.tarantool.protocol.ProtoConstants.ERR_LOADING; +import static org.tarantool.protocol.ProtoConstants.ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY; +import static org.tarantool.protocol.ProtoConstants.ERR_READONLY; +import static org.tarantool.protocol.ProtoConstants.ERR_TIMEOUT; +import static org.tarantool.protocol.ProtoConstants.ERR_WRONG_SCHEMA_VERSION; + /** * A remote server error with error code and message. * @@ -7,11 +13,6 @@ * @version $Id: $ */ public class TarantoolException extends RuntimeException { - /* taken from src/box/errcode.h */ - public static final int ERR_READONLY = 7; - public static final int ERR_TIMEOUT = 78; - public static final int ERR_LOADING = 116; - public static final int ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY = 128; private static final long serialVersionUID = 1L; long code; @@ -60,6 +61,7 @@ boolean isTransient() { switch ((int) code) { case ERR_READONLY: case ERR_TIMEOUT: + case ERR_WRONG_SCHEMA_VERSION: case ERR_LOADING: case ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY: return true; diff --git a/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java b/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java index da92cd3a..0d8406e9 100644 --- a/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java +++ b/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java @@ -1,6 +1,6 @@ package org.tarantool.cluster; -import org.tarantool.TarantoolClient; +import org.tarantool.TarantoolClientImpl; import org.tarantool.TarantoolClientOps; import org.tarantool.TarantoolClusterClientConfig; import org.tarantool.util.StringUtils; @@ -19,17 +19,18 @@ */ public class TarantoolClusterStoredFunctionDiscoverer implements TarantoolClusterDiscoverer { - private TarantoolClient client; + private TarantoolClientImpl client; private String entryFunction; - public TarantoolClusterStoredFunctionDiscoverer(TarantoolClusterClientConfig clientConfig, TarantoolClient client) { + public TarantoolClusterStoredFunctionDiscoverer(TarantoolClusterClientConfig clientConfig, + TarantoolClientImpl client) { this.client = client; this.entryFunction = clientConfig.clusterDiscoveryEntryFunction; } @Override public Set getInstances() { - TarantoolClientOps, Object, List> syncOperations = client.syncOps(); + TarantoolClientOps, Object, List> syncOperations = client.unsafeSchemaOps(); List list = syncOperations.call(entryFunction); // discoverer expects a single array result from the function now; diff --git a/src/main/java/org/tarantool/protocol/ProtoConstants.java b/src/main/java/org/tarantool/protocol/ProtoConstants.java new file mode 100644 index 00000000..daee4dd7 --- /dev/null +++ b/src/main/java/org/tarantool/protocol/ProtoConstants.java @@ -0,0 +1,19 @@ +package org.tarantool.protocol; + +public class ProtoConstants { + + private ProtoConstants() { + } + + public static final long ERROR_TYPE_MARKER = 0x8000; + + public static final long SUCCESS = 0x0; + + /* taken from src/box/errcode.h */ + public static final int ERR_READONLY = 7; + public static final int ERR_TIMEOUT = 78; + public static final int ERR_WRONG_SCHEMA_VERSION = 109; + public static final int ERR_LOADING = 116; + public static final int ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY = 128; + +} diff --git a/src/main/java/org/tarantool/protocol/ProtoUtils.java b/src/main/java/org/tarantool/protocol/ProtoUtils.java index f5654c5a..12cf1831 100644 --- a/src/main/java/org/tarantool/protocol/ProtoUtils.java +++ b/src/main/java/org/tarantool/protocol/ProtoUtils.java @@ -195,6 +195,14 @@ public static TarantoolGreeting connect(SocketChannel channel, return new TarantoolGreeting(serverVersion); } + public static long getSchemaId(SocketChannel channel, MsgPackLite msgPackLite) throws IOException { + ByteBuffer pingPacket = createPacket(msgPackLite, Code.PING, 0L, 0L); + writeFully(channel, pingPacket); + + TarantoolPacket pongPacket = readPacket(channel, msgPackLite); + return pongPacket.getSchemaId(); + } + private static void assertCorrectWelcome(String firstLine, SocketAddress remoteAddress) { if (!firstLine.startsWith(WELCOME)) { String errMsg = "Failed to connect to node " + remoteAddress.toString() + @@ -208,7 +216,7 @@ private static void assertCorrectWelcome(String firstLine, SocketAddress remoteA private static void assertNoErrCode(TarantoolPacket authResponse) { Long code = (Long) authResponse.getHeaders().get(Key.CODE.getId()); if (code != 0) { - Object error = authResponse.getBody().get(Key.ERROR.getId()); + Object error = authResponse.getError(); String errorMsg = error instanceof String ? (String) error : new String((byte[]) error); throw new TarantoolException(code, errorMsg); } @@ -300,7 +308,22 @@ public static ByteBuffer createPacket(int initialRequestSize, return buffer; } + /** + * Extracts an error code. + * + * @param code in 0x8XXX format + * + * @return actual error code (which is a XXX part) + */ + public static long extractErrorCode(long code) { + if ((code & ProtoConstants.ERROR_TYPE_MARKER) == 0) { + throw new IllegalArgumentException(String.format("Code %h does not follow 0x8XXX format", code)); + } + return (~ProtoConstants.ERROR_TYPE_MARKER & code); + } + private static class ByteArrayOutputStream extends java.io.ByteArrayOutputStream { + public ByteArrayOutputStream(int size) { super(size); } @@ -308,6 +331,7 @@ public ByteArrayOutputStream(int size) { ByteBuffer toByteBuffer() { return ByteBuffer.wrap(buf, 0, count); } + } } diff --git a/src/main/java/org/tarantool/protocol/TarantoolPacket.java b/src/main/java/org/tarantool/protocol/TarantoolPacket.java index ca131177..ca661ed9 100644 --- a/src/main/java/org/tarantool/protocol/TarantoolPacket.java +++ b/src/main/java/org/tarantool/protocol/TarantoolPacket.java @@ -29,8 +29,9 @@ public Long getCode() { potenticalCode != null ? potenticalCode.getClass().toString() : "null" ); } + Long code = (Long) potenticalCode; - return (Long) potenticalCode; + return code == 0 ? code : ProtoUtils.extractErrorCode(code); } public Long getSync() { @@ -48,4 +49,16 @@ public Map getBody() { public boolean hasBody() { return body != null && body.size() > 0; } + + public long getSchemaId() { + return (Long) headers.get(Key.SCHEMA_ID.getId()); + } + + public Object getData() { + return hasBody() ? body.get(Key.DATA.getId()) : null; + } + + public Object getError() { + return hasBody() ? body.get(Key.ERROR.getId()) : null; + } } diff --git a/src/main/java/org/tarantool/schema/TarantoolIndexMeta.java b/src/main/java/org/tarantool/schema/TarantoolIndexMeta.java new file mode 100644 index 00000000..41c1cc6e --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolIndexMeta.java @@ -0,0 +1,147 @@ +package org.tarantool.schema; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Keeps a space index metadata. + */ +public class TarantoolIndexMeta { + + private final int id; + private final String name; + private final String type; + private final IndexOptions options; + private final List parts; + + public TarantoolIndexMeta(int id, + String name, + String type, + IndexOptions options, + List parts) { + this.id = id; + this.name = name; + this.type = type; + this.options = options; + this.parts = parts; + } + + public static TarantoolIndexMeta fromTuple(List tuple) { + Map optionsMap = (Map) tuple.get(4); + + List parts = Collections.emptyList(); + List partsTuple = (List) tuple.get(5); + if (!partsTuple.isEmpty()) { + if (partsTuple.get(0) instanceof List) { + parts = ((List>) partsTuple) + .stream() + .map(part -> new IndexPart((Integer) part.get(0), (String) part.get(1))) + .collect(Collectors.toList()); + } else if (partsTuple.get(0) instanceof Map) { + parts = ((List>) partsTuple) + .stream() + .map(part -> new IndexPart((Integer) part.get("field"), (String) part.get("type"))) + .collect(Collectors.toList()); + } + } + + return new TarantoolIndexMeta( + (Integer) tuple.get(1), + (String) tuple.get(2), + (String) tuple.get(3), + new IndexOptions((Boolean) optionsMap.get("unique")), + parts + ); + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + public IndexOptions getOptions() { + return options; + } + + public List getParts() { + return parts; + } + + public static class IndexOptions { + private final boolean unique; + + public IndexOptions(boolean unique) { + this.unique = unique; + } + + public boolean isUnique() { + return unique; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IndexOptions that = (IndexOptions) o; + return unique == that.unique; + } + + @Override + public int hashCode() { + return Objects.hash(unique); + } + + } + + public static class IndexPart { + private final int fieldNumber; + private final String type; + + public IndexPart(int fieldNumber, String type) { + this.fieldNumber = fieldNumber; + this.type = type; + } + + public int getFieldNumber() { + return fieldNumber; + } + + public String getType() { + return type; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IndexPart indexPart = (IndexPart) o; + return fieldNumber == indexPart.fieldNumber && + Objects.equals(type, indexPart.type); + } + + @Override + public int hashCode() { + return Objects.hash(fieldNumber, type); + } + + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolIndexNotFoundException.java b/src/main/java/org/tarantool/schema/TarantoolIndexNotFoundException.java new file mode 100644 index 00000000..49406fae --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolIndexNotFoundException.java @@ -0,0 +1,16 @@ +package org.tarantool.schema; + +public class TarantoolIndexNotFoundException extends TarantoolSchemaException { + + private final String indexName; + + public TarantoolIndexNotFoundException(String targetSpace, String indexName) { + super(targetSpace); + this.indexName = indexName; + } + + public String getIndexName() { + return indexName; + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSchemaException.java b/src/main/java/org/tarantool/schema/TarantoolSchemaException.java new file mode 100644 index 00000000..877f6c6b --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSchemaException.java @@ -0,0 +1,15 @@ +package org.tarantool.schema; + +public class TarantoolSchemaException extends RuntimeException { + + private final String schemaName; + + public TarantoolSchemaException(String schemaName) { + this.schemaName = schemaName; + } + + public String getSchemaName() { + return schemaName; + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSchemaMeta.java b/src/main/java/org/tarantool/schema/TarantoolSchemaMeta.java new file mode 100644 index 00000000..4dc98cd6 --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSchemaMeta.java @@ -0,0 +1,86 @@ +package org.tarantool.schema; + +import org.tarantool.Iterator; +import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolClientOps; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Schema storage. + *

+ * Keeps spaces and theirs attributes and indexes. + */ +public class TarantoolSchemaMeta { + + private static final int VSPACE_ID = 281; + private static final int VSPACE_ID_INDEX_ID = 0; + + private static final int VINDEX_ID = 289; + private static final int VINDEX_ID_INDEX_ID = 0; + + private static final int MAX_TUPLES = 65535; + + private TarantoolClientImpl client; + + private volatile Map cachedSpaces = new ConcurrentHashMap<>(); + + public TarantoolSchemaMeta(TarantoolClientImpl client) { + this.client = client; + } + + public TarantoolSpaceMeta getSpace(String spaceName) { + TarantoolSpaceMeta space = cachedSpaces.get(spaceName); + if (space == null) { + throw new TarantoolSpaceNotFoundException(spaceName); + } + return space; + } + + public TarantoolIndexMeta getSpaceIndex(String spaceName, String indexName) { + TarantoolIndexMeta index = getSpace(spaceName).getIndex(indexName); + if (index == null) { + throw new TarantoolIndexNotFoundException(spaceName, indexName); + } + return index; + } + + public synchronized void refresh() { + cachedSpaces = fetchSpaces() + .stream().collect( + Collectors.toConcurrentMap( + TarantoolSpaceMeta::getName, + Function.identity(), + (oldValue, newValue) -> newValue, + ConcurrentHashMap::new + ) + ); + } + + private List fetchSpaces() { + TarantoolClientOps, Object, List> clientOps = client.unsafeSchemaOps(); + + List spaces = clientOps + .select(VSPACE_ID, VSPACE_ID_INDEX_ID, Collections.emptyList(), 0, MAX_TUPLES, Iterator.EQ); + + Map>> indexesBySpace = clientOps + .select(VINDEX_ID, VINDEX_ID_INDEX_ID, Collections.emptyList(), 0, MAX_TUPLES, Iterator.EQ) + .stream() + .map(tuple -> (List) tuple) + .collect(Collectors.groupingBy(tuple -> (Integer) tuple.get(0))); + + return spaces.stream() + .map(tuple -> (List) tuple) + .map(tuple -> TarantoolSpaceMeta.fromTuple( + tuple, + indexesBySpace.getOrDefault((Integer) tuple.get(0), Collections.emptyList())) + ) + .collect(Collectors.toList()); + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSpaceMeta.java b/src/main/java/org/tarantool/schema/TarantoolSpaceMeta.java new file mode 100644 index 00000000..cb312c14 --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSpaceMeta.java @@ -0,0 +1,94 @@ +package org.tarantool.schema; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Keeps a space metadata. + */ +public class TarantoolSpaceMeta { + + private final int id; + private final String name; + private final String engine; + private final List format; + private final Map indexes; + + public static TarantoolSpaceMeta fromTuple(List spaceTuple, List> indexTuples) { + List fields = ((List>) spaceTuple.get(6)).stream() + .map(field -> new SpaceField(field.get("name").toString(), field.get("type").toString())) + .collect(Collectors.toList()); + + Map indexesMap = indexTuples.stream() + .map(TarantoolIndexMeta::fromTuple) + .collect(Collectors.toMap(TarantoolIndexMeta::getName, Function.identity())); + + return new TarantoolSpaceMeta( + (Integer) spaceTuple.get(0), + spaceTuple.get(2).toString(), + spaceTuple.get(3).toString(), + Collections.unmodifiableList(fields), + Collections.unmodifiableMap(indexesMap) + ); + } + + public TarantoolSpaceMeta(int id, + String name, + String engine, + List format, + Map indexes) { + this.id = id; + this.name = name; + this.engine = engine; + this.format = format; + this.indexes = indexes; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public String getEngine() { + return engine; + } + + public List getFormat() { + return format; + } + + public Map getIndexes() { + return indexes; + } + + public TarantoolIndexMeta getIndex(String indexName) { + return indexes.get(indexName); + } + + public static class SpaceField { + + private final String name; + private final String type; + + public SpaceField(String name, String type) { + this.name = name; + this.type = type; + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSpaceNotFoundException.java b/src/main/java/org/tarantool/schema/TarantoolSpaceNotFoundException.java new file mode 100644 index 00000000..28498e6b --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSpaceNotFoundException.java @@ -0,0 +1,9 @@ +package org.tarantool.schema; + +public class TarantoolSpaceNotFoundException extends TarantoolSchemaException { + + public TarantoolSpaceNotFoundException(String spaceName) { + super(spaceName); + } + +} diff --git a/src/test/java/org/tarantool/ClientAsyncOperationsIT.java b/src/test/java/org/tarantool/ClientAsyncOperationsIT.java index 087a4760..6aa2b10e 100644 --- a/src/test/java/org/tarantool/ClientAsyncOperationsIT.java +++ b/src/test/java/org/tarantool/ClientAsyncOperationsIT.java @@ -27,7 +27,7 @@ /** * Class with test cases for asynchronous operations - * + *

* NOTE: Parametrized tests can be simplified after * https://github.com/junit-team/junit5/issues/878 */ @@ -120,7 +120,7 @@ void testAsyncError(AsyncOpsProvider provider) { @MethodSource("getAsyncOps") void testOperations(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { - TarantoolClientOps, Object, Future>> ops = provider.getAsyncOps(); + TarantoolClientOps, Object, Future>> ops = provider.getAsyncOps(); List>> futures = new ArrayList<>(); @@ -145,7 +145,7 @@ void testOperations(AsyncOpsProvider provider) // Check the effects. checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); checkRawTupleResult(consoleSelect(20), Arrays.asList(20, "twenty")); - assertEquals(consoleSelect(30), Collections.emptyList()); + assertEquals(Collections.emptyList(), consoleSelect(30)); provider.close(); } @@ -185,16 +185,201 @@ void testCall(AsyncOpsProvider provider) throws ExecutionException, InterruptedE provider.close(); } + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringSelect(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, 'one'}"); + Future> result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(1), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(1, "one")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringInsert(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + Future> resultOne = provider.getAsyncOps() + .insert("basic_test", Arrays.asList(1, "one")); + + Future> resultTen = provider.getAsyncOps() + .insert("basic_test", Arrays.asList(10, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringReplace(AsyncOpsProvider provider) + throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + Future> resultOne = provider.getAsyncOps() + .replace("basic_test", Arrays.asList(1, "one")); + + Future> resultTen = provider.getAsyncOps() + .replace("basic_test", Arrays.asList(10, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringDelete(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + testHelper.executeLua("box.space.basic_test:insert{20, '20'}"); + + Future> resultOne = provider.getAsyncOps() + .delete("basic_test", Collections.singletonList(1)); + + Future> resultTen = provider.getAsyncOps() + .delete("basic_test", Collections.singletonList(20)); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + assertEquals(Collections.emptyList(), consoleSelect(1)); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "10")); + assertEquals(Collections.emptyList(), consoleSelect(20)); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringUpdate(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + Future> resultOne = provider.getAsyncOps() + .update("basic_test", Collections.singletonList(1), Arrays.asList("=", 1, "one")); + + Future> resultTwo = provider.getAsyncOps() + .update("basic_test", Collections.singletonList(2), Arrays.asList("=", 1, "two")); + + Future> resultTen = provider.getAsyncOps() + .update("basic_test", Collections.singletonList(10), Arrays.asList("=", 1, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTwo.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + assertEquals(Collections.emptyList(), consoleSelect(2)); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringUpsert(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + Future> resultOne = provider.getAsyncOps() + .upsert("basic_test", Collections.singletonList(1), Arrays.asList(1, "001"), Arrays.asList("=", 1, "one")); + + Future> resultTwo = provider.getAsyncOps() + .upsert("basic_test", Collections.singletonList(2), Arrays.asList(2, "002"), Arrays.asList("=", 1, "two")); + + Future> resultTen = provider.getAsyncOps() + .upsert("basic_test", Collections.singletonList(10), Arrays.asList(10, "010"), + Arrays.asList("=", 1, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTwo.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(2), Arrays.asList(2, "002")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringMultipleInderectChanges(AsyncOpsProvider provider) + throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, 'one'}"); + Future> result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(1), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(1, "one")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + testHelper.executeLua("box.space.basic_test and box.space.basic_test:drop()"); + testHelper.executeLua( + "box.schema.space.create('basic_test', { format = " + + "{{name = 'id', type = 'integer'}," + + " {name = 'val', type = 'string'} } })", + + "box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )" + ); + testHelper.executeLua("box.space.basic_test:insert{2, 'two'}"); + + result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(2), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(2, "two")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + testHelper.executeLua("box.space.basic_test and box.space.basic_test:drop()"); + testHelper.executeLua( + "box.schema.space.create('basic_test', { format = " + + "{{name = 'id', type = 'integer'}," + + " {name = 'val', type = 'string'} } })", + + "box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )" + ); + testHelper.executeLua("box.space.basic_test:insert{3, 'three'}"); + + result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(3), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(3, "three")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + provider.close(); + } + private List consoleSelect(Object key) { return testHelper.evaluate(TestUtils.toLuaSelect("basic_test", key)); } private interface AsyncOpsProvider { - TarantoolClientOps, Object, Future>> getAsyncOps(); + + TarantoolClientOps, Object, Future>> getAsyncOps(); TarantoolClient getClient(); void close(); + } private static class ClientAsyncOpsProvider implements AsyncOpsProvider { @@ -202,7 +387,7 @@ private static class ClientAsyncOpsProvider implements AsyncOpsProvider { TarantoolClient client = TestUtils.makeTestClient(TestUtils.makeDefaultClientConfig(), 2000); @Override - public TarantoolClientOps, Object, Future>> getAsyncOps() { + public TarantoolClientOps, Object, Future>> getAsyncOps() { return client.asyncOps(); } @@ -221,11 +406,11 @@ public void close() { private static class ComposableAsyncOpsProvider implements AsyncOpsProvider { TarantoolClient client = TestUtils.makeTestClient(TestUtils.makeDefaultClientConfig(), 2000); - TarantoolClientOps, Object, Future>> composableOps = + TarantoolClientOps, Object, Future>> composableOps = new Composable2FutureClientOpsAdapter(client.composableAsyncOps()); @Override - public TarantoolClientOps, Object, Future>> getAsyncOps() { + public TarantoolClientOps, Object, Future>> getAsyncOps() { return composableOps; } @@ -236,18 +421,18 @@ public TarantoolClient getClient() { @Override public void close() { - composableOps.close(); + client.close(); } } private static class Composable2FutureClientOpsAdapter - implements TarantoolClientOps, Object, Future>> { + implements TarantoolClientOps, Object, Future>> { - private final TarantoolClientOps, Object, CompletionStage>> originOps; + private final TarantoolClientOps, Object, CompletionStage>> originOps; private Composable2FutureClientOpsAdapter( - TarantoolClientOps, Object, CompletionStage>> originOps) { + TarantoolClientOps, Object, CompletionStage>> originOps) { this.originOps = originOps; } @@ -257,6 +442,11 @@ public Future> select(Integer space, Integer index, List key, int off return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); } + @Override + public Future> select(String space, String index, List key, int offset, int limit, int iterator) { + return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); + } + @Override public Future> select(Integer space, Integer index, @@ -267,31 +457,66 @@ public Future> select(Integer space, return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); } + @Override + public Future> select(String space, + String index, + List key, + int offset, + int limit, + Iterator iterator) { + return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); + } + @Override public Future> insert(Integer space, List tuple) { return originOps.insert(space, tuple).toCompletableFuture(); } + @Override + public Future> insert(String space, List tuple) { + return originOps.insert(space, tuple).toCompletableFuture(); + } + @Override public Future> replace(Integer space, List tuple) { return originOps.replace(space, tuple).toCompletableFuture(); } + @Override + public Future> replace(String space, List tuple) { + return originOps.replace(space, tuple).toCompletableFuture(); + } + @Override public Future> update(Integer space, List key, Object... tuple) { return originOps.update(space, key, tuple).toCompletableFuture(); } + @Override + public Future> update(String space, List key, Object... tuple) { + return originOps.update(space, key, tuple).toCompletableFuture(); + } + @Override public Future> upsert(Integer space, List key, List defTuple, Object... ops) { return originOps.upsert(space, key, defTuple, ops).toCompletableFuture(); } + @Override + public Future> upsert(String space, List key, List defTuple, Object... ops) { + return originOps.upsert(space, key, defTuple, ops).toCompletableFuture(); + } + @Override public Future> delete(Integer space, List key) { return originOps.delete(space, key).toCompletableFuture(); } + @Override + public Future> delete(String space, List key) { + return originOps.delete(space, key).toCompletableFuture(); + } + @Override public Future> call(String function, Object... args) { return originOps.call(function, args).toCompletableFuture(); @@ -311,6 +536,7 @@ public void ping() { public void close() { originOps.close(); } + } } diff --git a/src/test/java/org/tarantool/ClientReconnectClusterIT.java b/src/test/java/org/tarantool/ClientReconnectClusterIT.java index c0228874..90c884e0 100644 --- a/src/test/java/org/tarantool/ClientReconnectClusterIT.java +++ b/src/test/java/org/tarantool/ClientReconnectClusterIT.java @@ -20,7 +20,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -120,7 +120,7 @@ void testUpdateExtendedNodeList() { String service1Address = "localhost:" + PORTS[0]; String service2Address = "127.0.0.1:" + PORTS[1]; - CyclicBarrier barrier = new CyclicBarrier(2); + Phaser phaser = new Phaser(1); String infoFunctionName = "getAddresses"; String infoFunctionScript = @@ -131,7 +131,7 @@ void testUpdateExtendedNodeList() { final TarantoolClusterClient client = makeClientWithDiscoveryFeature( infoFunctionName, 0, - (ignored) -> tryAwait(barrier), + (ignored) -> phaser.arrive(), service1Address ); @@ -139,7 +139,7 @@ void testUpdateExtendedNodeList() { final int spaceId = ids[0]; final int pkId = ids[1]; - tryAwait(barrier); // client = { srv1 }; wait for { srv1, srv2 } + tryAwait(phaser, 0); // client = { srv1 }; wait for { srv1, srv2 } expectConnected(client, spaceId, pkId); @@ -165,7 +165,7 @@ void testUpdateNarrowNodeList() { String service1Address = "localhost:" + PORTS[0]; String service2Address = "127.0.0.1:" + PORTS[1]; - CyclicBarrier barrier = new CyclicBarrier(2); + Phaser phaser = new Phaser(1); String infoFunctionName = "getAddresses"; String infoFunctionScript = makeDiscoveryFunction(infoFunctionName, Collections.singletonList(service1Address)); @@ -175,7 +175,7 @@ void testUpdateNarrowNodeList() { final TarantoolClusterClient client = makeClientWithDiscoveryFeature( infoFunctionName, 0, - (ignored) -> tryAwait(barrier), + (ignored) -> phaser.arrive(), service1Address, service2Address ); @@ -184,7 +184,7 @@ void testUpdateNarrowNodeList() { final int spaceId = ids[0]; final int pkId = ids[1]; - tryAwait(barrier); // client = { srv1, srv2 }; wait for { srv1 } + tryAwait(phaser, 0); // client = { srv1, srv2 }; wait for { srv1 } expectConnected(client, spaceId, pkId); @@ -349,7 +349,7 @@ void testDelayFunctionResultFetch() { String service2Address = "127.0.0.1:" + PORTS[1]; String service3Address = "localhost:" + PORTS[2]; - CyclicBarrier barrier = new CyclicBarrier(2); + Phaser phaser = new Phaser(1); String infoFunctionName = "getAddressesFunction"; String functionBody = Stream.of(service1Address, service2Address) @@ -367,7 +367,7 @@ void testDelayFunctionResultFetch() { final TarantoolClusterClient client = makeClientWithDiscoveryFeature( infoFunctionName, 3000, - (ignored) -> tryAwait(barrier), + (ignored) -> phaser.arrive(), service1Address ); @@ -375,16 +375,16 @@ void testDelayFunctionResultFetch() { final int spaceId = ids[0]; final int pkId = ids[1]; - tryAwait(barrier); // client = { srv1 }; wait for { srv1 } + tryAwait(phaser, 0); // client = { srv1 }; wait for { srv1 } expectConnected(client, spaceId, pkId); - tryAwait(barrier); // client = { srv1 }; wait for { srv2 } + tryAwait(phaser, 1); // client = { srv1 }; wait for { srv2 } stopInstancesAndAwait(SRV1); expectConnected(client, spaceId, pkId); - tryAwait(barrier); // client = { srv2 }; wait for { srv3 } + tryAwait(phaser, 2); // client = { srv2 }; wait for { srv3 } stopInstancesAndAwait(SRV2); expectConnected(client, spaceId, pkId); @@ -393,9 +393,9 @@ void testDelayFunctionResultFetch() { expectDisconnected(client, spaceId, pkId); } - private void tryAwait(CyclicBarrier barrier) { + private void tryAwait(Phaser phaser, int phase) { try { - barrier.await(6000, TimeUnit.MILLISECONDS); + phaser.awaitAdvanceInterruptibly(phase, 6000, TimeUnit.MILLISECONDS); } catch (Throwable e) { e.printStackTrace(); } diff --git a/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java b/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java index 97cf697a..71b91520 100644 --- a/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java +++ b/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java @@ -91,12 +91,12 @@ public void execute() throws Throwable { @Test public void testFireAndForgetOperations() { - TarantoolClientOps, Object, Long> ffOps = client.fireAndForgetOps(); + TarantoolClientOps, Object, Long> ffOps = client.fireAndForgetOps(); - Set syncIds = new HashSet(); + Set syncIds = new HashSet<>(); - syncIds.add(ffOps.insert(spaceId, Arrays.asList(10, "10"))); - syncIds.add(ffOps.delete(spaceId, Collections.singletonList(10))); + syncIds.add(ffOps.insert(spaceId, Arrays.asList(1, "1"))); + syncIds.add(ffOps.delete(spaceId, Collections.singletonList(1))); syncIds.add(ffOps.insert(spaceId, Arrays.asList(10, "10"))); syncIds.add(ffOps.update(spaceId, Collections.singletonList(10), Arrays.asList("=", 1, "ten"))); @@ -117,9 +117,40 @@ public void testFireAndForgetOperations() { client.syncOps().ping(); // Check the effects + assertEquals(consoleSelect(SPACE_NAME, 1), Collections.emptyList()); checkRawTupleResult(consoleSelect(SPACE_NAME, 10), Arrays.asList(10, "ten")); checkRawTupleResult(consoleSelect(SPACE_NAME, 20), Arrays.asList(20, "twenty")); - assertEquals(consoleSelect(SPACE_NAME, 30), Collections.emptyList()); + assertEquals(Collections.emptyList(), consoleSelect(SPACE_NAME, 30)); + } + + @Test + public void testFireAndForgetStringOperations() { + TarantoolClientOps, Object, Long> ffOps = client.fireAndForgetOps(); + + Set syncIds = new HashSet<>(); + + syncIds.add(ffOps.insert(SPACE_NAME, Arrays.asList(2, "2"))); + syncIds.add(ffOps.delete(SPACE_NAME, Collections.singletonList(2))); + + syncIds.add(ffOps.insert(SPACE_NAME, Arrays.asList(20, "20"))); + syncIds.add(ffOps.update(SPACE_NAME, Collections.singletonList(20), Arrays.asList("=", 1, "twenty"))); + + syncIds.add(ffOps.replace(SPACE_NAME, Arrays.asList(200, "200"))); + syncIds.add(ffOps.upsert(SPACE_NAME, Collections.singletonList(200), Arrays.asList(200, "two hundred"), + Arrays.asList("=", 1, "two hundred"))); + + // Check the syncs. + assertFalse(syncIds.contains(0L)); + assertEquals(6, syncIds.size()); + + // The reply for synchronous ping will + // indicate to us that previous fire & forget operations are completed. + client.syncOps().ping(); + + // Check the effects + assertEquals(consoleSelect(SPACE_NAME, 2), Collections.emptyList()); + checkRawTupleResult(consoleSelect(SPACE_NAME, 20), Arrays.asList(20, "twenty")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 200), Arrays.asList(200, "two hundred")); } private List consoleSelect(String spaceName, Object key) { diff --git a/src/test/java/org/tarantool/IteratorTest.java b/src/test/java/org/tarantool/IteratorTest.java index 7fd68b61..0fed2352 100644 --- a/src/test/java/org/tarantool/IteratorTest.java +++ b/src/test/java/org/tarantool/IteratorTest.java @@ -10,13 +10,23 @@ import java.util.List; class IteratorTest { - protected class MockOps extends AbstractTarantoolOps, Object, List> { + protected class MockOps extends AbstractTarantoolOps, Object, List> { @Override - public List exec(Code code, Object... args) { + protected List exec(Code code, ResolvableArgument... args) { return null; } + @Override + protected int resolveSpace(String space) { + throw new UnsupportedOperationException(); + } + + @Override + protected int resolveSpaceIndex(String space, String index) { + throw new UnsupportedOperationException(); + } + @Override public void close() { throw new UnsupportedOperationException(); diff --git a/src/test/java/org/tarantool/TarantoolClientOpsIT.java b/src/test/java/org/tarantool/TarantoolClientOpsIT.java index ebaca9f5..5b2e9dec 100644 --- a/src/test/java/org/tarantool/TarantoolClientOpsIT.java +++ b/src/test/java/org/tarantool/TarantoolClientOpsIT.java @@ -256,7 +256,7 @@ public void testReplaceMultiPartKey(SyncOpsProvider provider) { provider.close(); } - private void checkReplace(TarantoolClientOps, Object, List> clientOps, + private void checkReplace(TarantoolClientOps, Object, List> clientOps, String space, int spaceId, List key, @@ -332,7 +332,7 @@ public void testUpdateMultiPart(SyncOpsProvider provider) { provider.close(); } - private void checkUpdate(TarantoolClientOps, Object, List> clientOps, + private void checkUpdate(TarantoolClientOps, Object, List> clientOps, String space, int spaceId, List key, @@ -391,7 +391,7 @@ public void testUpsertMultiPart(SyncOpsProvider provider) { provider.close(); } - private void checkUpsert(TarantoolClientOps, Object, List> clientOps, + private void checkUpsert(TarantoolClientOps, Object, List> clientOps, String space, int spaceId, List key, @@ -446,7 +446,7 @@ public void testDeleteMultiPartKey(SyncOpsProvider provider) { provider.close(); } - private void checkDelete(TarantoolClientOps, Object, List> clientOps, + private void checkDelete(TarantoolClientOps, Object, List> clientOps, String space, int spaceId, List key, @@ -663,7 +663,7 @@ private static TarantoolConnection makeConnection() { } private interface SyncOpsProvider { - TarantoolClientOps, Object, List> getClientOps(); + TarantoolClientOps, Object, List> getClientOps(); void close(); } @@ -673,7 +673,7 @@ private static class ClientSyncOpsProvider implements SyncOpsProvider { private TarantoolClient client = makeTestClient(makeDefaultClientConfig(), RESTART_TIMEOUT); @Override - public TarantoolClientOps, Object, List> getClientOps() { + public TarantoolClientOps, Object, List> getClientOps() { return client.syncOps(); } @@ -689,7 +689,7 @@ private static class ConnectionSyncOpsProvider implements SyncOpsProvider { private TarantoolConnection connection = makeConnection(); @Override - public TarantoolClientOps, Object, List> getClientOps() { + public TarantoolClientOps, Object, List> getClientOps() { return connection; } diff --git a/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java b/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java index 4e940033..858e2143 100644 --- a/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java +++ b/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java @@ -9,7 +9,6 @@ import static org.tarantool.TestUtils.makeDiscoveryFunction; import org.tarantool.CommunicationException; -import org.tarantool.TarantoolClient; import org.tarantool.TarantoolClientImpl; import org.tarantool.TarantoolClusterClientConfig; import org.tarantool.TarantoolException; @@ -36,7 +35,7 @@ public class ClusterServiceStoredFunctionDiscovererIT { private static TarantoolTestHelper testHelper; private TarantoolClusterClientConfig clusterConfig; - private TarantoolClient client; + private TarantoolClientImpl client; @BeforeAll public static void setupEnv() { diff --git a/src/test/java/org/tarantool/jdbc/JdbcExceptionHandlingTest.java b/src/test/java/org/tarantool/jdbc/JdbcExceptionHandlingTest.java index 08c02d96..2848f192 100644 --- a/src/test/java/org/tarantool/jdbc/JdbcExceptionHandlingTest.java +++ b/src/test/java/org/tarantool/jdbc/JdbcExceptionHandlingTest.java @@ -45,7 +45,7 @@ public class JdbcExceptionHandlingTest { @Test public void testDatabaseMetaDataGetPrimaryKeysFormatError() throws SQLException { - TarantoolClientOps, Object, List> syncOps = mock(TarantoolClientOps.class); + TarantoolClientOps, Object, List> syncOps = mock(TarantoolClientOps.class); Object[] spc = new Object[7]; spc[FORMAT_IDX] = Collections.singletonList(new HashMap()); @@ -196,7 +196,7 @@ public void execute() throws Throwable { private void checkDatabaseMetaDataCommunicationException(final ThrowingConsumer consumer, String msg) throws SQLException { Exception ex = new CommunicationException("TEST"); - TarantoolClientOps, Object, List> syncOps = mock(TarantoolClientOps.class); + TarantoolClientOps, Object, List> syncOps = mock(TarantoolClientOps.class); doThrow(ex).when(syncOps).select(_VSPACE, 0, Arrays.asList(), 0, SPACES_MAX, 0); doThrow(ex).when(syncOps).select(_VSPACE, 2, Arrays.asList("TEST"), 0, 1, 0); @@ -213,7 +213,7 @@ private void checkDatabaseMetaDataCommunicationException(final ThrowingConsumer< } private SQLTarantoolClientImpl buildSQLClient(SQLTarantoolClientImpl.SQLRawOps sqlOps, - TarantoolClientOps, Object, List> ops) { + TarantoolClientOps, Object, List> ops) { SQLTarantoolClientImpl client = mock(SQLTarantoolClientImpl.class); when(client.sqlRawOps()).thenReturn(sqlOps); when(client.syncOps()).thenReturn(ops); diff --git a/src/test/java/org/tarantool/schema/ClientSchemaIT.java b/src/test/java/org/tarantool/schema/ClientSchemaIT.java new file mode 100644 index 00000000..47bb3aa5 --- /dev/null +++ b/src/test/java/org/tarantool/schema/ClientSchemaIT.java @@ -0,0 +1,226 @@ +package org.tarantool.schema; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.tarantool.TestUtils.makeDefaultClientConfig; + +import org.tarantool.ServerVersion; +import org.tarantool.TarantoolClientConfig; +import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolTestHelper; +import org.tarantool.TestAssumptions; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +@DisplayName("A schema meta") +public class ClientSchemaIT { + + private static TarantoolTestHelper testHelper; + + private TarantoolClientImpl client; + + @BeforeAll + public static void setupEnv() { + testHelper = new TarantoolTestHelper("client-schema-it"); + testHelper.createInstance(); + testHelper.startInstance(); + } + + @AfterAll + public static void teardownEnv() { + testHelper.stopInstance(); + } + + @BeforeEach + public void setup() { + TarantoolClientConfig config = makeDefaultClientConfig(); + + client = new TarantoolClientImpl( + TarantoolTestHelper.HOST + ":" + TarantoolTestHelper.PORT, + config + ); + } + + @AfterEach + public void tearDown() { + client.close(); + testHelper.executeLua("box.space.count_space and box.space.count_space:drop()"); + } + + @Test + @DisplayName("fetched a space with its index") + void testFetchSpaces() { + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'}," + + " {name = 'counts', type = 'integer'} }" + + "})" + ); + testHelper.executeLua("box.space.count_space:create_index('pk', { type = 'TREE', parts = {'id'} } )"); + + TarantoolSchemaMeta meta = new TarantoolSchemaMeta(client); + meta.refresh(); + + TarantoolSpaceMeta space = meta.getSpace("count_space"); + assertNotNull(space); + assertEquals("count_space", space.getName()); + + List spaceFormat = space.getFormat(); + assertEquals(2, spaceFormat.size()); + assertEquals("id", spaceFormat.get(0).getName()); + assertEquals("integer", spaceFormat.get(0).getType()); + assertEquals("counts", spaceFormat.get(1).getName()); + assertEquals("integer", spaceFormat.get(1).getType()); + + TarantoolIndexMeta primaryIndex = space.getIndex("pk"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk", "TREE", + new TarantoolIndexMeta.IndexOptions(true), + Collections.singletonList(new TarantoolIndexMeta.IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, primaryIndex); + } + + @Test + @DisplayName("fetched a space with its index") + void testFetchNewSpaces() { + // add count_space + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'}," + + " {name = 'counts', type = 'integer'} }" + + "})" + ); + TarantoolSchemaMeta meta = new TarantoolSchemaMeta(client); + meta.refresh(); + TarantoolSpaceMeta space = meta.getSpace("count_space"); + assertNotNull(space); + assertEquals("count_space", space.getName()); + + // add count_space_2 + testHelper.executeLua( + "box.schema.space.create('count_space_2', { format = " + + "{ {name = 'id', type = 'integer'} } })" + ); + meta.refresh(); + space = meta.getSpace("count_space_2"); + assertNotNull(space); + assertEquals("count_space_2", space.getName()); + + // add a primary index for count_space_2 + testHelper.executeLua( + "box.space.count_space_2:create_index('pk', { unique = true, type = 'TREE', parts = {'id'} } )" + ); + meta.refresh(); + TarantoolIndexMeta spaceIndex = meta.getSpaceIndex("count_space_2", "pk"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk", "TREE", + new TarantoolIndexMeta.IndexOptions(true), + Collections.singletonList(new TarantoolIndexMeta.IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, spaceIndex); + } + + @Test + @DisplayName("fetched space indexes of a space") + void testFetchIndexes() { + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'}," + + " {name = 'counts', type = 'integer'} }" + + "})" + ); + testHelper.executeLua( + "box.space.count_space:create_index('pk', { type = 'HASH', parts = {'id'} } )", + "box.space.count_space:create_index('c_index', { unique = false, type = 'TREE', parts = {'counts'} } )" + ); + + TarantoolSchemaMeta meta = new TarantoolSchemaMeta(client); + meta.refresh(); + + TarantoolIndexMeta primaryIndex = meta.getSpaceIndex("count_space", "pk"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk", "HASH", + new TarantoolIndexMeta.IndexOptions(true), + Collections.singletonList(new TarantoolIndexMeta.IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, primaryIndex); + + TarantoolIndexMeta secondaryIndex = meta.getSpaceIndex("count_space", "c_index"); + TarantoolIndexMeta expectedSecondaryIndex = new TarantoolIndexMeta( + 1, "c_index", "TREE", + new TarantoolIndexMeta.IndexOptions(false), + Collections.singletonList(new TarantoolIndexMeta.IndexPart(1, "integer")) + ); + assertIndex(expectedSecondaryIndex, secondaryIndex); + } + + @Test + @DisplayName("fetched sql table primary index") + void testFetchSqlIndexes() { + TestAssumptions.assumeMinimalServerVersion(testHelper.getInstanceVersion(), ServerVersion.V_2_1); + testHelper.executeSql("create table my_table (id int primary key, val varchar(100))"); + + TarantoolSchemaMeta meta = new TarantoolSchemaMeta(client); + meta.refresh(); + + TarantoolIndexMeta primaryIndex = meta.getSpaceIndex("MY_TABLE", "pk_unnamed_MY_TABLE_1"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk_unnamed_MY_TABLE_1", "tree", + new TarantoolIndexMeta.IndexOptions(true), + Collections.singletonList(new TarantoolIndexMeta.IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, primaryIndex); + } + + @Test + @DisplayName("got an error with a wrong space name") + void tesGetUnknownSpace() { + TarantoolSchemaMeta meta = new TarantoolSchemaMeta(client); + meta.refresh(); + + TarantoolSpaceNotFoundException exception = assertThrows( + TarantoolSpaceNotFoundException.class, + () -> meta.getSpace("unknown_space") + ); + assertEquals("unknown_space", exception.getSchemaName()); + } + + @Test + @DisplayName("got an error with a wrong space index name") + void testGetUnknownSpaceIndex() { + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'} } })" + ); + testHelper.executeLua("box.space.count_space:create_index('pk', { type = 'TREE', parts = {'id'} } )"); + + TarantoolSchemaMeta meta = new TarantoolSchemaMeta(client); + meta.refresh(); + + assertEquals("count_space", meta.getSpace("count_space").getName()); + TarantoolIndexNotFoundException exception = assertThrows( + TarantoolIndexNotFoundException.class, + () -> meta.getSpaceIndex("count_space", "wrong_pk") + ); + assertEquals("wrong_pk", exception.getIndexName()); + } + + private void assertIndex(TarantoolIndexMeta expectedIndex, TarantoolIndexMeta actualIndex) { + assertEquals(expectedIndex.getId(), actualIndex.getId()); + assertEquals(expectedIndex.getName(), actualIndex.getName()); + assertEquals(expectedIndex.getType(), actualIndex.getType()); + assertEquals(expectedIndex.getOptions(), actualIndex.getOptions()); + assertEquals(expectedIndex.getParts(), actualIndex.getParts()); + } + +}