Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce protocol versioning #421

Merged
merged 6 commits into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions common/exception/ErrorMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,40 +28,42 @@ private ErrorMessage(String codePrefix, int codeNumber, String messagePrefix, St
}

public static class Client extends ErrorMessage {
public static final Client RPC_METHOD_UNAVAILABLE =
new Client(1, "The server does not support this method, please check the client-server compatibility:\n'%s'.");
Comment on lines +31 to +32
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error message could be friendlier, something like
"The server does not support this method. Please ensure that the TypeDB Client and TypeDB Server versions are compatible ..."

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will push an update, thank you!

public static final Client CLIENT_CLOSED =
new Client(1, "The client has been closed and no further operation is allowed.");
new Client(2, "The client has been closed and no further operation is allowed.");
public static final Client SESSION_CLOSED =
new Client(2, "The session has been closed and no further operation is allowed.");
new Client(3, "The session has been closed and no further operation is allowed.");
public static final Client TRANSACTION_CLOSED =
new Client(3, "The transaction has been closed and no further operation is allowed.");
new Client(4, "The transaction has been closed and no further operation is allowed.");
public static final Client TRANSACTION_CLOSED_WITH_ERRORS =
new Client(4, "The transaction has been closed with error(s): \n%s.");
new Client(5, "The transaction has been closed with error(s): \n%s.");
public static final Client UNABLE_TO_CONNECT =
new Client(5, "Unable to connect to TypeDB server.");
new Client(6, "Unable to connect to TypeDB server.");
public static final Client NEGATIVE_VALUE_NOT_ALLOWED =
new Client(6, "Value cannot be less than 1, was: '%d'.");
new Client(7, "Value cannot be less than 1, was: '%d'.");
public static final Client MISSING_DB_NAME =
new Client(7, "Database name cannot be null.");
new Client(8, "Database name cannot be null.");
public static final Client DB_DOES_NOT_EXIST =
new Client(8, "The database '%s' does not exist.");
new Client(9, "The database '%s' does not exist.");
public static final Client MISSING_RESPONSE =
new Client(9, "Unexpected empty response for request ID '%s'.");
new Client(10, "Unexpected empty response for request ID '%s'.");
public static final Client UNKNOWN_REQUEST_ID =
new Client(10, "Received a response with unknown request id '%s':\n%s");
new Client(11, "Received a response with unknown request id '%s':\n%s");
public static final Client CLUSTER_NO_PRIMARY_REPLICA_YET =
new Client(11, "No replica has been marked as the primary replica for latest known term '%d'.");
new Client(12, "No replica has been marked as the primary replica for latest known term '%d'.");
public static final Client CLUSTER_UNABLE_TO_CONNECT =
new Client(12, "Unable to connect to TypeDB Cluster. Attempted connecting to the cluster members, but none are available: '%s'.");
new Client(13, "Unable to connect to TypeDB Cluster. Attempted connecting to the cluster members, but none are available: '%s'.");
public static final Client CLUSTER_REPLICA_NOT_PRIMARY =
new Client(13, "The replica is not the primary replica.");
new Client(14, "The replica is not the primary replica.");
public static final Client CLUSTER_ALL_NODES_FAILED =
new Client(14, "Attempted connecting to all cluster members, but the following errors occurred: \n%s.");
new Client(15, "Attempted connecting to all cluster members, but the following errors occurred: \n%s.");
public static final Client CLUSTER_USER_DOES_NOT_EXIST =
new Client(15, "The user '%s' does not exist.");
new Client(16, "The user '%s' does not exist.");
public static final ErrorMessage CLUSTER_TOKEN_CREDENTIAL_INVALID =
new Client(16, "Invalid token credential.");
new Client(17, "Invalid token credential.");
public static final ErrorMessage CLUSTER_PASSWORD_CREDENTIAL_EXPIRED =
new Client(17, "Expired password credential.");
new Client(18, "Expired password credential.");

private static final String codePrefix = "CLI";
private static final String messagePrefix = "Client Error";
Expand Down
23 changes: 18 additions & 5 deletions common/exception/TypeDBClientException.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@

import javax.annotation.Nullable;

import static com.vaticle.typedb.client.common.exception.ErrorMessage.Client.CLUSTER_PASSWORD_CREDENTIAL_EXPIRED;
import static com.vaticle.typedb.client.common.exception.ErrorMessage.Client.CLUSTER_REPLICA_NOT_PRIMARY;
import static com.vaticle.typedb.client.common.exception.ErrorMessage.Client.CLUSTER_TOKEN_CREDENTIAL_INVALID;
import static com.vaticle.typedb.client.common.exception.ErrorMessage.Client.RPC_METHOD_UNAVAILABLE;
import static com.vaticle.typedb.client.common.exception.ErrorMessage.Client.UNABLE_TO_CONNECT;

public class TypeDBClientException extends RuntimeException {

// TODO: propagate exception from the server side in a less-brittle way
Expand All @@ -48,14 +54,16 @@ public TypeDBClientException(String message, Throwable cause) {
}

public static TypeDBClientException of(StatusRuntimeException sre) {
if (isRstStream(sre)) {
return new TypeDBClientException(ErrorMessage.Client.UNABLE_TO_CONNECT);
if (isUnimplementedMethod(sre)) {
return new TypeDBClientException(RPC_METHOD_UNAVAILABLE, sre.getStatus().getDescription());
} else if (isRstStream(sre)) {
return new TypeDBClientException(UNABLE_TO_CONNECT);
} else if (isReplicaNotPrimary(sre)) {
return new TypeDBClientException(ErrorMessage.Client.CLUSTER_REPLICA_NOT_PRIMARY);
return new TypeDBClientException(CLUSTER_REPLICA_NOT_PRIMARY);
} else if (isTokenCredentialInvalid(sre)) {
return new TypeDBClientException(ErrorMessage.Client.CLUSTER_TOKEN_CREDENTIAL_INVALID);
return new TypeDBClientException(CLUSTER_TOKEN_CREDENTIAL_INVALID);
} else if (isPasswordCredentialExpired(sre)) {
return new TypeDBClientException(ErrorMessage.Client.CLUSTER_PASSWORD_CREDENTIAL_EXPIRED);
return new TypeDBClientException(CLUSTER_PASSWORD_CREDENTIAL_EXPIRED);
} else {
return new TypeDBClientException(sre.getStatus().getDescription(), sre);
}
Expand Down Expand Up @@ -85,6 +93,11 @@ private static boolean isPasswordCredentialExpired(StatusRuntimeException status
statusRuntimeException.getStatus().getDescription() != null &&
statusRuntimeException.getStatus().getDescription().contains(CLUSTER_PASSWORD_CREDENTIAL_EXPIRED_ERROR_CODE);
}

private static boolean isUnimplementedMethod(StatusRuntimeException statusRuntimeException) {
return statusRuntimeException.getStatus().getCode() == Status.Code.UNIMPLEMENTED;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a sufficient but not necessary condition for a client-server mismatch error. It's also possible that we send a request message with an RPC code that is implemented, but carries a different semantic meaning in this protocol version.

Do we perform some kind of handshake anywhere to verify the protocol version? I notice OpenReq now sends a version number. Does that mean the server will decline a connection request from an invalid client?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, and it throws basically the equivalent error message if it is a mismatch :) between the two of them we should get pretty good coverage of client-protocol mismatches -> useful errors the users can resolve by themselves.

}

public String getName() {
return this.getClass().getName();
}
Expand Down
11 changes: 11 additions & 0 deletions common/rpc/RequestBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import com.vaticle.typedb.protocol.ClusterServerProto;
import com.vaticle.typedb.protocol.ClusterUserProto;
import com.vaticle.typedb.protocol.ConceptProto;
import com.vaticle.typedb.protocol.ConnectionProto;
import com.vaticle.typedb.protocol.CoreDatabaseProto;
import com.vaticle.typedb.protocol.LogicProto;
import com.vaticle.typedb.protocol.OptionsProto;
import com.vaticle.typedb.protocol.QueryProto;
import com.vaticle.typedb.protocol.SessionProto;
import com.vaticle.typedb.protocol.TransactionProto;
import com.vaticle.typedb.protocol.VersionProto;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -174,6 +176,15 @@ public static class Database {
}
}

public static class Connection {

public static ConnectionProto.Connection.Open.Req openReq() {
return ConnectionProto.Connection.Open.Req.newBuilder()
.setVersion(VersionProto.Version.VERSION)
.build();
}
}

public static class Session {

public static SessionProto.Session.Open.Req openReq(
Expand Down
5 changes: 5 additions & 0 deletions common/rpc/TypeDBStub.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package com.vaticle.typedb.client.common.rpc;

import com.vaticle.typedb.client.common.exception.TypeDBClientException;
import com.vaticle.typedb.protocol.ConnectionProto;
import com.vaticle.typedb.protocol.CoreDatabaseProto;
import com.vaticle.typedb.protocol.SessionProto;
import com.vaticle.typedb.protocol.TransactionProto;
Expand All @@ -35,6 +36,10 @@

public abstract class TypeDBStub {

public ConnectionProto.Connection.Open.Res connectionOpen(ConnectionProto.Connection.Open.Req request) {
return resilientCall(() -> blockingStub().connectionOpen(request));
}

public CoreDatabaseProto.CoreDatabaseManager.Contains.Res databasesContains(CoreDatabaseProto.CoreDatabaseManager.Contains.Req request) {
return resilientCall(() -> blockingStub().databasesContains(request));
}
Expand Down
13 changes: 0 additions & 13 deletions connection/TypeDBClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,6 @@ public static int calculateParallelisation() {
else return (int) Math.ceil(cores / 4.0);
}

@SuppressWarnings("ResultOfMethodCallIgnored")
protected void validateConnectionOrThrow() throws TypeDBClientException { // TODO: we should throw checked exception
try {
// TODO: This is hacky patch. We know that databaseMgr.all() will throw an exception if connection has not been
// established. But we should replace this code to perform the check in a more meaningful way. This method
// should naturally be replaced once we implement a new client pulse architecture.
databaseMgr.all();
} catch (Exception e){
close();
throw e;
}
}

@Override
public boolean isOpen() {
return !channel().isShutdown();
Expand Down
6 changes: 2 additions & 4 deletions connection/cluster/ClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,15 @@ private Map<String, ClusterServerClient> createClients(TypeDBCredential credenti
Map<String, ClusterServerClient> clients = new HashMap<>();
boolean available = false;
for (String address : addresses) {
ClusterServerClient client = new ClusterServerClient(address, credential, parallelisation);
try {
client.validateConnectionOrThrow();
ClusterServerClient client = new ClusterServerClient(address, credential, parallelisation);
clients.put(address, client);
available = true;
} catch (TypeDBClientException e) {
// do nothing
}
clients.put(address, client);
}
if (!available) throw new TypeDBClientException(CLUSTER_UNABLE_TO_CONNECT, String.join(",", addresses));

return clients;
}

Expand Down
12 changes: 7 additions & 5 deletions connection/cluster/ClusterServerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Set;

import static com.vaticle.typedb.client.common.rpc.RequestBuilder.Cluster.ServerManager.allReq;
import static com.vaticle.typedb.client.common.rpc.RequestBuilder.Connection.openReq;
import static java.util.stream.Collectors.toSet;

class ClusterServerClient extends TypeDBClientImpl {
Expand All @@ -51,11 +52,12 @@ class ClusterServerClient extends TypeDBClientImpl {
this.address = address;
channel = createManagedChannel(address, credential);
stub = new ClusterServerStub(channel, credential);
}

@Override
public void validateConnectionOrThrow() throws TypeDBClientException {
super.validateConnectionOrThrow();
try {
stub.connectionOpen(openReq());
} catch (Exception e) {
close();
throw e;
}
}

private ManagedChannel createManagedChannel(String address, TypeDBCredential credential) {
Expand Down
9 changes: 8 additions & 1 deletion connection/core/CoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;

import static com.vaticle.typedb.client.common.rpc.RequestBuilder.Connection.openReq;

public class CoreClient extends TypeDBClientImpl {

private final ManagedChannel channel;
Expand All @@ -39,7 +41,12 @@ public CoreClient(String address, int parallelisation) {
super(parallelisation);
channel = NettyChannelBuilder.forTarget(address).usePlaintext().build();
stub = CoreStub.create(channel);
validateConnectionOrThrow();
try {
stub.connectionOpen(openReq());
} catch (Exception e) {
close();
throw e;
}
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions dependencies/vaticle/artifacts.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def vaticle_typedb_artifact():
artifact_name = "typedb-server-{platform}-{version}.{ext}",
tag_source = deployment["artifact.release"],
commit_source = deployment["artifact.snapshot"],
commit = "816ef97845a9577c2236db69dd94548b5869c083"
commit = "ad86a23fb78dda097a8b61b9d01ef68751cb14c1"
)

def vaticle_typedb_cluster_artifact():
Expand All @@ -39,5 +39,5 @@ def vaticle_typedb_cluster_artifact():
artifact_name = "typedb-cluster-all-{platform}-{version}.{ext}",
tag_source = deployment_private["artifact.release"],
commit_source = deployment_private["artifact.snapshot"],
commit = "8044753056dd20b8e6bec6f62036eb0003d4ba06",
commit = "6e00e0bff33c8c9c90f6393620dd30a9526748c3",
)
2 changes: 1 addition & 1 deletion dependencies/vaticle/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def vaticle_typedb_protocol():
git_repository(
name = "vaticle_typedb_protocol",
remote = "https://github.com/vaticle/typedb-protocol",
commit = "5c073e625770e812e4807419392d7f7261a553fb", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_typedb_protocol
commit = "3c844deff72f1411d8085584b1bdc7311bbeb37a", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_typedb_protocol
)

def vaticle_typedb_behaviour():
Expand Down