Skip to content

Commit

Permalink
Soft automatic schema reload
Browse files Browse the repository at this point in the history
Now client keeps actual schema metadata and sends schemaId header to be
checked against current Tarantool schema version. If client version
mismatches DB version client does schema reloading in the background.

Client operation interface was reworked in scope of support not only
number identifiers for spaces and indexes but also their string names.

This also includes set of request builders that can be used as a public
API to construct requests. The main idea here is to provide more
natural DSL-like approach to build operations instead of current
abstract types like List<?> or List<Object>.

Closes: #7, #137
Affects: #212
  • Loading branch information
nicktorwald committed Sep 6, 2019
1 parent 08e37a2 commit 39b7dfb
Show file tree
Hide file tree
Showing 58 changed files with 3,396 additions and 424 deletions.
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,50 @@ all the results, you could override this:
protected void complete(TarantoolPacket packet, TarantoolOp<?> future);
```

## String space/index resolution

Each operation that requires space or index to be executed, can work with
number ID as well as string name of a space or an index.
Assume, we have `my_space` space with space ID `512` and its primary index
`primary` with index ID `0`. Then, for instance, `select` operations can be
performed as the following:

```java
client.syncOps().select(512, 0, Collections.singletonList(1), 0, 1, Iterator.EQ);
// or using more convenient way
client.syncOps().select("my_space", "primary", Collections.singletonList(1), 0, 1, Iterator.EQ);
```

Because _iproto_ has not yet supported string spaces and indexes, a client caches current server
schema in memory. The client relies on protocol SCHEMA_ID and sends each request with respect to
cached schema version. The schema is used primarily to resolve string names of spaces or indexes
against its integer IDs.

### Schema update

1. Just after a (re-)connection to the Tarantool instance.
The client cannot guarantee that new instance is the same and has same schema,
thus, the client drops the cached schema and fetches new one;
2. receiving a schema version error as a response to our request.
It's possible some request can be rejected by server because of schema
mismatching between client and server. In this case the schema will be
reloaded and the refused request will be resent using the updated schema
version;
3. sending a DDL request and receiving a new version in a response;
4. sending a request against a non-existent space/index name.
The client cannot exactly know whether name was not found because of
it does not exist or it has not the latest schema version.

### Schema support caveats

1. Each schema reloading requires at least two extra requests to fetch spaces and
indexes metadata respectively. There is also a ping request followed by reloading
of the schema to check whether the client has outdated version (see point 4 in
Schema update section).
2. In some circumstance, requests can be rejected several times until both client's
and server's versions matches. It may take significant amount of time or even be
a cause of request timeout.

## Spring NamedParameterJdbcTemplate usage example

The JDBC driver uses `TarantoolClient` implementation to provide a communication with server.
Expand Down
150 changes: 123 additions & 27 deletions src/main/java/org/tarantool/AbstractTarantoolOps.java
Original file line number Diff line number Diff line change
@@ -1,62 +1,158 @@
package org.tarantool;

import static org.tarantool.dsl.Requests.callRequest;
import static org.tarantool.dsl.Requests.deleteRequest;
import static org.tarantool.dsl.Requests.evalRequest;
import static org.tarantool.dsl.Requests.insertRequest;
import static org.tarantool.dsl.Requests.pingRequest;
import static org.tarantool.dsl.Requests.replaceRequest;
import static org.tarantool.dsl.Requests.selectRequest;
import static org.tarantool.dsl.Requests.updateRequest;
import static org.tarantool.dsl.Requests.upsertRequest;

public abstract class AbstractTarantoolOps<Space, Tuple, Operation, Result>
implements TarantoolClientOps<Space, Tuple, Operation, Result> {
import org.tarantool.dsl.Operation;
import org.tarantool.dsl.TarantoolRequestConvertible;
import org.tarantool.schema.TarantoolSchemaMeta;

import java.util.Arrays;
import java.util.List;

public abstract class AbstractTarantoolOps<Result>
implements TarantoolClientOps<List<?>, Object, Result> {

private Code callCode = Code.CALL;

protected abstract Result exec(Code code, Object... args);
protected abstract Result exec(TarantoolRequest request);

protected abstract TarantoolSchemaMeta getSchemaMeta();

public Result select(Integer space, Integer index, List<?> key, int offset, int limit, Iterator iterator) {
return execute(
selectRequest(space, index)
.key(key)
.offset(offset).limit(limit)
.iterator(iterator)
);
}

@Override
public Result select(String space, String index, List<?> key, int offset, int limit, Iterator iterator) {
return execute(
selectRequest(space, index)
.key(key)
.offset(offset).limit(limit)
.iterator(iterator)
);
}

public Result select(Space space, Space index, Tuple key, int offset, int limit, Iterator iterator) {
return select(space, index, key, offset, limit, iterator.getValue());
@Override
public Result select(Integer space, Integer index, List<?> key, int offset, int limit, int iterator) {
return execute(
selectRequest(space, index)
.key(key)
.offset(offset).limit(limit)
.iterator(iterator)
);
}

public Result select(Space space, Space index, Tuple key, int offset, int limit, int iterator) {
return exec(
Code.SELECT,
Key.SPACE, space,
Key.INDEX, index,
Key.KEY, key,
Key.ITERATOR, iterator,
Key.LIMIT, limit,
Key.OFFSET, offset
@Override
public Result select(String space, String index, List<?> key, int offset, int limit, int iterator) {
return execute(
selectRequest(space, index)
.key(key)
.offset(offset).limit(limit)
.iterator(iterator)
);
}

public Result insert(Space space, Tuple tuple) {
return exec(Code.INSERT, Key.SPACE, space, Key.TUPLE, tuple);
@Override
public Result insert(Integer space, List<?> tuple) {
return execute(insertRequest(space, tuple));
}

@Override
public Result insert(String space, List<?> tuple) {
return execute(insertRequest(space, tuple));
}

public Result replace(Space space, Tuple tuple) {
return exec(Code.REPLACE, Key.SPACE, space, Key.TUPLE, tuple);
@Override
public Result replace(Integer space, List<?> tuple) {
return execute(replaceRequest(space, tuple));
}

public Result update(Space space, Tuple key, Operation... args) {
return exec(Code.UPDATE, Key.SPACE, space, Key.KEY, key, Key.TUPLE, args);
@Override
public Result replace(String space, List<?> tuple) {
return execute(replaceRequest(space, tuple));
}

public Result upsert(Space space, Tuple key, Tuple def, Operation... args) {
return exec(Code.UPSERT, Key.SPACE, space, Key.KEY, key, Key.TUPLE, def, Key.UPSERT_OPS, args);
@Override
public Result update(Integer space, List<?> key, Object... operations) {
Operation[] ops = Arrays.stream(operations)
.map(Operation::fromArray)
.toArray(org.tarantool.dsl.Operation[]::new);
return execute(updateRequest(space, key, ops));
}

public Result delete(Space space, Tuple key) {
return exec(Code.DELETE, Key.SPACE, space, Key.KEY, key);
@Override
public Result update(String space, List<?> key, Object... operations) {
Operation[] ops = Arrays.stream(operations)
.map(Operation::fromArray)
.toArray(org.tarantool.dsl.Operation[]::new);
return execute(updateRequest(space, key, ops));
}

@Override
public Result upsert(Integer space, List<?> key, List<?> defTuple, Object... operations) {
Operation[] ops = Arrays.stream(operations)
.map(Operation::fromArray)
.toArray(Operation[]::new);
return execute(upsertRequest(space, key, defTuple, ops));
}

@Override
public Result upsert(String space, List<?> key, List<?> defTuple, Object... operations) {
Operation[] ops = Arrays.stream(operations)
.map(Operation::fromArray)
.toArray(Operation[]::new);
return execute(upsertRequest(space, key, defTuple, ops));
}

@Override
public Result delete(Integer space, List<?> key) {
return execute(deleteRequest(space, key));
}

@Override
public Result delete(String space, List<?> key) {
return execute(deleteRequest(space, key));
}

@Override
public Result call(String function, Object... args) {
return exec(callCode, Key.FUNCTION, function, Key.TUPLE, args);
return execute(
callRequest(function)
.arguments(args)
.useCall16(callCode == Code.OLD_CALL)
);
}

@Override
public Result eval(String expression, Object... args) {
return exec(Code.EVAL, Key.EXPRESSION, expression, Key.TUPLE, args);
return execute(evalRequest(expression).arguments(args));
}

@Override
public void ping() {
exec(Code.PING);
execute(pingRequest());
}

@Override
public Result execute(TarantoolRequestConvertible requestSpec) {
return exec(requestSpec.toTarantoolRequest(getSchemaMeta()));
}

public void setCallCode(Code callCode) {
this.callCode = callCode;
}

}
10 changes: 10 additions & 0 deletions src/main/java/org/tarantool/Iterator.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.tarantool;

import java.util.Arrays;

// Iterator info was taken from here https://github.com/tarantool/tarantool/blob/f66584c3bcdffe61d6d99a4868a9b72d62338a11/src/box/iterator_type.h#L62
public enum Iterator {
EQ(0), // key == x ASC order
Expand All @@ -24,4 +26,12 @@ public enum Iterator {
public int getValue() {
return value;
}

public static Iterator valueOf(int value) {
return Arrays.stream(Iterator.values())
.filter(v -> value == v.getValue())
.findFirst()
.orElseThrow(IllegalArgumentException::new);
}

}
7 changes: 7 additions & 0 deletions src/main/java/org/tarantool/MsgPackLite.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.tarantool;

import org.tarantool.util.TupleTwo;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -226,6 +228,11 @@ public void pack(Object item, OutputStream os) throws IOException {
pack(kvp.getKey(), out);
pack(kvp.getValue(), out);
}
} else if (item instanceof TupleTwo) {
TupleTwo<?, ?> tuple = (TupleTwo<?, ?>) item;
out.write(1 | MP_FIXMAP);
pack(tuple.getFirst(), out);
pack(tuple.getSecond(), out);
} else {
throw new IllegalArgumentException("Cannot msgpack object of type " + item.getClass().getCanonicalName());
}
Expand Down
79 changes: 79 additions & 0 deletions src/main/java/org/tarantool/RequestArguments.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.tarantool;

import java.util.Objects;
import java.util.function.Supplier;

/**
* Request argument factory.
*
* @see TarantoolRequestArgument
*/
public class RequestArguments {

private RequestArguments() {
}

public static TarantoolRequestArgument value(Object value) {
return new SimpleArgument(value);
}

public static TarantoolRequestArgument cacheLookupValue(Supplier<Object> supplier) {
return new LookupArgument(supplier);
}

/**
* Simple wrapper that holds the original value.
*/
private static class SimpleArgument implements TarantoolRequestArgument {

private Object value;

SimpleArgument(Object value) {
Objects.requireNonNull(value);
this.value = value;
}

@Override
public boolean isSerializable() {
return true;
}

@Override
public Object getValue() {
return value;
}

}

/**
* Wrapper that evaluates the value each time
* it is requested.
* <p>
* It works like a function, where {@code argument = f(key)}.
*/
private static class LookupArgument implements TarantoolRequestArgument {

Supplier<Object> lookup;

LookupArgument(Supplier<Object> lookup) {
this.lookup = Objects.requireNonNull(lookup);
}

@Override
public boolean isSerializable() {
try {
lookup.get();
} catch (Exception ignored) {
return false;
}
return true;
}

@Override
public synchronized Object getValue() {
return lookup.get();
}

}

}
13 changes: 1 addition & 12 deletions src/main/java/org/tarantool/TarantoolBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public abstract class TarantoolBase<Result> extends AbstractTarantoolOps<Integer, List<?>, Object, Result> {
public abstract class TarantoolBase<Result> extends AbstractTarantoolOps<Result> {
protected String serverVersion;
protected MsgPackLite msgPackLite = MsgPackLite.INSTANCE;
protected AtomicLong syncId = new AtomicLong();
Expand Down Expand Up @@ -42,16 +41,6 @@ protected void closeChannel(SocketChannel channel) {
}
}

protected void validateArgs(Object[] args) {
if (args != null) {
for (int i = 0; i < args.length; i += 2) {
if (args[i + 1] == null) {
throw new NullPointerException(((Key) args[i]).name() + " should not be null");
}
}
}
}

public void setInitialRequestSize(int initialRequestSize) {
this.initialRequestSize = initialRequestSize;
}
Expand Down
Loading

0 comments on commit 39b7dfb

Please sign in to comment.