diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 6fc35d4cbae..6969877601d 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -224,7 +224,8 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) { resp.getLeader(), conf.getIsolationLevel(), conf.getCommandPriority(), - conf.getKvMode()); + conf.getKvMode(), + conf.isReplicaRead()); } @Override @@ -237,7 +238,8 @@ public Future getRegionByKeyAsync(BackOffer backOffer, ByteString key) resp.getLeader(), conf.getIsolationLevel(), conf.getCommandPriority(), - conf.getKvMode())); + conf.getKvMode(), + conf.isReplicaRead())); Supplier request = () -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build(); @@ -263,7 +265,8 @@ public TiRegion getRegionByID(BackOffer backOffer, long id) { resp.getLeader(), conf.getIsolationLevel(), conf.getCommandPriority(), - conf.getKvMode()); + conf.getKvMode(), + conf.isReplicaRead()); } @Override @@ -276,7 +279,8 @@ public Future getRegionByIDAsync(BackOffer backOffer, long id) { resp.getLeader(), conf.getIsolationLevel(), conf.getCommandPriority(), - conf.getKvMode())); + conf.getKvMode(), + conf.isReplicaRead())); Supplier request = () -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build(); @@ -334,6 +338,11 @@ public List getAllStores(BackOffer backOffer) { .getStoresList(); } + @Override + public boolean isReplicaRead() { + return conf.isReplicaRead(); + } + @Override public void close() throws InterruptedException { etcdClient.close(); diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index 9722abb7742..4e9c3f51779 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -63,4 +63,6 @@ public interface ReadOnlyPDClient { Future getStoreAsync(BackOffer backOffer, long storeId); List getAllStores(BackOffer backOffer); + + boolean isReplicaRead(); } diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index ea716fc0524..80a78635666 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -52,6 +52,7 @@ public class TiConfiguration implements Serializable { private static final int DEF_KV_CLIENT_CONCURRENCY = 10; private static final KVMode DEF_KV_MODE = KVMode.TXN; private static final int DEF_RAW_CLIENT_CONCURRENCY = 200; + private static final boolean DEF_IS_REPLICA_READ = false; private int timeout = DEF_TIMEOUT; private TimeUnit timeoutUnit = DEF_TIMEOUT_UNIT; @@ -78,6 +79,7 @@ public class TiConfiguration implements Serializable { private int partitionPerSplit = DEF_PARTITION_PER_SPLIT; private int kvClientConcurrency = DEF_KV_CLIENT_CONCURRENCY; + private boolean isReplicaRead = DEF_IS_REPLICA_READ; public enum KVMode { TXN, @@ -314,4 +316,12 @@ public int getKvClientConcurrency() { public void setKvClientConcurrency(int kvClientConcurrency) { this.kvClientConcurrency = kvClientConcurrency; } + + public boolean isReplicaRead() { + return isReplicaRead; + } + + public void setReplicaRead(boolean isReplicaRead) { + this.isReplicaRead = isReplicaRead; + } } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 1bfce3dfbba..7952658faf0 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -49,6 +49,7 @@ public class RegionManager { // TODO: the region cache logic need rewrite. // https://github.com/pingcap/tispark/issues/1170 private final RegionCache cache; + private final boolean isReplicaRead; private final Function cacheInvalidateCallback; @@ -57,11 +58,13 @@ public class RegionManager { public RegionManager( ReadOnlyPDClient pdClient, Function cacheInvalidateCallback) { this.cache = new RegionCache(pdClient); + this.isReplicaRead = pdClient.isReplicaRead(); this.cacheInvalidateCallback = cacheInvalidateCallback; } public RegionManager(ReadOnlyPDClient pdClient) { this.cache = new RegionCache(pdClient); + this.isReplicaRead = pdClient.isReplicaRead(); this.cacheInvalidateCallback = null; } @@ -112,8 +115,13 @@ public Pair getRegionStorePairByKey( Store store = null; if (storeType == TiStoreType.TiKV) { - Peer leader = region.getLeader(); - store = cache.getStoreById(leader.getStoreId(), backOffer); + if (isReplicaRead) { + Peer peer = region.getCurrentFollower(); + store = cache.getStoreById(peer.getStoreId(), backOffer); + } else { + Peer leader = region.getLeader(); + store = cache.getStoreById(leader.getStoreId(), backOffer); + } } else { outerLoop: for (Peer peer : region.getLearnerList()) { diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 5333ec20a2b..d0c6fd7862c 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -79,7 +79,7 @@ private synchronized Boolean getIsV4() { private RegionStoreClient( TiConfiguration conf, TiRegion region, - Store store, + String storeVersion, TiStoreType storeType, ChannelFactory channelFactory, TikvBlockingStub blockingStub, @@ -93,7 +93,7 @@ private RegionStoreClient( if (this.storeType == TiStoreType.TiKV) { this.lockResolverClient = AbstractLockResolverClient.getInstance( - store, + storeVersion, conf, region, this.blockingStub, @@ -118,7 +118,7 @@ private RegionStoreClient( this.lockResolverClient = AbstractLockResolverClient.getInstance( - tikvStore, + tikvStore.getVersion(), conf, region, tikvBlockingStub, @@ -790,7 +790,8 @@ public List splitRegion(Iterable splitKeys) { null, conf.getIsolationLevel(), conf.getCommandPriority(), - conf.getKvMode())) + conf.getKvMode(), + conf.isReplicaRead())) .collect(Collectors.toList()); } @@ -1021,7 +1022,7 @@ public RegionStoreClient build(TiRegion region, Store store, TiStoreType storeTy return new RegionStoreClient( conf, region, - store, + store.getVersion(), storeType, channelFactory, blockingStub, diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index ee4f41cdc28..f2969bf80ea 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -41,27 +41,48 @@ public class TiRegion implements Serializable { private final Region meta; private final IsolationLevel isolationLevel; private final Kvrpcpb.CommandPri commandPri; - private Peer peer; + private Peer leader; + private int followerIdx = 0; + private final boolean isReplicaRead; public TiRegion( Region meta, - Peer peer, + Peer leader, IsolationLevel isolationLevel, Kvrpcpb.CommandPri commandPri, KVMode kvMode) { + this(meta, leader, isolationLevel, commandPri, kvMode, false); + } + + public TiRegion( + Region meta, + Peer leader, + IsolationLevel isolationLevel, + Kvrpcpb.CommandPri commandPri, + KVMode kvMode, + boolean isReplicaRead) { Objects.requireNonNull(meta, "meta is null"); this.meta = decodeRegion(meta, kvMode == KVMode.RAW); - if (peer == null || peer.getId() == 0) { + if (leader == null || leader.getId() == 0) { if (meta.getPeersCount() == 0) { throw new TiClientInternalException("Empty peer list for region " + meta.getId()); } // region's first peer is leader. - this.peer = meta.getPeers(0); + this.leader = meta.getPeers(0); } else { - this.peer = peer; + this.leader = leader; + } + if (isReplicaRead && meta.getPeersCount() > 0) { + // try to get first follower + try { + getNextFollower(); + } catch (Exception ignore) { + // ignore + } } this.isolationLevel = isolationLevel; this.commandPri = commandPri; + this.isReplicaRead = isReplicaRead; } private Region decodeRegion(Region region, boolean isRawRegion) { @@ -89,7 +110,24 @@ private Region decodeRegion(Region region, boolean isRawRegion) { } public Peer getLeader() { - return peer; + return leader; + } + + public Peer getCurrentFollower() { + return meta.getPeers(followerIdx); + } + + public Peer getNextFollower() { + int cnt = meta.getPeersCount(); + for (int retry = cnt - 1; retry > 0; retry--) { + followerIdx = (followerIdx + 1) % cnt; + Peer cur = meta.getPeers(followerIdx); + if (cur.getIsLearner()) { + continue; + } + return cur; + } + return leader; } public List getLearnerList() { @@ -130,7 +168,18 @@ public Kvrpcpb.Context getContext(Set resolvedLocks) { Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder(); builder.setIsolationLevel(this.isolationLevel); builder.setPriority(this.commandPri); - builder.setRegionId(meta.getId()).setPeer(this.peer).setRegionEpoch(this.meta.getRegionEpoch()); + if (isReplicaRead) { + builder + .setRegionId(meta.getId()) + .setPeer(getCurrentFollower()) + .setReplicaRead(true) + .setRegionEpoch(this.meta.getRegionEpoch()); + } else { + builder + .setRegionId(meta.getId()) + .setPeer(this.leader) + .setRegionEpoch(this.meta.getRegionEpoch()); + } builder.addAllResolvedLocks(resolvedLocks); return builder.build(); } @@ -152,7 +201,7 @@ boolean switchPeer(long leaderStoreID) { List peers = meta.getPeersList(); for (Peer p : peers) { if (p.getStoreId() == leaderStoreID) { - this.peer = p; + this.leader = p; return true; } } @@ -186,7 +235,7 @@ public boolean contains(ByteString key) { } public boolean isValid() { - return peer != null && meta != null; + return leader != null && meta != null; } public Metapb.RegionEpoch getRegionEpoch() { @@ -204,14 +253,14 @@ public boolean equals(final Object another) { } TiRegion anotherRegion = ((TiRegion) another); return anotherRegion.meta.equals(this.meta) - && anotherRegion.peer.equals(this.peer) + && anotherRegion.leader.equals(this.leader) && anotherRegion.commandPri.equals(this.commandPri) && anotherRegion.isolationLevel.equals(this.isolationLevel); } @Override public int hashCode() { - return Objects.hash(meta, peer, isolationLevel, commandPri); + return Objects.hash(meta, leader, isolationLevel, commandPri); } @Override diff --git a/src/main/java/org/tikv/txn/AbstractLockResolverClient.java b/src/main/java/org/tikv/txn/AbstractLockResolverClient.java index d5e00fc636c..44d3c4a71ca 100644 --- a/src/main/java/org/tikv/txn/AbstractLockResolverClient.java +++ b/src/main/java/org/tikv/txn/AbstractLockResolverClient.java @@ -29,7 +29,6 @@ import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; import org.tikv.kvproto.Kvrpcpb; -import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; public interface AbstractLockResolverClient { @@ -67,7 +66,7 @@ static Lock extractLockFromKeyErr(Kvrpcpb.KeyError keyError) { } static AbstractLockResolverClient getInstance( - Metapb.Store store, + String storeVersion, TiConfiguration conf, TiRegion region, TikvGrpc.TikvBlockingStub blockingStub, @@ -76,10 +75,10 @@ static AbstractLockResolverClient getInstance( RegionManager regionManager, PDClient pdClient, RegionStoreClient.RegionStoreClientBuilder clientBuilder) { - if (StoreVersion.compareTo(store.getVersion(), Version.RESOLVE_LOCK_V3) < 0) { + if (StoreVersion.compareTo(storeVersion, Version.RESOLVE_LOCK_V3) < 0) { return new LockResolverClientV2( conf, region, blockingStub, asyncStub, channelFactory, regionManager); - } else if (StoreVersion.compareTo(store.getVersion(), Version.RESOLVE_LOCK_V4) < 0) { + } else if (StoreVersion.compareTo(storeVersion, Version.RESOLVE_LOCK_V4) < 0) { return new LockResolverClientV3( conf, region, diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index 05c3068f60f..77a2cb0813d 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -69,7 +69,8 @@ private static ByteString getRandomValue() { @Before public void setup() throws IOException { try { - session = TiSession.create(TiConfiguration.createRawDefault(DEFAULT_PD_ADDRESS)); + TiConfiguration conf = TiConfiguration.createRawDefault(DEFAULT_PD_ADDRESS); + session = TiSession.create(conf); initialized = false; if (client == null) { client = session.createRawClient();