Skip to content

Commit

Permalink
Soft automatic schema reload
Browse files Browse the repository at this point in the history
Now client keeps actual schema metadata and sends schemaId header to be
checked against current Tarantool schema version. If client version
mismatches DB version client does schema reloading in the background.

Client operation interface was reworked in scope of support not only
number identifiers for spaces and indexes but also their string names.

Closes: #7, #137
  • Loading branch information
nicktorwald committed Jun 19, 2019
1 parent b53e0ba commit 178337c
Show file tree
Hide file tree
Showing 27 changed files with 1,436 additions and 205 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,18 @@ makes possible for the client to configure a socket provider.
* `ComposableAsyncOps` - return the operation result as a `CompletionStage`
* `FireAndForgetOps` - returns the query ID

Each operation that requires space or index to be executed, can work with
number ID as well as string name of a space or an index.
Assume, we have `my_space` space with space ID `512` and its primary index
`primary` with index ID `0`. Then, for instance, `select` operations can be
performed as the following:

```java
client.syncOps().select(512, 0, Collections.singletonList(1), 0, 1, Iterator.EQ);
// or using more convenient way
client.syncOps().select("my_space", "primary", Collections.singletonList(1), 0, 1, Iterator.EQ);
```

Feel free to override any method of `TarantoolClientImpl`. For example, to hook
all the results, you could override this:

Expand Down
201 changes: 178 additions & 23 deletions src/main/java/org/tarantool/AbstractTarantoolOps.java
Original file line number Diff line number Diff line change
@@ -1,62 +1,217 @@
package org.tarantool;

import static org.tarantool.AbstractTarantoolOps.ResolvableArgument.ofResolved;
import static org.tarantool.AbstractTarantoolOps.ResolvableArgument.ofUnresolved;

public abstract class AbstractTarantoolOps<Space, Tuple, Operation, Result>
implements TarantoolClientOps<Space, Tuple, Operation, Result> {
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Stream;

public abstract class AbstractTarantoolOps<Tuple, Operation, Result>
implements TarantoolClientOps<Tuple, Operation, Result> {

private final Function<Object, Object> defaultSpaceResolver = space -> resolveSpace((String) space);

private Code callCode = Code.CALL;

protected abstract Result exec(Code code, Object... args);
protected Result exec(Code code, Object... args) {
return exec(code, ResolvableArgument.ofAllResolved(args));
}

protected abstract Result exec(Code code, ResolvableArgument... args);

protected abstract int resolveSpace(String space);

protected abstract int resolveSpaceIndex(String space, String index);

protected static class ResolvableArgument {

private final Object value;
private final Function<Object, Object> resolver;

private ResolvableArgument(Object value, Function<Object, Object> resolver) {
Objects.requireNonNull(value);
this.value = value;
this.resolver = resolver;
}

public static ResolvableArgument ofResolved(Object resolvedValue) {
return new ResolvableArgument(resolvedValue, null);
}

public static ResolvableArgument[] ofAllResolved(Object[] resolvedValue) {
return Stream.of(resolvedValue).map(ResolvableArgument::ofResolved).toArray(ResolvableArgument[]::new);
}

public static ResolvableArgument ofUnresolved(Object unresolvedValue, Function<Object, Object> resolver) {
return new ResolvableArgument(unresolvedValue, resolver);
}

public boolean isUnresolved() {
try {
return isResolvable() && resolver.apply(value) == null;
} catch (Exception ignored) {
return true;
}
}

public boolean isResolvable() {
return resolver != null;
}

public Result select(Space space, Space index, Tuple key, int offset, int limit, Iterator iterator) {
public Object getValue() {
return isResolvable() ? resolver.apply(value) : value;
}

@Override
public String toString() {
if (isUnresolved()) {
return "{unresolved: " + value + "}";
}
return value.toString();
}

}

@Override
public Result select(Integer space, Integer index, Tuple key, int offset, int limit, Iterator iterator) {
return select(space, index, key, offset, limit, iterator.getValue());
}

public Result select(Space space, Space index, Tuple key, int offset, int limit, int iterator) {
return exec(
Code.SELECT,
Key.SPACE, space,
Key.INDEX, index,
Key.KEY, key,
Key.ITERATOR, iterator,
Key.LIMIT, limit,
Key.OFFSET, offset
@Override
public Result select(String space, String index, Tuple key, int offset, int limit, Iterator iterator) {
return select(space, index, key, offset, limit, iterator.getValue());
}

@Override
public Result select(Integer space, Integer index, Tuple key, int offset, int limit, int iterator) {
return doSelect(ofResolved(space), ofResolved(index), key, offset, limit, iterator);
}

@Override
public Result select(String space, String index, Tuple key, int offset, int limit, int iterator) {
return doSelect(
ofUnresolved(space, defaultSpaceResolver),
ofUnresolved(index, indexName -> resolveSpaceIndex(space, (String) indexName)),
key, offset, limit, iterator
);
}

public Result insert(Space space, Tuple tuple) {
return exec(Code.INSERT, Key.SPACE, space, Key.TUPLE, tuple);
@Override
public Result insert(Integer space, Tuple tuple) {
return doInsert(ofResolved(space), tuple);
}

public Result replace(Space space, Tuple tuple) {
return exec(Code.REPLACE, Key.SPACE, space, Key.TUPLE, tuple);
@Override
public Result insert(String space, Tuple tuple) {
return doInsert(ofUnresolved(space, defaultSpaceResolver), tuple);
}

public Result update(Space space, Tuple key, Operation... args) {
return exec(Code.UPDATE, Key.SPACE, space, Key.KEY, key, Key.TUPLE, args);
@Override
public Result replace(Integer space, Tuple tuple) {
return doReplace(ofResolved(space), tuple);
}

public Result upsert(Space space, Tuple key, Tuple def, Operation... args) {
return exec(Code.UPSERT, Key.SPACE, space, Key.KEY, key, Key.TUPLE, def, Key.UPSERT_OPS, args);
@Override
public Result replace(String space, Tuple tuple) {
return doReplace(ofUnresolved(space, defaultSpaceResolver), tuple);
}

public Result delete(Space space, Tuple key) {
return exec(Code.DELETE, Key.SPACE, space, Key.KEY, key);
@Override
public Result update(Integer space, Tuple key, Operation... args) {
return doUpdate(ofResolved(space), key, args);
}

@Override
public Result update(String space, Tuple key, Operation... tuple) {
return doUpdate(ofUnresolved(space, defaultSpaceResolver), key, tuple);
}

@Override
public Result upsert(Integer space, Tuple key, Tuple defTuple, Operation... ops) {
return doUpsert(ofResolved(space), key, defTuple, ops);
}

@Override
public Result upsert(String space, Tuple key, Tuple defTuple, Operation... ops) {
return doUpsert(ofUnresolved(space, defaultSpaceResolver), key, defTuple, ops);
}

@Override
public Result delete(Integer space, Tuple key) {
return doDelete(ofResolved(space), key);
}

@Override
public Result delete(String space, Tuple key) {
return doDelete(ofUnresolved(space, defaultSpaceResolver), key);
}

@Override
public Result call(String function, Object... args) {
return exec(callCode, Key.FUNCTION, function, Key.TUPLE, args);
}

@Override
public Result eval(String expression, Object... args) {
return exec(Code.EVAL, Key.EXPRESSION, expression, Key.TUPLE, args);
}

@Override
public void ping() {
exec(Code.PING);
}

public void setCallCode(Code callCode) {
this.callCode = callCode;
}

private Result doDelete(ResolvableArgument space, Tuple key) {
return exec(Code.DELETE, ofResolved(Key.SPACE), space, ofResolved(Key.KEY), ofResolved(key));
}

private Result doUpsert(ResolvableArgument space, Tuple key, Tuple defTuple, Operation... ops) {
return exec(
Code.UPSERT,
ofResolved(Key.SPACE), space,
ofResolved(Key.KEY), ofResolved(key),
ofResolved(Key.TUPLE), ofResolved(defTuple),
ofResolved(Key.UPSERT_OPS), ofResolved(ops)
);
}

private Result doUpdate(ResolvableArgument space, Tuple key, Operation... ops) {
return exec(
Code.UPDATE,
ofResolved(Key.SPACE), space,
ofResolved(Key.KEY), ofResolved(key),
ofResolved(Key.TUPLE), ofResolved(ops)
);
}

private Result doReplace(ResolvableArgument space, Tuple tuple) {
return exec(Code.REPLACE, ofResolved(Key.SPACE), space, ofResolved(Key.TUPLE), ofResolved(tuple));
}

private Result doInsert(ResolvableArgument space, Tuple tuple) {
return exec(Code.INSERT, ofResolved(Key.SPACE), space, ofResolved(Key.TUPLE), ofResolved(tuple));
}

private Result doSelect(ResolvableArgument space,
ResolvableArgument index,
Tuple key,
int offset,
int limit,
int iterator) {
return exec(
Code.SELECT,
ofResolved(Key.SPACE), space,
ofResolved(Key.INDEX), index,
ofResolved(Key.KEY), ofResolved(key),
ofResolved(Key.ITERATOR), ofResolved(iterator),
ofResolved(Key.LIMIT), ofResolved(limit),
ofResolved(Key.OFFSET), ofResolved(offset)
);
}

}
12 changes: 1 addition & 11 deletions src/main/java/org/tarantool/TarantoolBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public abstract class TarantoolBase<Result> extends AbstractTarantoolOps<Integer, List<?>, Object, Result> {
public abstract class TarantoolBase<Result> extends AbstractTarantoolOps<List<?>, Object, Result> {
protected String serverVersion;
protected MsgPackLite msgPackLite = MsgPackLite.INSTANCE;
protected AtomicLong syncId = new AtomicLong();
Expand Down Expand Up @@ -42,16 +42,6 @@ protected void closeChannel(SocketChannel channel) {
}
}

protected void validateArgs(Object[] args) {
if (args != null) {
for (int i = 0; i < args.length; i += 2) {
if (args[i + 1] == null) {
throw new NullPointerException(((Key) args[i]).name() + " should not be null");
}
}
}
}

public void setInitialRequestSize(int initialRequestSize) {
this.initialRequestSize = initialRequestSize;
}
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/org/tarantool/TarantoolClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
import java.util.concurrent.TimeUnit;

public interface TarantoolClient {
TarantoolClientOps<Integer, List<?>, Object, List<?>> syncOps();
TarantoolClientOps<List<?>, Object, List<?>> syncOps();

TarantoolClientOps<Integer, List<?>, Object, Future<List<?>>> asyncOps();
TarantoolClientOps<List<?>, Object, Future<List<?>>> asyncOps();

TarantoolClientOps<Integer, List<?>, Object, CompletionStage<List<?>>> composableAsyncOps();
TarantoolClientOps<List<?>, Object, CompletionStage<List<?>>> composableAsyncOps();

TarantoolClientOps<Integer, List<?>, Object, Long> fireAndForgetOps();
TarantoolClientOps<List<?>, Object, Long> fireAndForgetOps();

TarantoolSQLOps<Object, Long, List<Map<String, Object>>> sqlSyncOps();

Expand Down
Loading

0 comments on commit 178337c

Please sign in to comment.