Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[close #616] check store version while using API V2 (#617) #620

Merged
merged 1 commit into from
Jun 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ public TiSession(TiConfiguration conf) {
}

this.client = PDClient.createRaw(conf, keyCodec, channelFactory);
if (conf.getApiVersion().isV2() && !StoreVersion.minTiKVVersion(Version.API_V2, client)) {
throw new IllegalStateException(
"With API v2, store versions should not older than " + Version.API_V2);
}

this.enableGrpcForward = conf.getEnableGrpcForward();
if (this.enableGrpcForward) {
logger.info("enable grpc forward for high available");
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/tikv/common/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ public class Version {
public static final String RESOLVE_LOCK_V4 = "4.0.0";

public static final String BATCH_WRITE = "3.0.14";

public static final String API_V2 = "6.1.0";
}
11 changes: 11 additions & 0 deletions src/test/java/org/tikv/common/ApiVersionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,15 @@ public void testAccessV1ClusterWithTtl() throws InterruptedException {
Assert.assertNotNull(e);
}
}

@Test
public void testAccessOldVersionClusterWithV2() {
Assume.assumeFalse(minTiKVVersion("6.1.0"));

try (RawKVClient client = createRawClient(ApiVersion.V2)) {
Assert.fail("Should not create V2 client while store version is less than 6.1.0");
} catch (Exception e) {
Assert.assertNotNull(e);
}
}
}
11 changes: 8 additions & 3 deletions src/test/java/org/tikv/common/MockThreeStoresTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.tikv.common.region.TiStore;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Pdpb;
import org.tikv.kvproto.Pdpb.GetAllStoresResponse;

public class MockThreeStoresTest extends PDMockServerTest {

Expand Down Expand Up @@ -68,17 +69,17 @@ public void setup() throws IOException {
ImmutableList.of(
Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + ports[0])
.setVersion("5.0.0")
.setVersion(Version.API_V2)
.setId(0x1)
.build(),
Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + ports[1])
.setVersion("5.0.0")
.setVersion(Version.API_V2)
.setId(0x2)
.build(),
Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + ports[2])
.setVersion("5.0.0")
.setVersion(Version.API_V2)
.setId(0x3)
.build());

Expand All @@ -94,6 +95,10 @@ public void setup() throws IOException {
int i = (int) request.getStoreId() - 1;
return Pdpb.GetStoreResponse.newBuilder().setStore(stores.get(i)).build();
});
server.addGetAllStoresListener(
request -> {
return GetAllStoresResponse.newBuilder().addAllStores(stores).build();
});
}

this.region =
Expand Down
16 changes: 16 additions & 0 deletions src/test/java/org/tikv/common/PDClientV2MockTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.tikv.common;

import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.util.List;
import org.junit.Assert;
Expand All @@ -27,14 +28,29 @@
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.Metapb.StoreState;
import org.tikv.kvproto.Pdpb;
import org.tikv.kvproto.Pdpb.GetAllStoresResponse;
import org.tikv.kvproto.Pdpb.GetRegionResponse;
import org.tikv.kvproto.Pdpb.Region;
import org.tikv.kvproto.Pdpb.ScanRegionsResponse;

public class PDClientV2MockTest extends PDMockServerTest {
@Before
public void init() throws Exception {
leader.addGetAllStoresListener(
request -> {
return GetAllStoresResponse.newBuilder()
.addAllStores(
ImmutableList.of(
Store.newBuilder()
.setId(0x1)
.setState(StoreState.Up)
.setVersion(Version.API_V2)
.build()))
.build();
});
upgradeToV2Cluster();
}

Expand Down
18 changes: 18 additions & 0 deletions src/test/java/org/tikv/common/PDMockServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Optional;
import java.util.function.Function;
import org.tikv.kvproto.PDGrpc;
import org.tikv.kvproto.Pdpb.GetAllStoresRequest;
import org.tikv.kvproto.Pdpb.GetAllStoresResponse;
import org.tikv.kvproto.Pdpb.GetMembersRequest;
import org.tikv.kvproto.Pdpb.GetMembersResponse;
import org.tikv.kvproto.Pdpb.GetRegionByIDRequest;
Expand All @@ -54,6 +56,8 @@ public class PDMockServer extends PDGrpc.PDImplBase {

private Function<ScanRegionsRequest, ScanRegionsResponse> scanRegionsListener;

private Function<GetAllStoresRequest, GetAllStoresResponse> getAllStoresListener;

public void addGetMembersListener(Function<GetMembersRequest, GetMembersResponse> func) {
getMembersListener = func;
}
Expand Down Expand Up @@ -144,6 +148,20 @@ public void scanRegions(ScanRegionsRequest request, StreamObserver<ScanRegionsRe
}
}

public void addGetAllStoresListener(Function<GetAllStoresRequest, GetAllStoresResponse> func) {
getAllStoresListener = func;
}

@Override
public void getAllStores(GetAllStoresRequest request, StreamObserver<GetAllStoresResponse> resp) {
try {
resp.onNext(Optional.ofNullable(getAllStoresListener.apply(request)).get());
resp.onCompleted();
} catch (Exception e) {
resp.onError(Status.INTERNAL.asRuntimeException());
}
}

public void start(long clusterId) throws IOException {
int port;
try (ServerSocket s = new ServerSocket(0)) {
Expand Down