diff --git a/README.md b/README.md index 025726e9..5201f071 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,18 @@ makes possible for the client to configure a socket provider. * `ComposableAsyncOps` - return the operation result as a `CompletionStage` * `FireAndForgetOps` - returns the query ID +Each operation that requires space or index to be executed, can work with +number ID as well as string name of a space or an index. +Assume, we have `my_space` space with space ID `512` and its primary index +`primary` with index ID `0`. Then, for instance, `select` operations can be +performed as the following: + +```java +client.syncOps().select(512, 0, Collections.singletonList(1), 0, 1, Iterator.EQ); +// or using more convenient way +client.syncOps().select("my_space", "primary", Collections.singletonList(1), 0, 1, Iterator.EQ); +``` + Feel free to override any method of `TarantoolClientImpl`. For example, to hook all the results, you could override this: diff --git a/src/main/java/org/tarantool/AbstractTarantoolOps.java b/src/main/java/org/tarantool/AbstractTarantoolOps.java index 8cefb379..8dea1038 100644 --- a/src/main/java/org/tarantool/AbstractTarantoolOps.java +++ b/src/main/java/org/tarantool/AbstractTarantoolOps.java @@ -1,57 +1,163 @@ package org.tarantool; +import static org.tarantool.AbstractTarantoolOps.ResolvableArgument.ofResolved; +import static org.tarantool.AbstractTarantoolOps.ResolvableArgument.ofUnresolved; -public abstract class AbstractTarantoolOps - implements TarantoolClientOps { +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Stream; + +public abstract class AbstractTarantoolOps + implements TarantoolClientOps { + + private final Function defaultSpaceResolver = space -> resolveSpace((String) space); private Code callCode = Code.CALL; - protected abstract Result exec(Code code, Object... args); + protected Result exec(Code code, Object... args) { + return exec(code, ResolvableArgument.ofAllResolved(args)); + } + + protected abstract Result exec(Code code, ResolvableArgument... args); + + protected abstract int resolveSpace(String space); + + protected abstract int resolveSpaceIndex(String space, String index); + + protected static class ResolvableArgument { + + private final Object value; + private final Function resolver; + + private ResolvableArgument(Object value, Function resolver) { + Objects.requireNonNull(value); + this.value = value; + this.resolver = resolver; + } + + public static ResolvableArgument ofResolved(Object resolvedValue) { + return new ResolvableArgument(resolvedValue, null); + } + + public static ResolvableArgument[] ofAllResolved(Object[] resolvedValue) { + return Stream.of(resolvedValue).map(ResolvableArgument::ofResolved).toArray(ResolvableArgument[]::new); + } + + public static ResolvableArgument ofUnresolved(Object unresolvedValue, Function resolver) { + return new ResolvableArgument(unresolvedValue, resolver); + } + + public boolean isUnresolved() { + try { + return isResolvable() && resolver.apply(value) == null; + } catch (Exception ignored) { + return true; + } + } + + public boolean isResolvable() { + return resolver != null; + } - public Result select(Space space, Space index, Tuple key, int offset, int limit, Iterator iterator) { + public Object getValue() { + return isResolvable() ? resolver.apply(value) : value; + } + + @Override + public String toString() { + if (isUnresolved()) { + return "{unresolved: " + value + "}"; + } + return value.toString(); + } + + } + + @Override + public Result select(Integer space, Integer index, Tuple key, int offset, int limit, Iterator iterator) { return select(space, index, key, offset, limit, iterator.getValue()); } - public Result select(Space space, Space index, Tuple key, int offset, int limit, int iterator) { - return exec( - Code.SELECT, - Key.SPACE, space, - Key.INDEX, index, - Key.KEY, key, - Key.ITERATOR, iterator, - Key.LIMIT, limit, - Key.OFFSET, offset + @Override + public Result select(String space, String index, Tuple key, int offset, int limit, Iterator iterator) { + return select(space, index, key, offset, limit, iterator.getValue()); + } + + @Override + public Result select(Integer space, Integer index, Tuple key, int offset, int limit, int iterator) { + return doSelect(ofResolved(space), ofResolved(index), key, offset, limit, iterator); + } + + @Override + public Result select(String space, String index, Tuple key, int offset, int limit, int iterator) { + return doSelect( + ofUnresolved(space, defaultSpaceResolver), + ofUnresolved(index, indexName -> resolveSpaceIndex(space, (String) indexName)), + key, offset, limit, iterator ); } - public Result insert(Space space, Tuple tuple) { - return exec(Code.INSERT, Key.SPACE, space, Key.TUPLE, tuple); + @Override + public Result insert(Integer space, Tuple tuple) { + return doInsert(ofResolved(space), tuple); } - public Result replace(Space space, Tuple tuple) { - return exec(Code.REPLACE, Key.SPACE, space, Key.TUPLE, tuple); + @Override + public Result insert(String space, Tuple tuple) { + return doInsert(ofUnresolved(space, defaultSpaceResolver), tuple); } - public Result update(Space space, Tuple key, Operation... args) { - return exec(Code.UPDATE, Key.SPACE, space, Key.KEY, key, Key.TUPLE, args); + @Override + public Result replace(Integer space, Tuple tuple) { + return doReplace(ofResolved(space), tuple); } - public Result upsert(Space space, Tuple key, Tuple def, Operation... args) { - return exec(Code.UPSERT, Key.SPACE, space, Key.KEY, key, Key.TUPLE, def, Key.UPSERT_OPS, args); + @Override + public Result replace(String space, Tuple tuple) { + return doReplace(ofUnresolved(space, defaultSpaceResolver), tuple); } - public Result delete(Space space, Tuple key) { - return exec(Code.DELETE, Key.SPACE, space, Key.KEY, key); + @Override + public Result update(Integer space, Tuple key, Operation... args) { + return doUpdate(ofResolved(space), key, args); } + @Override + public Result update(String space, Tuple key, Operation... tuple) { + return doUpdate(ofUnresolved(space, defaultSpaceResolver), key, tuple); + } + + @Override + public Result upsert(Integer space, Tuple key, Tuple defTuple, Operation... ops) { + return doUpsert(ofResolved(space), key, defTuple, ops); + } + + @Override + public Result upsert(String space, Tuple key, Tuple defTuple, Operation... ops) { + return doUpsert(ofUnresolved(space, defaultSpaceResolver), key, defTuple, ops); + } + + @Override + public Result delete(Integer space, Tuple key) { + return doDelete(ofResolved(space), key); + } + + @Override + public Result delete(String space, Tuple key) { + return doDelete(ofUnresolved(space, defaultSpaceResolver), key); + } + + @Override public Result call(String function, Object... args) { return exec(callCode, Key.FUNCTION, function, Key.TUPLE, args); } + @Override public Result eval(String expression, Object... args) { return exec(Code.EVAL, Key.EXPRESSION, expression, Key.TUPLE, args); } + @Override public void ping() { exec(Code.PING); } @@ -59,4 +165,53 @@ public void ping() { public void setCallCode(Code callCode) { this.callCode = callCode; } + + private Result doDelete(ResolvableArgument space, Tuple key) { + return exec(Code.DELETE, ofResolved(Key.SPACE), space, ofResolved(Key.KEY), ofResolved(key)); + } + + private Result doUpsert(ResolvableArgument space, Tuple key, Tuple defTuple, Operation... ops) { + return exec( + Code.UPSERT, + ofResolved(Key.SPACE), space, + ofResolved(Key.KEY), ofResolved(key), + ofResolved(Key.TUPLE), ofResolved(defTuple), + ofResolved(Key.UPSERT_OPS), ofResolved(ops) + ); + } + + private Result doUpdate(ResolvableArgument space, Tuple key, Operation... ops) { + return exec( + Code.UPDATE, + ofResolved(Key.SPACE), space, + ofResolved(Key.KEY), ofResolved(key), + ofResolved(Key.TUPLE), ofResolved(ops) + ); + } + + private Result doReplace(ResolvableArgument space, Tuple tuple) { + return exec(Code.REPLACE, ofResolved(Key.SPACE), space, ofResolved(Key.TUPLE), ofResolved(tuple)); + } + + private Result doInsert(ResolvableArgument space, Tuple tuple) { + return exec(Code.INSERT, ofResolved(Key.SPACE), space, ofResolved(Key.TUPLE), ofResolved(tuple)); + } + + private Result doSelect(ResolvableArgument space, + ResolvableArgument index, + Tuple key, + int offset, + int limit, + int iterator) { + return exec( + Code.SELECT, + ofResolved(Key.SPACE), space, + ofResolved(Key.INDEX), index, + ofResolved(Key.KEY), ofResolved(key), + ofResolved(Key.ITERATOR), ofResolved(iterator), + ofResolved(Key.LIMIT), ofResolved(limit), + ofResolved(Key.OFFSET), ofResolved(offset) + ); + } + } diff --git a/src/main/java/org/tarantool/TarantoolBase.java b/src/main/java/org/tarantool/TarantoolBase.java index c74647ae..5f89f1fc 100644 --- a/src/main/java/org/tarantool/TarantoolBase.java +++ b/src/main/java/org/tarantool/TarantoolBase.java @@ -9,7 +9,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; -public abstract class TarantoolBase extends AbstractTarantoolOps, Object, Result> { +public abstract class TarantoolBase extends AbstractTarantoolOps, Object, Result> { protected String serverVersion; protected MsgPackLite msgPackLite = MsgPackLite.INSTANCE; protected AtomicLong syncId = new AtomicLong(); @@ -42,16 +42,6 @@ protected void closeChannel(SocketChannel channel) { } } - protected void validateArgs(Object[] args) { - if (args != null) { - for (int i = 0; i < args.length; i += 2) { - if (args[i + 1] == null) { - throw new NullPointerException(((Key) args[i]).name() + " should not be null"); - } - } - } - } - public void setInitialRequestSize(int initialRequestSize) { this.initialRequestSize = initialRequestSize; } diff --git a/src/main/java/org/tarantool/TarantoolClient.java b/src/main/java/org/tarantool/TarantoolClient.java index 2ad0c84c..0848fb56 100644 --- a/src/main/java/org/tarantool/TarantoolClient.java +++ b/src/main/java/org/tarantool/TarantoolClient.java @@ -7,13 +7,13 @@ import java.util.concurrent.TimeUnit; public interface TarantoolClient { - TarantoolClientOps, Object, List> syncOps(); + TarantoolClientOps, Object, List> syncOps(); - TarantoolClientOps, Object, Future>> asyncOps(); + TarantoolClientOps, Object, Future>> asyncOps(); - TarantoolClientOps, Object, CompletionStage>> composableAsyncOps(); + TarantoolClientOps, Object, CompletionStage>> composableAsyncOps(); - TarantoolClientOps, Object, Long> fireAndForgetOps(); + TarantoolClientOps, Object, Long> fireAndForgetOps(); TarantoolSQLOps>> sqlSyncOps(); diff --git a/src/main/java/org/tarantool/TarantoolClientImpl.java b/src/main/java/org/tarantool/TarantoolClientImpl.java index eed09a45..31f929a0 100644 --- a/src/main/java/org/tarantool/TarantoolClientImpl.java +++ b/src/main/java/org/tarantool/TarantoolClientImpl.java @@ -1,9 +1,11 @@ package org.tarantool; +import org.tarantool.protocol.ProtoConstants; import org.tarantool.protocol.ProtoUtils; import org.tarantool.protocol.ReadableViaSelectorChannel; import org.tarantool.protocol.TarantoolGreeting; import org.tarantool.protocol.TarantoolPacket; +import org.tarantool.schema.TarantoolSchemaMeta; import java.io.IOException; import java.nio.ByteBuffer; @@ -11,12 +13,16 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -25,6 +31,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.StampedLock; +import java.util.stream.Stream; public class TarantoolClientImpl extends TarantoolBase> implements TarantoolClient { @@ -43,6 +51,11 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar protected volatile Exception thumbstone; + protected ScheduledExecutorService workExecutor; + + protected StampedLock schemaLock = new StampedLock(); + protected BlockingQueue> delayedOpsQueue; + protected Map> futures; protected AtomicInteger pendingResponsesCount = new AtomicInteger(); @@ -63,6 +76,7 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar protected SyncOps syncOps; protected FireAndForgetOps fireAndForgetOps; protected ComposableAsyncOps composableAsyncOps; + protected UnsafeSchemaOps unsafeSchemaOps; /** * Inner. @@ -72,6 +86,9 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar protected Thread reader; protected Thread writer; + protected volatile long currentSchemaId = 1L; + protected TarantoolSchemaMeta schemaMeta = new TarantoolSchemaMeta(this); + protected Thread connector = new Thread(new Runnable() { @Override public void run() { @@ -108,6 +125,9 @@ private void initClient(SocketChannelProvider socketProvider, TarantoolClientCon this.socketProvider = socketProvider; this.stats = new TarantoolClientStats(); this.futures = new ConcurrentHashMap<>(config.predictedFutures); + this.delayedOpsQueue = new PriorityBlockingQueue<>(128); + this.workExecutor = + Executors.newSingleThreadScheduledExecutor(new TarantoolThreadDaemonFactory("tarantool-worker")); this.sharedBuffer = ByteBuffer.allocateDirect(config.sharedBufferSize); this.writerBuffer = ByteBuffer.allocateDirect(sharedBuffer.capacity()); this.connector.setDaemon(true); @@ -115,6 +135,7 @@ private void initClient(SocketChannelProvider socketProvider, TarantoolClientCon this.syncOps = new SyncOps(); this.composableAsyncOps = new ComposableAsyncOps(); this.fireAndForgetOps = new FireAndForgetOps(); + this.unsafeSchemaOps = new UnsafeSchemaOps(); if (!config.useNewCall) { setCallCode(Code.OLD_CALL); this.syncOps.setCallCode(Code.OLD_CALL); @@ -136,6 +157,7 @@ private void startConnector(long initTimeoutMillis) { close(e); throw e; } + updateSchema(currentSchemaId); } catch (InterruptedException e) { close(e); throw new IllegalStateException(e); @@ -175,6 +197,7 @@ protected void connect(final SocketChannel channel) throws Exception { try { TarantoolGreeting greeting = ProtoUtils.connect(channel, config.username, config.password, msgPackLite); this.serverVersion = greeting.getServerVersion(); + currentSchemaId = ProtoUtils.getSchemaId(channel, msgPackLite); } catch (IOException e) { closeChannel(channel); throw new CommunicationException("Couldn't connect to tarantool", e); @@ -203,7 +226,7 @@ protected void startThreads(String threadName) throws InterruptedException { try { readThread(); } finally { - state.release(StateHelper.READING); + state.release(StateHelper.READING | StateHelper.SCHEMA_UPDATING); // only last of two IO-threads can signal for reconnection if (leftIoThreads.decrementAndGet() == 0) { state.trySignalForReconnection(); @@ -217,7 +240,7 @@ protected void startThreads(String threadName) throws InterruptedException { try { writeThread(); } finally { - state.release(StateHelper.WRITING); + state.release(StateHelper.WRITING | StateHelper.SCHEMA_UPDATING); // only last of two IO-threads can signal for reconnection if (leftIoThreads.decrementAndGet() == 0) { state.trySignalForReconnection(); @@ -250,7 +273,8 @@ protected void configureThreads(String threadName) { * * @see #setOperationTimeout(long) */ - protected Future exec(Code code, Object... args) { + @Override + protected Future exec(Code code, ResolvableArgument... args) { return doExec(operationTimeout, code, args); } @@ -265,33 +289,63 @@ protected Future exec(Code code, Object... args) { * @return deferred result */ protected Future exec(long timeoutMillis, Code code, Object... args) { - return doExec(timeoutMillis, code, args); + return doExec(timeoutMillis, code, ResolvableArgument.ofAllResolved(args)); } - protected TarantoolOp doExec(long timeoutMillis, Code code, Object[] args) { - validateArgs(args); - long sid = syncId.incrementAndGet(); + @Override + protected int resolveSpace(String space) { + return schemaMeta.getSpace(space).getId(); + } + + @Override + protected int resolveSpaceIndex(String space, String index) { + return schemaMeta.getSpaceIndex(space, index).getId(); + } + protected TarantoolOp doExec(long timeoutMillis, Code code, ResolvableArgument[] args) { + long sid = syncId.incrementAndGet(); TarantoolOp future = makeNewOperation(timeoutMillis, sid, code, args); + // space or index name weren't resolved + // it's possible the client keeps the outdated schema + // refresh the schema and delay operation execution + if (future.hasUnresolvedArguments()) { + delayedOpsQueue.add(future); + updateSchema(currentSchemaId); + return future; + } + long stamp = schemaLock.readLock(); + try { + // postpone operation if the schema is being reloaded + if (state.isStateSet(StateHelper.SCHEMA_UPDATING)) { + delayedOpsQueue.add(future); + return future; + } + return registerOperation(future, currentSchemaId); + } finally { + schemaLock.unlockRead(stamp); + } + } + + protected TarantoolOp registerOperation(TarantoolOp future, long schemaId) { if (isDead(future)) { return future; } - futures.put(sid, future); + futures.put(future.getId(), future); if (isDead(future)) { - futures.remove(sid); + futures.remove(future.getId()); return future; } try { - write(code, sid, null, args); + write(future.getCode(), future.getId(), schemaId, future.getResolvedArgs()); } catch (Exception e) { - futures.remove(sid); + futures.remove(future.getId()); fail(future, e); } return future; } - protected TarantoolOp makeNewOperation(long timeoutMillis, long sid, Code code, Object[] args) { + protected TarantoolOp makeNewOperation(long timeoutMillis, long sid, Code code, ResolvableArgument[] args) { return new TarantoolOp<>(sid, code, args) .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); } @@ -313,6 +367,12 @@ protected synchronized void die(String message, Exception cause) { iterator.remove(); } } + + TarantoolOp op; + while ((op = delayedOpsQueue.poll()) != null) { + fail(op, error); + } + pendingResponsesCount.set(0); bufferLock.lock(); @@ -466,17 +526,66 @@ protected void fail(TarantoolOp future, Exception e) { } protected void complete(TarantoolPacket packet, TarantoolOp future) { - if (future != null) { - long code = packet.getCode(); - if (code == 0) { - if (future.getCode() == Code.EXECUTE) { - completeSql(future, packet); - } else { - ((TarantoolOp) future).complete(packet.getBody().get(Key.DATA.getId())); - } + if (future == null || future.isDone()) { + return; + } + + long code = packet.getCode(); + long schemaId = packet.getSchemaId(); + + if (code == ProtoConstants.SUCCESS) { + if (future.getCode() == Code.EXECUTE) { + completeSql(future, packet); } else { - Object error = packet.getBody().get(Key.ERROR.getId()); - fail(future, serverError(code, error)); + ((TarantoolOp) future).complete(packet.getData()); + } + } else if (code == ProtoConstants.ERR_WRONG_SCHEMA_VERSION) { + if (future.hasResolvableArguments()) { + delayedOpsQueue.add(future); + } else { + registerOperation(future, schemaId); + } + } else { + Object error = packet.getError(); + fail(future, serverError(code, error)); + } + // it's possible to receive bigger version than current + // i.e. after DML operation or wrong schema version response + if (schemaId > currentSchemaId) { + updateSchema(schemaId); + } + } + + private void updateSchema(long schemaId) { + long stamp = schemaLock.writeLock(); + try { + if (state.acquire(StateHelper.SCHEMA_UPDATING)) { + workExecutor.execute(createUpdateSchemaTask(schemaId)); + } + } finally { + schemaLock.unlockWrite(stamp); + } + } + + private Runnable createUpdateSchemaTask(long schemaId) { + return () -> { + schemaMeta.refresh(); + long stamp = schemaLock.writeLock(); + try { + currentSchemaId = schemaId; + rescheduleDelayedOperations(); + state.release(StateHelper.SCHEMA_UPDATING); + } finally { + schemaLock.unlock(stamp); + } + }; + } + + private void rescheduleDelayedOperations() { + while (delayedOpsQueue.peek() != null) { + TarantoolOp op = delayedOpsQueue.poll(); + if (!op.isDone()) { + registerOperation(op, currentSchemaId); } } } @@ -523,6 +632,9 @@ public void close() { protected void close(Exception e) { if (state.close()) { + if (workExecutor != null) { + workExecutor.shutdownNow(); + } connector.interrupt(); die(e.getMessage(), e); } @@ -565,12 +677,12 @@ public void setOperationTimeout(long operationTimeout) { @Override public boolean isAlive() { - return state.getState() == StateHelper.ALIVE && thumbstone == null; + return state.isStateSet(StateHelper.ALIVE) && thumbstone == null; } @Override public boolean isClosed() { - return state.getState() == StateHelper.CLOSED; + return state.isStateSet(StateHelper.CLOSED); } @Override @@ -584,25 +696,29 @@ public boolean waitAlive(long timeout, TimeUnit unit) throws InterruptedExceptio } @Override - public TarantoolClientOps, Object, List> syncOps() { + public TarantoolClientOps, Object, List> syncOps() { return syncOps; } @Override - public TarantoolClientOps, Object, Future>> asyncOps() { + public TarantoolClientOps, Object, Future>> asyncOps() { return (TarantoolClientOps) this; } @Override - public TarantoolClientOps, Object, CompletionStage>> composableAsyncOps() { + public TarantoolClientOps, Object, CompletionStage>> composableAsyncOps() { return composableAsyncOps; } @Override - public TarantoolClientOps, Object, Long> fireAndForgetOps() { + public TarantoolClientOps, Object, Long> fireAndForgetOps() { return fireAndForgetOps; } + public TarantoolClientOps, Object, List> unsafeSchemaOps() { + return unsafeSchemaOps; + } + @Override public TarantoolSQLOps>> sqlSyncOps() { return new TarantoolSQLOps>>() { @@ -633,29 +749,23 @@ public Future>> query(String sql, Object... bind) { }; } - protected class SyncOps extends AbstractTarantoolOps, Object, List> { + protected class SyncOps extends ClientDelegatingOps> { @Override - public List exec(Code code, Object... args) { + public List exec(Code code, ResolvableArgument... args) { return (List) syncGet(TarantoolClientImpl.this.exec(code, args)); } - @Override - public void close() { - throw new IllegalStateException("You should close TarantoolClient instead."); - } - } - protected class FireAndForgetOps extends AbstractTarantoolOps, Object, Long> { + protected class FireAndForgetOps extends ClientDelegatingOps { @Override - public Long exec(Code code, Object... args) { + public Long exec(Code code, ResolvableArgument... args) { if (thumbstone == null) { try { - long syncId = TarantoolClientImpl.this.syncId.incrementAndGet(); - write(code, syncId, null, args); - return syncId; + TarantoolOp operation = doExec(operationTimeout, code, args); + return operation.getId(); } catch (Exception e) { throw new CommunicationException("Execute failed", e); } @@ -664,11 +774,6 @@ public Long exec(Code code, Object... args) { } } - @Override - public void close() { - throw new IllegalStateException("You should close TarantoolClient instead."); - } - } protected boolean isDead(TarantoolOp future) { @@ -679,7 +784,7 @@ protected boolean isDead(TarantoolOp future) { return false; } - protected static class TarantoolOp extends CompletableFuture { + protected static class TarantoolOp extends CompletableFuture implements Comparable { /** * A task identifier used in {@link TarantoolClientImpl#futures}. @@ -694,9 +799,9 @@ protected static class TarantoolOp extends CompletableFuture { /** * Arguments of operation. */ - private final Object[] args; + private final ResolvableArgument[] args; - public TarantoolOp(long id, Code code, Object[] args) { + public TarantoolOp(long id, Code code, ResolvableArgument[] args) { this.id = id; this.code = code; this.args = args; @@ -710,10 +815,22 @@ public Code getCode() { return code; } - public Object[] getArgs() { + public ResolvableArgument[] getArgs() { return args; } + public Object[] getResolvedArgs() { + return Stream.of(args).map(ResolvableArgument::getValue).toArray(); + } + + public boolean hasUnresolvedArguments() { + return Stream.of(args).anyMatch(ResolvableArgument::isUnresolved); + } + + public boolean hasResolvableArguments() { + return Stream.of(args).anyMatch(ResolvableArgument::isResolvable); + } + /** * Missed in jdk8 CompletableFuture operator to limit execution * by time. @@ -746,6 +863,11 @@ public TarantoolOp orTimeout(long timeout, TimeUnit unit) { return this; } + @Override + public int compareTo(TarantoolOp otherOp) { + return Long.compareUnsigned(this.id, otherOp.id); + } + /** * Runs timeout operation as a delayed task. */ @@ -786,9 +908,12 @@ protected final class StateHelper { static final int UNINITIALIZED = 0; static final int READING = 1; static final int WRITING = 2; + static final int ALIVE = READING | WRITING; - static final int RECONNECT = 4; - static final int CLOSED = 8; + static final int SCHEMA_UPDATING = 1 << 2; + + static final int RECONNECT = 1 << 3; + static final int CLOSED = 1 << 4; private final AtomicInteger state; @@ -817,6 +942,10 @@ protected int getState() { return state.get(); } + boolean isStateSet(int mask) { + return (getState() & mask) == mask; + } + /** * Set CLOSED state, drop RECONNECT state. */ @@ -825,12 +954,12 @@ protected boolean close() { int currentState = getState(); /* CLOSED is the terminal state. */ - if ((currentState & CLOSED) == CLOSED) { + if (isStateSet(CLOSED)) { return false; } - /* Drop RECONNECT, set CLOSED. */ - if (compareAndSet(currentState, (currentState & ~RECONNECT) | CLOSED)) { + /* Clear all states and set CLOSED. */ + if (compareAndSet(currentState, CLOSED)) { return true; } } @@ -846,7 +975,7 @@ protected boolean acquire(int mask) { int currentState = getState(); /* CLOSED is the terminal state. */ - if ((currentState & CLOSED) == CLOSED) { + if ((isStateSet(CLOSED))) { return false; } @@ -856,8 +985,8 @@ protected boolean acquire(int mask) { } /* Cannot move from a state to the same state. */ - if ((currentState & mask) != 0) { - throw new IllegalStateException("State is already " + mask); + if (isStateSet(mask)) { + return false; } /* Set acquired state. */ @@ -881,7 +1010,8 @@ protected boolean compareAndSet(int expect, int update) { return false; } - if (update == ALIVE) { + boolean wasAlreadyAlive = (expect & ALIVE) == ALIVE; + if (!wasAlreadyAlive && update == ALIVE) { CountDownLatch latch = nextAliveLatch.getAndSet(new CountDownLatch(1)); latch.countDown(); onReconnect(); @@ -916,13 +1046,13 @@ private CountDownLatch getStateLatch(int state) { return closedLatch; } if (state == ALIVE) { - if (getState() == CLOSED) { + if (isStateSet(CLOSED)) { throw new IllegalStateException("State is CLOSED."); } CountDownLatch latch = nextAliveLatch.get(); /* It may happen so that an error is detected but the state is still alive. Wait for the 'next' alive state in such cases. */ - return (getState() == ALIVE && thumbstone == null) ? null : latch; + return (isStateSet(ALIVE) && thumbstone == null) ? null : latch; } return null; } @@ -935,7 +1065,7 @@ private CountDownLatch getStateLatch(int state) { private void awaitReconnection() throws InterruptedException { connectorLock.lock(); try { - while (getState() != StateHelper.RECONNECT) { + while (!isStateSet(RECONNECT)) { reconnectRequired.await(); } } finally { @@ -961,11 +1091,10 @@ private void trySignalForReconnection() { } - protected class ComposableAsyncOps - extends AbstractTarantoolOps, Object, CompletionStage>> { + protected class ComposableAsyncOps extends ClientDelegatingOps>> { @Override - public CompletionStage> exec(Code code, Object... args) { + public CompletionStage> exec(Code code, ResolvableArgument... args) { return (CompletionStage>) TarantoolClientImpl.this.exec(code, args); } @@ -976,4 +1105,37 @@ public void close() { } + /** + * Used by internal services to ignore schema ID issues. + */ + protected class UnsafeSchemaOps extends ClientDelegatingOps> { + + @Override + protected List exec(Code code, ResolvableArgument... args) { + long syncId = TarantoolClientImpl.this.syncId.incrementAndGet(); + TarantoolOp op = makeNewOperation(operationTimeout, syncId, code, args); + return (List) syncGet(registerOperation(op, 0L)); + } + + } + + protected abstract class ClientDelegatingOps extends AbstractTarantoolOps, Object, R> { + + @Override + protected int resolveSpace(String space) { + return TarantoolClientImpl.this.resolveSpace(space); + } + + @Override + protected int resolveSpaceIndex(String space, String index) { + return TarantoolClientImpl.this.resolveSpaceIndex(space, index); + } + + @Override + public void close() { + throw new IllegalStateException("You should close TarantoolClient instead."); + } + + } + } diff --git a/src/main/java/org/tarantool/TarantoolClientOps.java b/src/main/java/org/tarantool/TarantoolClientOps.java index 69ab3a9e..1d7878e1 100644 --- a/src/main/java/org/tarantool/TarantoolClientOps.java +++ b/src/main/java/org/tarantool/TarantoolClientOps.java @@ -1,20 +1,34 @@ package org.tarantool; -public interface TarantoolClientOps { - R select(T space, T index, O key, int offset, int limit, int iterator); +public interface TarantoolClientOps { + R select(Integer space, Integer index, O key, int offset, int limit, int iterator); - R select(T space, T index, O key, int offset, int limit, Iterator iterator); + R select(String space, String index, O key, int offset, int limit, int iterator); - R insert(T space, O tuple); + R select(Integer space, Integer index, O key, int offset, int limit, Iterator iterator); - R replace(T space, O tuple); + R select(String space, String index, O key, int offset, int limit, Iterator iterator); - R update(T space, O key, P... tuple); + R insert(Integer space, O tuple); - R upsert(T space, O key, O defTuple, P... ops); + R insert(String space, O tuple); - R delete(T space, O key); + R replace(Integer space, O tuple); + + R replace(String space, O tuple); + + R update(Integer space, O key, P... tuple); + + R update(String space, O key, P... tuple); + + R upsert(Integer space, O key, O defTuple, P... ops); + + R upsert(String space, O key, O defTuple, P... ops); + + R delete(Integer space, O key); + + R delete(String space, O key); R call(String function, Object... args); diff --git a/src/main/java/org/tarantool/TarantoolClusterClient.java b/src/main/java/org/tarantool/TarantoolClusterClient.java index b0c4711b..f7dabbdf 100644 --- a/src/main/java/org/tarantool/TarantoolClusterClient.java +++ b/src/main/java/org/tarantool/TarantoolClusterClient.java @@ -14,7 +14,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.StampedLock; @@ -35,7 +34,6 @@ public class TarantoolClusterClient extends TarantoolClientImpl { /** * Discovery activity. */ - private ScheduledExecutorService instancesDiscoveryExecutor; private Runnable instancesDiscovererTask; private StampedLock discoveryLock = new StampedLock(); @@ -70,14 +68,12 @@ public TarantoolClusterClient(TarantoolClusterClientConfig config, SocketChannel if (StringUtils.isNotBlank(config.clusterDiscoveryEntryFunction)) { this.instancesDiscovererTask = createDiscoveryTask(new TarantoolClusterStoredFunctionDiscoverer(config, this)); - this.instancesDiscoveryExecutor - = Executors.newSingleThreadScheduledExecutor(new TarantoolThreadDaemonFactory("tarantoolDiscoverer")); int delay = config.clusterDiscoveryDelayMillis > 0 ? config.clusterDiscoveryDelayMillis : TarantoolClusterClientConfig.DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS; // todo: it's better to start a job later (out of ctor) - this.instancesDiscoveryExecutor.scheduleWithFixedDelay( + this.workExecutor.scheduleWithFixedDelay( this.instancesDiscovererTask, 0, delay, @@ -99,14 +95,6 @@ protected boolean isDead(TarantoolOp future) { return false; } - @Override - protected TarantoolOp doExec(long timeoutMillis, Code code, Object[] args) { - validateArgs(args); - long sid = syncId.incrementAndGet(); - TarantoolOp future = makeNewOperation(timeoutMillis, sid, code, args); - return registerOperation(future); - } - /** * Registers a new async operation which will be resolved later. * Registration is discovery-aware in term of synchronization and @@ -116,26 +104,11 @@ protected TarantoolOp doExec(long timeoutMillis, Code code, Object[] args) { * * @return registered operation */ - private TarantoolOp registerOperation(TarantoolOp future) { + @Override + protected TarantoolOp registerOperation(TarantoolOp future, long schemaId) { long stamp = discoveryLock.readLock(); try { - if (isDead(future)) { - return future; - } - futures.put(future.getId(), future); - if (isDead(future)) { - futures.remove(future.getId()); - return future; - } - - try { - write(future.getCode(), future.getId(), null, future.getArgs()); - } catch (Exception e) { - futures.remove(future.getId()); - fail(future, e); - } - - return future; + return super.registerOperation(future, schemaId); } finally { discoveryLock.unlock(stamp); } @@ -161,10 +134,6 @@ protected boolean checkFail(TarantoolOp future, Exception e) { protected void close(Exception e) { super.close(e); - if (instancesDiscoveryExecutor != null) { - instancesDiscoveryExecutor.shutdownNow(); - } - if (retries == null) { // May happen within constructor. return; @@ -198,7 +167,7 @@ protected void onReconnect() { retries.clear(); for (final TarantoolOp future : futuresToRetry) { if (!future.isDone()) { - executor.execute(() -> registerOperation(future)); + executor.execute(() -> registerOperation(future, currentSchemaId)); } } } @@ -278,7 +247,6 @@ public synchronized void run() { onInstancesRefreshed(lastInstances); } } catch (Exception ignored) { - ignored.getCause(); // no-op } } diff --git a/src/main/java/org/tarantool/TarantoolConnection.java b/src/main/java/org/tarantool/TarantoolConnection.java index cf9c553f..3d658153 100644 --- a/src/main/java/org/tarantool/TarantoolConnection.java +++ b/src/main/java/org/tarantool/TarantoolConnection.java @@ -11,6 +11,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.stream.Stream; public class TarantoolConnection extends TarantoolBase> implements TarantoolSQLOps>> { @@ -27,11 +28,22 @@ public TarantoolConnection(String username, String password, Socket socket) thro } @Override - protected List exec(Code code, Object... args) { - TarantoolPacket responsePacket = writeAndRead(code, args); + protected List exec(Code code, ResolvableArgument... args) { + Object[] resolvedArgs = Stream.of(args).map(ResolvableArgument::getValue).toArray(); + TarantoolPacket responsePacket = writeAndRead(code, resolvedArgs); return (List) responsePacket.getBody().get(Key.DATA.getId()); } + @Override + protected int resolveSpace(String space) { + throw new UnsupportedOperationException(); + } + + @Override + protected int resolveSpaceIndex(String space, String index) { + throw new UnsupportedOperationException(); + } + protected TarantoolPacket writeAndRead(Code code, Object... args) { try { ByteBuffer packet = ProtoUtils.createPacket(initialRequestSize, msgPackLite, @@ -44,7 +56,7 @@ protected TarantoolPacket writeAndRead(Code code, Object... args) { Long c = responsePacket.getCode(); if (c != 0) { - throw serverError(c, responsePacket.getBody().get(Key.ERROR.getId())); + throw serverError(c, responsePacket.getError()); } return responsePacket; diff --git a/src/main/java/org/tarantool/TarantoolException.java b/src/main/java/org/tarantool/TarantoolException.java index aed93b14..cbc377fb 100644 --- a/src/main/java/org/tarantool/TarantoolException.java +++ b/src/main/java/org/tarantool/TarantoolException.java @@ -1,5 +1,11 @@ package org.tarantool; +import static org.tarantool.protocol.ProtoConstants.ERR_LOADING; +import static org.tarantool.protocol.ProtoConstants.ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY; +import static org.tarantool.protocol.ProtoConstants.ERR_READONLY; +import static org.tarantool.protocol.ProtoConstants.ERR_TIMEOUT; +import static org.tarantool.protocol.ProtoConstants.ERR_WRONG_SCHEMA_VERSION; + /** * A remote server error with error code and message. * @@ -7,11 +13,6 @@ * @version $Id: $ */ public class TarantoolException extends RuntimeException { - /* taken from src/box/errcode.h */ - public static final int ERR_READONLY = 7; - public static final int ERR_TIMEOUT = 78; - public static final int ERR_LOADING = 116; - public static final int ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY = 128; private static final long serialVersionUID = 1L; long code; @@ -60,6 +61,7 @@ boolean isTransient() { switch ((int) code) { case ERR_READONLY: case ERR_TIMEOUT: + case ERR_WRONG_SCHEMA_VERSION: case ERR_LOADING: case ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY: return true; diff --git a/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java b/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java index da92cd3a..0d8406e9 100644 --- a/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java +++ b/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java @@ -1,6 +1,6 @@ package org.tarantool.cluster; -import org.tarantool.TarantoolClient; +import org.tarantool.TarantoolClientImpl; import org.tarantool.TarantoolClientOps; import org.tarantool.TarantoolClusterClientConfig; import org.tarantool.util.StringUtils; @@ -19,17 +19,18 @@ */ public class TarantoolClusterStoredFunctionDiscoverer implements TarantoolClusterDiscoverer { - private TarantoolClient client; + private TarantoolClientImpl client; private String entryFunction; - public TarantoolClusterStoredFunctionDiscoverer(TarantoolClusterClientConfig clientConfig, TarantoolClient client) { + public TarantoolClusterStoredFunctionDiscoverer(TarantoolClusterClientConfig clientConfig, + TarantoolClientImpl client) { this.client = client; this.entryFunction = clientConfig.clusterDiscoveryEntryFunction; } @Override public Set getInstances() { - TarantoolClientOps, Object, List> syncOperations = client.syncOps(); + TarantoolClientOps, Object, List> syncOperations = client.unsafeSchemaOps(); List list = syncOperations.call(entryFunction); // discoverer expects a single array result from the function now; diff --git a/src/main/java/org/tarantool/protocol/ProtoConstants.java b/src/main/java/org/tarantool/protocol/ProtoConstants.java new file mode 100644 index 00000000..daee4dd7 --- /dev/null +++ b/src/main/java/org/tarantool/protocol/ProtoConstants.java @@ -0,0 +1,19 @@ +package org.tarantool.protocol; + +public class ProtoConstants { + + private ProtoConstants() { + } + + public static final long ERROR_TYPE_MARKER = 0x8000; + + public static final long SUCCESS = 0x0; + + /* taken from src/box/errcode.h */ + public static final int ERR_READONLY = 7; + public static final int ERR_TIMEOUT = 78; + public static final int ERR_WRONG_SCHEMA_VERSION = 109; + public static final int ERR_LOADING = 116; + public static final int ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY = 128; + +} diff --git a/src/main/java/org/tarantool/protocol/ProtoUtils.java b/src/main/java/org/tarantool/protocol/ProtoUtils.java index f5654c5a..12cf1831 100644 --- a/src/main/java/org/tarantool/protocol/ProtoUtils.java +++ b/src/main/java/org/tarantool/protocol/ProtoUtils.java @@ -195,6 +195,14 @@ public static TarantoolGreeting connect(SocketChannel channel, return new TarantoolGreeting(serverVersion); } + public static long getSchemaId(SocketChannel channel, MsgPackLite msgPackLite) throws IOException { + ByteBuffer pingPacket = createPacket(msgPackLite, Code.PING, 0L, 0L); + writeFully(channel, pingPacket); + + TarantoolPacket pongPacket = readPacket(channel, msgPackLite); + return pongPacket.getSchemaId(); + } + private static void assertCorrectWelcome(String firstLine, SocketAddress remoteAddress) { if (!firstLine.startsWith(WELCOME)) { String errMsg = "Failed to connect to node " + remoteAddress.toString() + @@ -208,7 +216,7 @@ private static void assertCorrectWelcome(String firstLine, SocketAddress remoteA private static void assertNoErrCode(TarantoolPacket authResponse) { Long code = (Long) authResponse.getHeaders().get(Key.CODE.getId()); if (code != 0) { - Object error = authResponse.getBody().get(Key.ERROR.getId()); + Object error = authResponse.getError(); String errorMsg = error instanceof String ? (String) error : new String((byte[]) error); throw new TarantoolException(code, errorMsg); } @@ -300,7 +308,22 @@ public static ByteBuffer createPacket(int initialRequestSize, return buffer; } + /** + * Extracts an error code. + * + * @param code in 0x8XXX format + * + * @return actual error code (which is a XXX part) + */ + public static long extractErrorCode(long code) { + if ((code & ProtoConstants.ERROR_TYPE_MARKER) == 0) { + throw new IllegalArgumentException(String.format("Code %h does not follow 0x8XXX format", code)); + } + return (~ProtoConstants.ERROR_TYPE_MARKER & code); + } + private static class ByteArrayOutputStream extends java.io.ByteArrayOutputStream { + public ByteArrayOutputStream(int size) { super(size); } @@ -308,6 +331,7 @@ public ByteArrayOutputStream(int size) { ByteBuffer toByteBuffer() { return ByteBuffer.wrap(buf, 0, count); } + } } diff --git a/src/main/java/org/tarantool/protocol/TarantoolPacket.java b/src/main/java/org/tarantool/protocol/TarantoolPacket.java index ca131177..ca661ed9 100644 --- a/src/main/java/org/tarantool/protocol/TarantoolPacket.java +++ b/src/main/java/org/tarantool/protocol/TarantoolPacket.java @@ -29,8 +29,9 @@ public Long getCode() { potenticalCode != null ? potenticalCode.getClass().toString() : "null" ); } + Long code = (Long) potenticalCode; - return (Long) potenticalCode; + return code == 0 ? code : ProtoUtils.extractErrorCode(code); } public Long getSync() { @@ -48,4 +49,16 @@ public Map getBody() { public boolean hasBody() { return body != null && body.size() > 0; } + + public long getSchemaId() { + return (Long) headers.get(Key.SCHEMA_ID.getId()); + } + + public Object getData() { + return hasBody() ? body.get(Key.DATA.getId()) : null; + } + + public Object getError() { + return hasBody() ? body.get(Key.ERROR.getId()) : null; + } } diff --git a/src/main/java/org/tarantool/schema/TarantoolIndexMeta.java b/src/main/java/org/tarantool/schema/TarantoolIndexMeta.java new file mode 100644 index 00000000..41c1cc6e --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolIndexMeta.java @@ -0,0 +1,147 @@ +package org.tarantool.schema; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Keeps a space index metadata. + */ +public class TarantoolIndexMeta { + + private final int id; + private final String name; + private final String type; + private final IndexOptions options; + private final List parts; + + public TarantoolIndexMeta(int id, + String name, + String type, + IndexOptions options, + List parts) { + this.id = id; + this.name = name; + this.type = type; + this.options = options; + this.parts = parts; + } + + public static TarantoolIndexMeta fromTuple(List tuple) { + Map optionsMap = (Map) tuple.get(4); + + List parts = Collections.emptyList(); + List partsTuple = (List) tuple.get(5); + if (!partsTuple.isEmpty()) { + if (partsTuple.get(0) instanceof List) { + parts = ((List>) partsTuple) + .stream() + .map(part -> new IndexPart((Integer) part.get(0), (String) part.get(1))) + .collect(Collectors.toList()); + } else if (partsTuple.get(0) instanceof Map) { + parts = ((List>) partsTuple) + .stream() + .map(part -> new IndexPart((Integer) part.get("field"), (String) part.get("type"))) + .collect(Collectors.toList()); + } + } + + return new TarantoolIndexMeta( + (Integer) tuple.get(1), + (String) tuple.get(2), + (String) tuple.get(3), + new IndexOptions((Boolean) optionsMap.get("unique")), + parts + ); + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + public IndexOptions getOptions() { + return options; + } + + public List getParts() { + return parts; + } + + public static class IndexOptions { + private final boolean unique; + + public IndexOptions(boolean unique) { + this.unique = unique; + } + + public boolean isUnique() { + return unique; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IndexOptions that = (IndexOptions) o; + return unique == that.unique; + } + + @Override + public int hashCode() { + return Objects.hash(unique); + } + + } + + public static class IndexPart { + private final int fieldNumber; + private final String type; + + public IndexPart(int fieldNumber, String type) { + this.fieldNumber = fieldNumber; + this.type = type; + } + + public int getFieldNumber() { + return fieldNumber; + } + + public String getType() { + return type; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IndexPart indexPart = (IndexPart) o; + return fieldNumber == indexPart.fieldNumber && + Objects.equals(type, indexPart.type); + } + + @Override + public int hashCode() { + return Objects.hash(fieldNumber, type); + } + + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolIndexNotFoundException.java b/src/main/java/org/tarantool/schema/TarantoolIndexNotFoundException.java new file mode 100644 index 00000000..49406fae --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolIndexNotFoundException.java @@ -0,0 +1,16 @@ +package org.tarantool.schema; + +public class TarantoolIndexNotFoundException extends TarantoolSchemaException { + + private final String indexName; + + public TarantoolIndexNotFoundException(String targetSpace, String indexName) { + super(targetSpace); + this.indexName = indexName; + } + + public String getIndexName() { + return indexName; + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSchemaException.java b/src/main/java/org/tarantool/schema/TarantoolSchemaException.java new file mode 100644 index 00000000..877f6c6b --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSchemaException.java @@ -0,0 +1,15 @@ +package org.tarantool.schema; + +public class TarantoolSchemaException extends RuntimeException { + + private final String schemaName; + + public TarantoolSchemaException(String schemaName) { + this.schemaName = schemaName; + } + + public String getSchemaName() { + return schemaName; + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSchemaMeta.java b/src/main/java/org/tarantool/schema/TarantoolSchemaMeta.java new file mode 100644 index 00000000..4dc98cd6 --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSchemaMeta.java @@ -0,0 +1,86 @@ +package org.tarantool.schema; + +import org.tarantool.Iterator; +import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolClientOps; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Schema storage. + *

+ * Keeps spaces and theirs attributes and indexes. + */ +public class TarantoolSchemaMeta { + + private static final int VSPACE_ID = 281; + private static final int VSPACE_ID_INDEX_ID = 0; + + private static final int VINDEX_ID = 289; + private static final int VINDEX_ID_INDEX_ID = 0; + + private static final int MAX_TUPLES = 65535; + + private TarantoolClientImpl client; + + private volatile Map cachedSpaces = new ConcurrentHashMap<>(); + + public TarantoolSchemaMeta(TarantoolClientImpl client) { + this.client = client; + } + + public TarantoolSpaceMeta getSpace(String spaceName) { + TarantoolSpaceMeta space = cachedSpaces.get(spaceName); + if (space == null) { + throw new TarantoolSpaceNotFoundException(spaceName); + } + return space; + } + + public TarantoolIndexMeta getSpaceIndex(String spaceName, String indexName) { + TarantoolIndexMeta index = getSpace(spaceName).getIndex(indexName); + if (index == null) { + throw new TarantoolIndexNotFoundException(spaceName, indexName); + } + return index; + } + + public synchronized void refresh() { + cachedSpaces = fetchSpaces() + .stream().collect( + Collectors.toConcurrentMap( + TarantoolSpaceMeta::getName, + Function.identity(), + (oldValue, newValue) -> newValue, + ConcurrentHashMap::new + ) + ); + } + + private List fetchSpaces() { + TarantoolClientOps, Object, List> clientOps = client.unsafeSchemaOps(); + + List spaces = clientOps + .select(VSPACE_ID, VSPACE_ID_INDEX_ID, Collections.emptyList(), 0, MAX_TUPLES, Iterator.EQ); + + Map>> indexesBySpace = clientOps + .select(VINDEX_ID, VINDEX_ID_INDEX_ID, Collections.emptyList(), 0, MAX_TUPLES, Iterator.EQ) + .stream() + .map(tuple -> (List) tuple) + .collect(Collectors.groupingBy(tuple -> (Integer) tuple.get(0))); + + return spaces.stream() + .map(tuple -> (List) tuple) + .map(tuple -> TarantoolSpaceMeta.fromTuple( + tuple, + indexesBySpace.getOrDefault((Integer) tuple.get(0), Collections.emptyList())) + ) + .collect(Collectors.toList()); + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSpaceMeta.java b/src/main/java/org/tarantool/schema/TarantoolSpaceMeta.java new file mode 100644 index 00000000..cb312c14 --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSpaceMeta.java @@ -0,0 +1,94 @@ +package org.tarantool.schema; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Keeps a space metadata. + */ +public class TarantoolSpaceMeta { + + private final int id; + private final String name; + private final String engine; + private final List format; + private final Map indexes; + + public static TarantoolSpaceMeta fromTuple(List spaceTuple, List> indexTuples) { + List fields = ((List>) spaceTuple.get(6)).stream() + .map(field -> new SpaceField(field.get("name").toString(), field.get("type").toString())) + .collect(Collectors.toList()); + + Map indexesMap = indexTuples.stream() + .map(TarantoolIndexMeta::fromTuple) + .collect(Collectors.toMap(TarantoolIndexMeta::getName, Function.identity())); + + return new TarantoolSpaceMeta( + (Integer) spaceTuple.get(0), + spaceTuple.get(2).toString(), + spaceTuple.get(3).toString(), + Collections.unmodifiableList(fields), + Collections.unmodifiableMap(indexesMap) + ); + } + + public TarantoolSpaceMeta(int id, + String name, + String engine, + List format, + Map indexes) { + this.id = id; + this.name = name; + this.engine = engine; + this.format = format; + this.indexes = indexes; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public String getEngine() { + return engine; + } + + public List getFormat() { + return format; + } + + public Map getIndexes() { + return indexes; + } + + public TarantoolIndexMeta getIndex(String indexName) { + return indexes.get(indexName); + } + + public static class SpaceField { + + private final String name; + private final String type; + + public SpaceField(String name, String type) { + this.name = name; + this.type = type; + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSpaceNotFoundException.java b/src/main/java/org/tarantool/schema/TarantoolSpaceNotFoundException.java new file mode 100644 index 00000000..28498e6b --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSpaceNotFoundException.java @@ -0,0 +1,9 @@ +package org.tarantool.schema; + +public class TarantoolSpaceNotFoundException extends TarantoolSchemaException { + + public TarantoolSpaceNotFoundException(String spaceName) { + super(spaceName); + } + +} diff --git a/src/test/java/org/tarantool/ClientAsyncOperationsIT.java b/src/test/java/org/tarantool/ClientAsyncOperationsIT.java index 087a4760..6aa2b10e 100644 --- a/src/test/java/org/tarantool/ClientAsyncOperationsIT.java +++ b/src/test/java/org/tarantool/ClientAsyncOperationsIT.java @@ -27,7 +27,7 @@ /** * Class with test cases for asynchronous operations - * + *

* NOTE: Parametrized tests can be simplified after * https://github.com/junit-team/junit5/issues/878 */ @@ -120,7 +120,7 @@ void testAsyncError(AsyncOpsProvider provider) { @MethodSource("getAsyncOps") void testOperations(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { - TarantoolClientOps, Object, Future>> ops = provider.getAsyncOps(); + TarantoolClientOps, Object, Future>> ops = provider.getAsyncOps(); List>> futures = new ArrayList<>(); @@ -145,7 +145,7 @@ void testOperations(AsyncOpsProvider provider) // Check the effects. checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); checkRawTupleResult(consoleSelect(20), Arrays.asList(20, "twenty")); - assertEquals(consoleSelect(30), Collections.emptyList()); + assertEquals(Collections.emptyList(), consoleSelect(30)); provider.close(); } @@ -185,16 +185,201 @@ void testCall(AsyncOpsProvider provider) throws ExecutionException, InterruptedE provider.close(); } + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringSelect(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, 'one'}"); + Future> result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(1), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(1, "one")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringInsert(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + Future> resultOne = provider.getAsyncOps() + .insert("basic_test", Arrays.asList(1, "one")); + + Future> resultTen = provider.getAsyncOps() + .insert("basic_test", Arrays.asList(10, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringReplace(AsyncOpsProvider provider) + throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + Future> resultOne = provider.getAsyncOps() + .replace("basic_test", Arrays.asList(1, "one")); + + Future> resultTen = provider.getAsyncOps() + .replace("basic_test", Arrays.asList(10, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringDelete(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + testHelper.executeLua("box.space.basic_test:insert{20, '20'}"); + + Future> resultOne = provider.getAsyncOps() + .delete("basic_test", Collections.singletonList(1)); + + Future> resultTen = provider.getAsyncOps() + .delete("basic_test", Collections.singletonList(20)); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + assertEquals(Collections.emptyList(), consoleSelect(1)); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "10")); + assertEquals(Collections.emptyList(), consoleSelect(20)); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringUpdate(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + Future> resultOne = provider.getAsyncOps() + .update("basic_test", Collections.singletonList(1), Arrays.asList("=", 1, "one")); + + Future> resultTwo = provider.getAsyncOps() + .update("basic_test", Collections.singletonList(2), Arrays.asList("=", 1, "two")); + + Future> resultTen = provider.getAsyncOps() + .update("basic_test", Collections.singletonList(10), Arrays.asList("=", 1, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTwo.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + assertEquals(Collections.emptyList(), consoleSelect(2)); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringUpsert(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + Future> resultOne = provider.getAsyncOps() + .upsert("basic_test", Collections.singletonList(1), Arrays.asList(1, "001"), Arrays.asList("=", 1, "one")); + + Future> resultTwo = provider.getAsyncOps() + .upsert("basic_test", Collections.singletonList(2), Arrays.asList(2, "002"), Arrays.asList("=", 1, "two")); + + Future> resultTen = provider.getAsyncOps() + .upsert("basic_test", Collections.singletonList(10), Arrays.asList(10, "010"), + Arrays.asList("=", 1, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTwo.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(2), Arrays.asList(2, "002")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringMultipleInderectChanges(AsyncOpsProvider provider) + throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, 'one'}"); + Future> result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(1), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(1, "one")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + testHelper.executeLua("box.space.basic_test and box.space.basic_test:drop()"); + testHelper.executeLua( + "box.schema.space.create('basic_test', { format = " + + "{{name = 'id', type = 'integer'}," + + " {name = 'val', type = 'string'} } })", + + "box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )" + ); + testHelper.executeLua("box.space.basic_test:insert{2, 'two'}"); + + result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(2), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(2, "two")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + testHelper.executeLua("box.space.basic_test and box.space.basic_test:drop()"); + testHelper.executeLua( + "box.schema.space.create('basic_test', { format = " + + "{{name = 'id', type = 'integer'}," + + " {name = 'val', type = 'string'} } })", + + "box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )" + ); + testHelper.executeLua("box.space.basic_test:insert{3, 'three'}"); + + result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(3), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(3, "three")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + provider.close(); + } + private List consoleSelect(Object key) { return testHelper.evaluate(TestUtils.toLuaSelect("basic_test", key)); } private interface AsyncOpsProvider { - TarantoolClientOps, Object, Future>> getAsyncOps(); + + TarantoolClientOps, Object, Future>> getAsyncOps(); TarantoolClient getClient(); void close(); + } private static class ClientAsyncOpsProvider implements AsyncOpsProvider { @@ -202,7 +387,7 @@ private static class ClientAsyncOpsProvider implements AsyncOpsProvider { TarantoolClient client = TestUtils.makeTestClient(TestUtils.makeDefaultClientConfig(), 2000); @Override - public TarantoolClientOps, Object, Future>> getAsyncOps() { + public TarantoolClientOps, Object, Future>> getAsyncOps() { return client.asyncOps(); } @@ -221,11 +406,11 @@ public void close() { private static class ComposableAsyncOpsProvider implements AsyncOpsProvider { TarantoolClient client = TestUtils.makeTestClient(TestUtils.makeDefaultClientConfig(), 2000); - TarantoolClientOps, Object, Future>> composableOps = + TarantoolClientOps, Object, Future>> composableOps = new Composable2FutureClientOpsAdapter(client.composableAsyncOps()); @Override - public TarantoolClientOps, Object, Future>> getAsyncOps() { + public TarantoolClientOps, Object, Future>> getAsyncOps() { return composableOps; } @@ -236,18 +421,18 @@ public TarantoolClient getClient() { @Override public void close() { - composableOps.close(); + client.close(); } } private static class Composable2FutureClientOpsAdapter - implements TarantoolClientOps, Object, Future>> { + implements TarantoolClientOps, Object, Future>> { - private final TarantoolClientOps, Object, CompletionStage>> originOps; + private final TarantoolClientOps, Object, CompletionStage>> originOps; private Composable2FutureClientOpsAdapter( - TarantoolClientOps, Object, CompletionStage>> originOps) { + TarantoolClientOps, Object, CompletionStage>> originOps) { this.originOps = originOps; } @@ -257,6 +442,11 @@ public Future> select(Integer space, Integer index, List key, int off return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); } + @Override + public Future> select(String space, String index, List key, int offset, int limit, int iterator) { + return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); + } + @Override public Future> select(Integer space, Integer index, @@ -267,31 +457,66 @@ public Future> select(Integer space, return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); } + @Override + public Future> select(String space, + String index, + List key, + int offset, + int limit, + Iterator iterator) { + return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); + } + @Override public Future> insert(Integer space, List tuple) { return originOps.insert(space, tuple).toCompletableFuture(); } + @Override + public Future> insert(String space, List tuple) { + return originOps.insert(space, tuple).toCompletableFuture(); + } + @Override public Future> replace(Integer space, List tuple) { return originOps.replace(space, tuple).toCompletableFuture(); } + @Override + public Future> replace(String space, List tuple) { + return originOps.replace(space, tuple).toCompletableFuture(); + } + @Override public Future> update(Integer space, List key, Object... tuple) { return originOps.update(space, key, tuple).toCompletableFuture(); } + @Override + public Future> update(String space, List key, Object... tuple) { + return originOps.update(space, key, tuple).toCompletableFuture(); + } + @Override public Future> upsert(Integer space, List key, List defTuple, Object... ops) { return originOps.upsert(space, key, defTuple, ops).toCompletableFuture(); } + @Override + public Future> upsert(String space, List key, List defTuple, Object... ops) { + return originOps.upsert(space, key, defTuple, ops).toCompletableFuture(); + } + @Override public Future> delete(Integer space, List key) { return originOps.delete(space, key).toCompletableFuture(); } + @Override + public Future> delete(String space, List key) { + return originOps.delete(space, key).toCompletableFuture(); + } + @Override public Future> call(String function, Object... args) { return originOps.call(function, args).toCompletableFuture(); @@ -311,6 +536,7 @@ public void ping() { public void close() { originOps.close(); } + } } diff --git a/src/test/java/org/tarantool/ClientReconnectClusterIT.java b/src/test/java/org/tarantool/ClientReconnectClusterIT.java index c0228874..90c884e0 100644 --- a/src/test/java/org/tarantool/ClientReconnectClusterIT.java +++ b/src/test/java/org/tarantool/ClientReconnectClusterIT.java @@ -20,7 +20,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -120,7 +120,7 @@ void testUpdateExtendedNodeList() { String service1Address = "localhost:" + PORTS[0]; String service2Address = "127.0.0.1:" + PORTS[1]; - CyclicBarrier barrier = new CyclicBarrier(2); + Phaser phaser = new Phaser(1); String infoFunctionName = "getAddresses"; String infoFunctionScript = @@ -131,7 +131,7 @@ void testUpdateExtendedNodeList() { final TarantoolClusterClient client = makeClientWithDiscoveryFeature( infoFunctionName, 0, - (ignored) -> tryAwait(barrier), + (ignored) -> phaser.arrive(), service1Address ); @@ -139,7 +139,7 @@ void testUpdateExtendedNodeList() { final int spaceId = ids[0]; final int pkId = ids[1]; - tryAwait(barrier); // client = { srv1 }; wait for { srv1, srv2 } + tryAwait(phaser, 0); // client = { srv1 }; wait for { srv1, srv2 } expectConnected(client, spaceId, pkId); @@ -165,7 +165,7 @@ void testUpdateNarrowNodeList() { String service1Address = "localhost:" + PORTS[0]; String service2Address = "127.0.0.1:" + PORTS[1]; - CyclicBarrier barrier = new CyclicBarrier(2); + Phaser phaser = new Phaser(1); String infoFunctionName = "getAddresses"; String infoFunctionScript = makeDiscoveryFunction(infoFunctionName, Collections.singletonList(service1Address)); @@ -175,7 +175,7 @@ void testUpdateNarrowNodeList() { final TarantoolClusterClient client = makeClientWithDiscoveryFeature( infoFunctionName, 0, - (ignored) -> tryAwait(barrier), + (ignored) -> phaser.arrive(), service1Address, service2Address ); @@ -184,7 +184,7 @@ void testUpdateNarrowNodeList() { final int spaceId = ids[0]; final int pkId = ids[1]; - tryAwait(barrier); // client = { srv1, srv2 }; wait for { srv1 } + tryAwait(phaser, 0); // client = { srv1, srv2 }; wait for { srv1 } expectConnected(client, spaceId, pkId); @@ -349,7 +349,7 @@ void testDelayFunctionResultFetch() { String service2Address = "127.0.0.1:" + PORTS[1]; String service3Address = "localhost:" + PORTS[2]; - CyclicBarrier barrier = new CyclicBarrier(2); + Phaser phaser = new Phaser(1); String infoFunctionName = "getAddressesFunction"; String functionBody = Stream.of(service1Address, service2Address) @@ -367,7 +367,7 @@ void testDelayFunctionResultFetch() { final TarantoolClusterClient client = makeClientWithDiscoveryFeature( infoFunctionName, 3000, - (ignored) -> tryAwait(barrier), + (ignored) -> phaser.arrive(), service1Address ); @@ -375,16 +375,16 @@ void testDelayFunctionResultFetch() { final int spaceId = ids[0]; final int pkId = ids[1]; - tryAwait(barrier); // client = { srv1 }; wait for { srv1 } + tryAwait(phaser, 0); // client = { srv1 }; wait for { srv1 } expectConnected(client, spaceId, pkId); - tryAwait(barrier); // client = { srv1 }; wait for { srv2 } + tryAwait(phaser, 1); // client = { srv1 }; wait for { srv2 } stopInstancesAndAwait(SRV1); expectConnected(client, spaceId, pkId); - tryAwait(barrier); // client = { srv2 }; wait for { srv3 } + tryAwait(phaser, 2); // client = { srv2 }; wait for { srv3 } stopInstancesAndAwait(SRV2); expectConnected(client, spaceId, pkId); @@ -393,9 +393,9 @@ void testDelayFunctionResultFetch() { expectDisconnected(client, spaceId, pkId); } - private void tryAwait(CyclicBarrier barrier) { + private void tryAwait(Phaser phaser, int phase) { try { - barrier.await(6000, TimeUnit.MILLISECONDS); + phaser.awaitAdvanceInterruptibly(phase, 6000, TimeUnit.MILLISECONDS); } catch (Throwable e) { e.printStackTrace(); } diff --git a/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java b/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java index 97cf697a..71b91520 100644 --- a/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java +++ b/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java @@ -91,12 +91,12 @@ public void execute() throws Throwable { @Test public void testFireAndForgetOperations() { - TarantoolClientOps, Object, Long> ffOps = client.fireAndForgetOps(); + TarantoolClientOps, Object, Long> ffOps = client.fireAndForgetOps(); - Set syncIds = new HashSet(); + Set syncIds = new HashSet<>(); - syncIds.add(ffOps.insert(spaceId, Arrays.asList(10, "10"))); - syncIds.add(ffOps.delete(spaceId, Collections.singletonList(10))); + syncIds.add(ffOps.insert(spaceId, Arrays.asList(1, "1"))); + syncIds.add(ffOps.delete(spaceId, Collections.singletonList(1))); syncIds.add(ffOps.insert(spaceId, Arrays.asList(10, "10"))); syncIds.add(ffOps.update(spaceId, Collections.singletonList(10), Arrays.asList("=", 1, "ten"))); @@ -117,9 +117,40 @@ public void testFireAndForgetOperations() { client.syncOps().ping(); // Check the effects + assertEquals(consoleSelect(SPACE_NAME, 1), Collections.emptyList()); checkRawTupleResult(consoleSelect(SPACE_NAME, 10), Arrays.asList(10, "ten")); checkRawTupleResult(consoleSelect(SPACE_NAME, 20), Arrays.asList(20, "twenty")); - assertEquals(consoleSelect(SPACE_NAME, 30), Collections.emptyList()); + assertEquals(Collections.emptyList(), consoleSelect(SPACE_NAME, 30)); + } + + @Test + public void testFireAndForgetStringOperations() { + TarantoolClientOps, Object, Long> ffOps = client.fireAndForgetOps(); + + Set syncIds = new HashSet<>(); + + syncIds.add(ffOps.insert(SPACE_NAME, Arrays.asList(2, "2"))); + syncIds.add(ffOps.delete(SPACE_NAME, Collections.singletonList(2))); + + syncIds.add(ffOps.insert(SPACE_NAME, Arrays.asList(20, "20"))); + syncIds.add(ffOps.update(SPACE_NAME, Collections.singletonList(20), Arrays.asList("=", 1, "twenty"))); + + syncIds.add(ffOps.replace(SPACE_NAME, Arrays.asList(200, "200"))); + syncIds.add(ffOps.upsert(SPACE_NAME, Collections.singletonList(200), Arrays.asList(200, "two hundred"), + Arrays.asList("=", 1, "two hundred"))); + + // Check the syncs. + assertFalse(syncIds.contains(0L)); + assertEquals(6, syncIds.size()); + + // The reply for synchronous ping will + // indicate to us that previous fire & forget operations are completed. + client.syncOps().ping(); + + // Check the effects + assertEquals(consoleSelect(SPACE_NAME, 2), Collections.emptyList()); + checkRawTupleResult(consoleSelect(SPACE_NAME, 20), Arrays.asList(20, "twenty")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 200), Arrays.asList(200, "two hundred")); } private List consoleSelect(String spaceName, Object key) { diff --git a/src/test/java/org/tarantool/IteratorTest.java b/src/test/java/org/tarantool/IteratorTest.java index 7fd68b61..0fed2352 100644 --- a/src/test/java/org/tarantool/IteratorTest.java +++ b/src/test/java/org/tarantool/IteratorTest.java @@ -10,13 +10,23 @@ import java.util.List; class IteratorTest { - protected class MockOps extends AbstractTarantoolOps, Object, List> { + protected class MockOps extends AbstractTarantoolOps, Object, List> { @Override - public List exec(Code code, Object... args) { + protected List exec(Code code, ResolvableArgument... args) { return null; } + @Override + protected int resolveSpace(String space) { + throw new UnsupportedOperationException(); + } + + @Override + protected int resolveSpaceIndex(String space, String index) { + throw new UnsupportedOperationException(); + } + @Override public void close() { throw new UnsupportedOperationException(); diff --git a/src/test/java/org/tarantool/TarantoolClientOpsIT.java b/src/test/java/org/tarantool/TarantoolClientOpsIT.java index ebaca9f5..5b2e9dec 100644 --- a/src/test/java/org/tarantool/TarantoolClientOpsIT.java +++ b/src/test/java/org/tarantool/TarantoolClientOpsIT.java @@ -256,7 +256,7 @@ public void testReplaceMultiPartKey(SyncOpsProvider provider) { provider.close(); } - private void checkReplace(TarantoolClientOps, Object, List> clientOps, + private void checkReplace(TarantoolClientOps, Object, List> clientOps, String space, int spaceId, List key, @@ -332,7 +332,7 @@ public void testUpdateMultiPart(SyncOpsProvider provider) { provider.close(); } - private void checkUpdate(TarantoolClientOps, Object, List> clientOps, + private void checkUpdate(TarantoolClientOps, Object, List> clientOps, String space, int spaceId, List key, @@ -391,7 +391,7 @@ public void testUpsertMultiPart(SyncOpsProvider provider) { provider.close(); } - private void checkUpsert(TarantoolClientOps, Object, List> clientOps, + private void checkUpsert(TarantoolClientOps, Object, List> clientOps, String space, int spaceId, List key, @@ -446,7 +446,7 @@ public void testDeleteMultiPartKey(SyncOpsProvider provider) { provider.close(); } - private void checkDelete(TarantoolClientOps, Object, List> clientOps, + private void checkDelete(TarantoolClientOps, Object, List> clientOps, String space, int spaceId, List key, @@ -663,7 +663,7 @@ private static TarantoolConnection makeConnection() { } private interface SyncOpsProvider { - TarantoolClientOps, Object, List> getClientOps(); + TarantoolClientOps, Object, List> getClientOps(); void close(); } @@ -673,7 +673,7 @@ private static class ClientSyncOpsProvider implements SyncOpsProvider { private TarantoolClient client = makeTestClient(makeDefaultClientConfig(), RESTART_TIMEOUT); @Override - public TarantoolClientOps, Object, List> getClientOps() { + public TarantoolClientOps, Object, List> getClientOps() { return client.syncOps(); } @@ -689,7 +689,7 @@ private static class ConnectionSyncOpsProvider implements SyncOpsProvider { private TarantoolConnection connection = makeConnection(); @Override - public TarantoolClientOps, Object, List> getClientOps() { + public TarantoolClientOps, Object, List> getClientOps() { return connection; } diff --git a/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java b/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java index 4e940033..858e2143 100644 --- a/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java +++ b/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java @@ -9,7 +9,6 @@ import static org.tarantool.TestUtils.makeDiscoveryFunction; import org.tarantool.CommunicationException; -import org.tarantool.TarantoolClient; import org.tarantool.TarantoolClientImpl; import org.tarantool.TarantoolClusterClientConfig; import org.tarantool.TarantoolException; @@ -36,7 +35,7 @@ public class ClusterServiceStoredFunctionDiscovererIT { private static TarantoolTestHelper testHelper; private TarantoolClusterClientConfig clusterConfig; - private TarantoolClient client; + private TarantoolClientImpl client; @BeforeAll public static void setupEnv() { diff --git a/src/test/java/org/tarantool/jdbc/JdbcExceptionHandlingTest.java b/src/test/java/org/tarantool/jdbc/JdbcExceptionHandlingTest.java index 08c02d96..2848f192 100644 --- a/src/test/java/org/tarantool/jdbc/JdbcExceptionHandlingTest.java +++ b/src/test/java/org/tarantool/jdbc/JdbcExceptionHandlingTest.java @@ -45,7 +45,7 @@ public class JdbcExceptionHandlingTest { @Test public void testDatabaseMetaDataGetPrimaryKeysFormatError() throws SQLException { - TarantoolClientOps, Object, List> syncOps = mock(TarantoolClientOps.class); + TarantoolClientOps, Object, List> syncOps = mock(TarantoolClientOps.class); Object[] spc = new Object[7]; spc[FORMAT_IDX] = Collections.singletonList(new HashMap()); @@ -196,7 +196,7 @@ public void execute() throws Throwable { private void checkDatabaseMetaDataCommunicationException(final ThrowingConsumer consumer, String msg) throws SQLException { Exception ex = new CommunicationException("TEST"); - TarantoolClientOps, Object, List> syncOps = mock(TarantoolClientOps.class); + TarantoolClientOps, Object, List> syncOps = mock(TarantoolClientOps.class); doThrow(ex).when(syncOps).select(_VSPACE, 0, Arrays.asList(), 0, SPACES_MAX, 0); doThrow(ex).when(syncOps).select(_VSPACE, 2, Arrays.asList("TEST"), 0, 1, 0); @@ -213,7 +213,7 @@ private void checkDatabaseMetaDataCommunicationException(final ThrowingConsumer< } private SQLTarantoolClientImpl buildSQLClient(SQLTarantoolClientImpl.SQLRawOps sqlOps, - TarantoolClientOps, Object, List> ops) { + TarantoolClientOps, Object, List> ops) { SQLTarantoolClientImpl client = mock(SQLTarantoolClientImpl.class); when(client.sqlRawOps()).thenReturn(sqlOps); when(client.syncOps()).thenReturn(ops); diff --git a/src/test/java/org/tarantool/schema/ClientSchemaIT.java b/src/test/java/org/tarantool/schema/ClientSchemaIT.java new file mode 100644 index 00000000..47bb3aa5 --- /dev/null +++ b/src/test/java/org/tarantool/schema/ClientSchemaIT.java @@ -0,0 +1,226 @@ +package org.tarantool.schema; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.tarantool.TestUtils.makeDefaultClientConfig; + +import org.tarantool.ServerVersion; +import org.tarantool.TarantoolClientConfig; +import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolTestHelper; +import org.tarantool.TestAssumptions; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +@DisplayName("A schema meta") +public class ClientSchemaIT { + + private static TarantoolTestHelper testHelper; + + private TarantoolClientImpl client; + + @BeforeAll + public static void setupEnv() { + testHelper = new TarantoolTestHelper("client-schema-it"); + testHelper.createInstance(); + testHelper.startInstance(); + } + + @AfterAll + public static void teardownEnv() { + testHelper.stopInstance(); + } + + @BeforeEach + public void setup() { + TarantoolClientConfig config = makeDefaultClientConfig(); + + client = new TarantoolClientImpl( + TarantoolTestHelper.HOST + ":" + TarantoolTestHelper.PORT, + config + ); + } + + @AfterEach + public void tearDown() { + client.close(); + testHelper.executeLua("box.space.count_space and box.space.count_space:drop()"); + } + + @Test + @DisplayName("fetched a space with its index") + void testFetchSpaces() { + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'}," + + " {name = 'counts', type = 'integer'} }" + + "})" + ); + testHelper.executeLua("box.space.count_space:create_index('pk', { type = 'TREE', parts = {'id'} } )"); + + TarantoolSchemaMeta meta = new TarantoolSchemaMeta(client); + meta.refresh(); + + TarantoolSpaceMeta space = meta.getSpace("count_space"); + assertNotNull(space); + assertEquals("count_space", space.getName()); + + List spaceFormat = space.getFormat(); + assertEquals(2, spaceFormat.size()); + assertEquals("id", spaceFormat.get(0).getName()); + assertEquals("integer", spaceFormat.get(0).getType()); + assertEquals("counts", spaceFormat.get(1).getName()); + assertEquals("integer", spaceFormat.get(1).getType()); + + TarantoolIndexMeta primaryIndex = space.getIndex("pk"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk", "TREE", + new TarantoolIndexMeta.IndexOptions(true), + Collections.singletonList(new TarantoolIndexMeta.IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, primaryIndex); + } + + @Test + @DisplayName("fetched a space with its index") + void testFetchNewSpaces() { + // add count_space + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'}," + + " {name = 'counts', type = 'integer'} }" + + "})" + ); + TarantoolSchemaMeta meta = new TarantoolSchemaMeta(client); + meta.refresh(); + TarantoolSpaceMeta space = meta.getSpace("count_space"); + assertNotNull(space); + assertEquals("count_space", space.getName()); + + // add count_space_2 + testHelper.executeLua( + "box.schema.space.create('count_space_2', { format = " + + "{ {name = 'id', type = 'integer'} } })" + ); + meta.refresh(); + space = meta.getSpace("count_space_2"); + assertNotNull(space); + assertEquals("count_space_2", space.getName()); + + // add a primary index for count_space_2 + testHelper.executeLua( + "box.space.count_space_2:create_index('pk', { unique = true, type = 'TREE', parts = {'id'} } )" + ); + meta.refresh(); + TarantoolIndexMeta spaceIndex = meta.getSpaceIndex("count_space_2", "pk"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk", "TREE", + new TarantoolIndexMeta.IndexOptions(true), + Collections.singletonList(new TarantoolIndexMeta.IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, spaceIndex); + } + + @Test + @DisplayName("fetched space indexes of a space") + void testFetchIndexes() { + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'}," + + " {name = 'counts', type = 'integer'} }" + + "})" + ); + testHelper.executeLua( + "box.space.count_space:create_index('pk', { type = 'HASH', parts = {'id'} } )", + "box.space.count_space:create_index('c_index', { unique = false, type = 'TREE', parts = {'counts'} } )" + ); + + TarantoolSchemaMeta meta = new TarantoolSchemaMeta(client); + meta.refresh(); + + TarantoolIndexMeta primaryIndex = meta.getSpaceIndex("count_space", "pk"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk", "HASH", + new TarantoolIndexMeta.IndexOptions(true), + Collections.singletonList(new TarantoolIndexMeta.IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, primaryIndex); + + TarantoolIndexMeta secondaryIndex = meta.getSpaceIndex("count_space", "c_index"); + TarantoolIndexMeta expectedSecondaryIndex = new TarantoolIndexMeta( + 1, "c_index", "TREE", + new TarantoolIndexMeta.IndexOptions(false), + Collections.singletonList(new TarantoolIndexMeta.IndexPart(1, "integer")) + ); + assertIndex(expectedSecondaryIndex, secondaryIndex); + } + + @Test + @DisplayName("fetched sql table primary index") + void testFetchSqlIndexes() { + TestAssumptions.assumeMinimalServerVersion(testHelper.getInstanceVersion(), ServerVersion.V_2_1); + testHelper.executeSql("create table my_table (id int primary key, val varchar(100))"); + + TarantoolSchemaMeta meta = new TarantoolSchemaMeta(client); + meta.refresh(); + + TarantoolIndexMeta primaryIndex = meta.getSpaceIndex("MY_TABLE", "pk_unnamed_MY_TABLE_1"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk_unnamed_MY_TABLE_1", "tree", + new TarantoolIndexMeta.IndexOptions(true), + Collections.singletonList(new TarantoolIndexMeta.IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, primaryIndex); + } + + @Test + @DisplayName("got an error with a wrong space name") + void tesGetUnknownSpace() { + TarantoolSchemaMeta meta = new TarantoolSchemaMeta(client); + meta.refresh(); + + TarantoolSpaceNotFoundException exception = assertThrows( + TarantoolSpaceNotFoundException.class, + () -> meta.getSpace("unknown_space") + ); + assertEquals("unknown_space", exception.getSchemaName()); + } + + @Test + @DisplayName("got an error with a wrong space index name") + void testGetUnknownSpaceIndex() { + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'} } })" + ); + testHelper.executeLua("box.space.count_space:create_index('pk', { type = 'TREE', parts = {'id'} } )"); + + TarantoolSchemaMeta meta = new TarantoolSchemaMeta(client); + meta.refresh(); + + assertEquals("count_space", meta.getSpace("count_space").getName()); + TarantoolIndexNotFoundException exception = assertThrows( + TarantoolIndexNotFoundException.class, + () -> meta.getSpaceIndex("count_space", "wrong_pk") + ); + assertEquals("wrong_pk", exception.getIndexName()); + } + + private void assertIndex(TarantoolIndexMeta expectedIndex, TarantoolIndexMeta actualIndex) { + assertEquals(expectedIndex.getId(), actualIndex.getId()); + assertEquals(expectedIndex.getName(), actualIndex.getName()); + assertEquals(expectedIndex.getType(), actualIndex.getType()); + assertEquals(expectedIndex.getOptions(), actualIndex.getOptions()); + assertEquals(expectedIndex.getParts(), actualIndex.getParts()); + } + +}