Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add version verification #370

Merged
merged 11 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import com.facebook.thrift.protocol.TCompactProtocol;
import com.facebook.thrift.transport.TSocket;
import com.facebook.thrift.transport.TTransportException;
import com.google.common.base.Charsets;
import com.vesoft.nebula.ErrorCode;
import com.vesoft.nebula.HostAddr;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
import com.vesoft.nebula.meta.EdgeItem;
import com.vesoft.nebula.meta.GetEdgeReq;
Expand All @@ -39,6 +41,8 @@
import com.vesoft.nebula.meta.Schema;
import com.vesoft.nebula.meta.SpaceItem;
import com.vesoft.nebula.meta.TagItem;
import com.vesoft.nebula.meta.VerifyClientVersionReq;
import com.vesoft.nebula.meta.VerifyClientVersionResp;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -85,28 +89,41 @@ public MetaClient(List<HostAddress> addresses, int timeout, int connectionRetry,
this.addresses = addresses;
}

public void connect() throws TException {
public void connect()
throws TException, ClientServerIncompatibleException {
doConnect();
}

/**
* connect nebula meta server
*/
private void doConnect() throws TTransportException {
private void doConnect()
throws TTransportException, ClientServerIncompatibleException {
Random random = new Random(System.currentTimeMillis());
int position = random.nextInt(addresses.size());
HostAddress address = addresses.get(position);
getClient(address.getHost(), address.getPort());
}

private void getClient(String host, int port) throws TTransportException {
private void getClient(String host, int port)
throws TTransportException, ClientServerIncompatibleException {
transport = new TSocket(host, port, timeout, timeout);
transport.open();
protocol = new TCompactProtocol(transport);
client = new MetaService.Client(protocol);

// check if client version matches server version
VerifyClientVersionResp resp =
client.verifyClientVersion(new VerifyClientVersionReq());
if (resp.getCode() != ErrorCode.SUCCEEDED) {
client.getInputProtocol().getTransport().close();
throw new ClientServerIncompatibleException(new String(resp.getError_msg(),
Charsets.UTF_8));
}
}

private void freshClient(HostAddr leader) throws TTransportException {
private void freshClient(HostAddr leader)
klay-ke marked this conversation as resolved.
Show resolved Hide resolved
throws TTransportException, ClientServerIncompatibleException {
close();
getClient(leader.getHost(), leader.getPort());
}
Expand All @@ -125,7 +142,8 @@ public void close() {
*
* @return
*/
public synchronized List<IdName> getSpaces() throws TException, ExecuteFailedException {
public synchronized List<IdName> getSpaces()
klay-ke marked this conversation as resolved.
Show resolved Hide resolved
throws TException, ExecuteFailedException, ClientServerIncompatibleException {
int retry = RETRY_TIMES;
ListSpacesReq request = new ListSpacesReq();
ListSpacesResp response = null;
Expand Down Expand Up @@ -158,7 +176,7 @@ public synchronized List<IdName> getSpaces() throws TException, ExecuteFailedExc
* @return SpaceItem
*/
public synchronized SpaceItem getSpace(String spaceName) throws TException,
ExecuteFailedException {
ExecuteFailedException, ClientServerIncompatibleException {
klay-ke marked this conversation as resolved.
Show resolved Hide resolved
int retry = RETRY_TIMES;
GetSpaceReq request = new GetSpaceReq();
request.setSpace_name(spaceName.getBytes());
Expand Down Expand Up @@ -192,7 +210,7 @@ public synchronized SpaceItem getSpace(String spaceName) throws TException,
* @return TagItem list
*/
public synchronized List<TagItem> getTags(String spaceName)
throws TException, ExecuteFailedException {
throws TException, ExecuteFailedException, ClientServerIncompatibleException {
int retry = RETRY_TIMES;

int spaceID = getSpace(spaceName).space_id;
Expand Down Expand Up @@ -229,7 +247,7 @@ public synchronized List<TagItem> getTags(String spaceName)
* @return Schema
*/
public synchronized Schema getTag(String spaceName, String tagName)
throws TException, ExecuteFailedException {
throws TException, ExecuteFailedException, ClientServerIncompatibleException {
int retry = RETRY_TIMES;
GetTagReq request = new GetTagReq();
int spaceID = getSpace(spaceName).getSpace_id();
Expand Down Expand Up @@ -268,7 +286,7 @@ public synchronized Schema getTag(String spaceName, String tagName)
* @return EdgeItem list
*/
public synchronized List<EdgeItem> getEdges(String spaceName)
throws TException, ExecuteFailedException {
throws TException, ExecuteFailedException, ClientServerIncompatibleException {
int retry = RETRY_TIMES;
int spaceID = getSpace(spaceName).getSpace_id();
ListEdgesReq request = new ListEdgesReq(spaceID);
Expand Down Expand Up @@ -303,7 +321,7 @@ public synchronized List<EdgeItem> getEdges(String spaceName)
* @return Schema
*/
public synchronized Schema getEdge(String spaceName, String edgeName)
throws TException, ExecuteFailedException {
throws TException, ExecuteFailedException, ClientServerIncompatibleException {
int retry = RETRY_TIMES;
GetEdgeReq request = new GetEdgeReq();
int spaceID = getSpace(spaceName).getSpace_id();
Expand Down Expand Up @@ -343,7 +361,7 @@ public synchronized Schema getEdge(String spaceName, String edgeName)
* @return
*/
public synchronized Map<Integer, List<HostAddr>> getPartsAlloc(String spaceName)
throws ExecuteFailedException, TException {
throws ExecuteFailedException, TException, ClientServerIncompatibleException {
int retry = RETRY_TIMES;
GetPartsAllocReq request = new GetPartsAllocReq();
int spaceID = getSpace(spaceName).getSpace_id();
Expand Down Expand Up @@ -375,7 +393,7 @@ public synchronized Map<Integer, List<HostAddr>> getPartsAlloc(String spaceName)
/**
* get all Storaged servers
*/
public synchronized Set<HostAddr> listHosts() {
public synchronized Set<HostAddr> listHosts() throws ClientServerIncompatibleException {
int retry = RETRY_TIMES;
ListHostsReq request = new ListHostsReq();
request.setType(ListHostType.STORAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.common.collect.Maps;
import com.vesoft.nebula.HostAddr;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
import com.vesoft.nebula.meta.EdgeItem;
import com.vesoft.nebula.meta.IdName;
Expand Down Expand Up @@ -49,7 +50,8 @@ private class SpaceInfo {
/**
* init the meta info cache
*/
public MetaManager(List<HostAddress> address) throws TException {
public MetaManager(List<HostAddress> address)
throws TException, ClientServerIncompatibleException {
metaClient = new MetaClient(address);
metaClient.connect();
fillMetaInfo();
Expand Down Expand Up @@ -120,7 +122,7 @@ private void fillMetaInfo() {
} finally {
lock.writeLock().unlock();
}
} catch (TException | ExecuteFailedException e) {
} catch (TException | ExecuteFailedException | ClientServerIncompatibleException e) {
LOGGER.error(e.getMessage());
}
}
Expand Down Expand Up @@ -330,7 +332,13 @@ public void updateLeader(String spaceName, int part, HostAddr newLeader)
* get all storage addresses
*/
public Set<HostAddr> listHosts() {
Set<HostAddr> hosts = metaClient.listHosts();
Set<HostAddr> hosts;
try {
hosts = metaClient.listHosts();
} catch (ClientServerIncompatibleException e) {
LOGGER.error("client version does not match server version");
return new HashSet<>();
}
if (hosts == null) {
return new HashSet<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package com.vesoft.nebula.client.meta;

import com.facebook.thrift.TException;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
import com.vesoft.nebula.client.util.ProcessUtil;
import com.vesoft.nebula.meta.EdgeItem;
Expand Down Expand Up @@ -40,7 +41,7 @@ private void connect() {
metaClient = new MetaClient(address, port);
try {
metaClient.connect();
} catch (TException e) {
} catch (TException | ClientServerIncompatibleException e) {
e.printStackTrace();
assert (false);
}
Expand All @@ -51,7 +52,7 @@ public void testFailConnect() {
MetaClient client = new MetaClient(address, port);
try {
client.connect();
} catch (TException e) {
} catch (TException | ClientServerIncompatibleException e) {
assert (true);
}
}
Expand All @@ -61,7 +62,7 @@ public void testGetSpaces() {
List<IdName> spaces = metaClient.getSpaces();
assert (spaces.size() >= 1);
assert (metaClient.getSpace("testMeta") != null);
} catch (TException | ExecuteFailedException e) {
} catch (TException | ExecuteFailedException | ClientServerIncompatibleException e) {
LOGGER.error(e.getMessage());
e.printStackTrace();
assert (false);
Expand All @@ -73,7 +74,7 @@ public void testGetTags() {
List<TagItem> tags = metaClient.getTags("testMeta");
Assert.assertTrue(tags.size() >= 1);
assert (metaClient.getTag("testMeta", "person") != null);
} catch (TException | ExecuteFailedException e) {
} catch (TException | ExecuteFailedException | ClientServerIncompatibleException e) {
e.printStackTrace();
assert (false);
}
Expand All @@ -84,7 +85,7 @@ public void testGetEdges() {
List<EdgeItem> edges = metaClient.getEdges("testMeta");
Assert.assertTrue(edges.size() >= 1);
assert (metaClient.getEdge("testMeta", "friend") != null);
} catch (TException | ExecuteFailedException e) {
} catch (TException | ExecuteFailedException | ClientServerIncompatibleException e) {
e.printStackTrace();
assert (false);
}
Expand All @@ -93,13 +94,13 @@ public void testGetEdges() {
public void testGetPartsAlloc() {
try {
assert (metaClient.getPartsAlloc("testMeta").size() == 10);
} catch (ExecuteFailedException | TException e) {
} catch (ExecuteFailedException | TException | ClientServerIncompatibleException e) {
e.printStackTrace();
assert (false);
}
}

public void testListHosts() {
public void testListHosts() throws ClientServerIncompatibleException {
if (metaClient == null) {
metaClient = new MetaClient(address, port);
}
Expand All @@ -122,7 +123,12 @@ public void testListOnlineHosts() {
if (metaClient == null) {
metaClient = new MetaClient(address, port);
}
assert (metaClient.listHosts().size() == 2);
try {
assert (metaClient.listHosts().size() == 2);
} catch (ClientServerIncompatibleException e) {
LOGGER.error("client version does not match server version", e);
assert (false);
}

try {
runtime.exec("docker start nebula-docker-compose_storaged0_1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.vesoft.nebula.HostAddr;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.meta.EdgeItem;
import com.vesoft.nebula.meta.SpaceItem;
import com.vesoft.nebula.meta.TagItem;
Expand Down Expand Up @@ -97,7 +98,7 @@ public void testGetSpaceParts() {
Assert.assertEquals(hostAddr.port, 4400);
}

public void testMultiVersionSchema() {
public void testMultiVersionSchema() throws ClientServerIncompatibleException {
MockNebulaGraph.createMultiVersionTagAndEdge();
metaManager.close();
metaManager = new MetaManager(
Expand Down