Skip to content

Commit

Permalink
Support follower read (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
birdstorm authored Nov 25, 2020
1 parent 9dc52d8 commit 0d15c52
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 27 deletions.
17 changes: 13 additions & 4 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
resp.getLeader(),
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode());
conf.getKvMode(),
conf.isReplicaRead());
}

@Override
Expand All @@ -237,7 +238,8 @@ public Future<TiRegion> getRegionByKeyAsync(BackOffer backOffer, ByteString key)
resp.getLeader(),
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode()));
conf.getKvMode(),
conf.isReplicaRead()));
Supplier<GetRegionRequest> request =
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();

Expand All @@ -263,7 +265,8 @@ public TiRegion getRegionByID(BackOffer backOffer, long id) {
resp.getLeader(),
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode());
conf.getKvMode(),
conf.isReplicaRead());
}

@Override
Expand All @@ -276,7 +279,8 @@ public Future<TiRegion> getRegionByIDAsync(BackOffer backOffer, long id) {
resp.getLeader(),
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode()));
conf.getKvMode(),
conf.isReplicaRead()));

Supplier<GetRegionByIDRequest> request =
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
Expand Down Expand Up @@ -334,6 +338,11 @@ public List<Store> getAllStores(BackOffer backOffer) {
.getStoresList();
}

@Override
public boolean isReplicaRead() {
return conf.isReplicaRead();
}

@Override
public void close() throws InterruptedException {
etcdClient.close();
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/tikv/common/ReadOnlyPDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,6 @@ public interface ReadOnlyPDClient {
Future<Store> getStoreAsync(BackOffer backOffer, long storeId);

List<Store> getAllStores(BackOffer backOffer);

boolean isReplicaRead();
}
10 changes: 10 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class TiConfiguration implements Serializable {
private static final int DEF_KV_CLIENT_CONCURRENCY = 10;
private static final KVMode DEF_KV_MODE = KVMode.TXN;
private static final int DEF_RAW_CLIENT_CONCURRENCY = 200;
private static final boolean DEF_IS_REPLICA_READ = false;

private int timeout = DEF_TIMEOUT;
private TimeUnit timeoutUnit = DEF_TIMEOUT_UNIT;
Expand All @@ -78,6 +79,7 @@ public class TiConfiguration implements Serializable {
private int partitionPerSplit = DEF_PARTITION_PER_SPLIT;

private int kvClientConcurrency = DEF_KV_CLIENT_CONCURRENCY;
private boolean isReplicaRead = DEF_IS_REPLICA_READ;

public enum KVMode {
TXN,
Expand Down Expand Up @@ -314,4 +316,12 @@ public int getKvClientConcurrency() {
public void setKvClientConcurrency(int kvClientConcurrency) {
this.kvClientConcurrency = kvClientConcurrency;
}

public boolean isReplicaRead() {
return isReplicaRead;
}

public void setReplicaRead(boolean isReplicaRead) {
this.isReplicaRead = isReplicaRead;
}
}
12 changes: 10 additions & 2 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class RegionManager {
// TODO: the region cache logic need rewrite.
// https://github.com/pingcap/tispark/issues/1170
private final RegionCache cache;
private final boolean isReplicaRead;

private final Function<CacheInvalidateEvent, Void> cacheInvalidateCallback;

Expand All @@ -57,11 +58,13 @@ public class RegionManager {
public RegionManager(
ReadOnlyPDClient pdClient, Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
this.cache = new RegionCache(pdClient);
this.isReplicaRead = pdClient.isReplicaRead();
this.cacheInvalidateCallback = cacheInvalidateCallback;
}

public RegionManager(ReadOnlyPDClient pdClient) {
this.cache = new RegionCache(pdClient);
this.isReplicaRead = pdClient.isReplicaRead();
this.cacheInvalidateCallback = null;
}

Expand Down Expand Up @@ -112,8 +115,13 @@ public Pair<TiRegion, Store> getRegionStorePairByKey(

Store store = null;
if (storeType == TiStoreType.TiKV) {
Peer leader = region.getLeader();
store = cache.getStoreById(leader.getStoreId(), backOffer);
if (isReplicaRead) {
Peer peer = region.getCurrentFollower();
store = cache.getStoreById(peer.getStoreId(), backOffer);
} else {
Peer leader = region.getLeader();
store = cache.getStoreById(leader.getStoreId(), backOffer);
}
} else {
outerLoop:
for (Peer peer : region.getLearnerList()) {
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/org/tikv/common/region/RegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private synchronized Boolean getIsV4() {
private RegionStoreClient(
TiConfiguration conf,
TiRegion region,
Store store,
String storeVersion,
TiStoreType storeType,
ChannelFactory channelFactory,
TikvBlockingStub blockingStub,
Expand All @@ -93,7 +93,7 @@ private RegionStoreClient(
if (this.storeType == TiStoreType.TiKV) {
this.lockResolverClient =
AbstractLockResolverClient.getInstance(
store,
storeVersion,
conf,
region,
this.blockingStub,
Expand All @@ -118,7 +118,7 @@ private RegionStoreClient(

this.lockResolverClient =
AbstractLockResolverClient.getInstance(
tikvStore,
tikvStore.getVersion(),
conf,
region,
tikvBlockingStub,
Expand Down Expand Up @@ -790,7 +790,8 @@ public List<TiRegion> splitRegion(Iterable<ByteString> splitKeys) {
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode()))
conf.getKvMode(),
conf.isReplicaRead()))
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -1021,7 +1022,7 @@ public RegionStoreClient build(TiRegion region, Store store, TiStoreType storeTy
return new RegionStoreClient(
conf,
region,
store,
store.getVersion(),
storeType,
channelFactory,
blockingStub,
Expand Down
71 changes: 60 additions & 11 deletions src/main/java/org/tikv/common/region/TiRegion.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,48 @@ public class TiRegion implements Serializable {
private final Region meta;
private final IsolationLevel isolationLevel;
private final Kvrpcpb.CommandPri commandPri;
private Peer peer;
private Peer leader;
private int followerIdx = 0;
private final boolean isReplicaRead;

public TiRegion(
Region meta,
Peer peer,
Peer leader,
IsolationLevel isolationLevel,
Kvrpcpb.CommandPri commandPri,
KVMode kvMode) {
this(meta, leader, isolationLevel, commandPri, kvMode, false);
}

public TiRegion(
Region meta,
Peer leader,
IsolationLevel isolationLevel,
Kvrpcpb.CommandPri commandPri,
KVMode kvMode,
boolean isReplicaRead) {
Objects.requireNonNull(meta, "meta is null");
this.meta = decodeRegion(meta, kvMode == KVMode.RAW);
if (peer == null || peer.getId() == 0) {
if (leader == null || leader.getId() == 0) {
if (meta.getPeersCount() == 0) {
throw new TiClientInternalException("Empty peer list for region " + meta.getId());
}
// region's first peer is leader.
this.peer = meta.getPeers(0);
this.leader = meta.getPeers(0);
} else {
this.peer = peer;
this.leader = leader;
}
if (isReplicaRead && meta.getPeersCount() > 0) {
// try to get first follower
try {
getNextFollower();
} catch (Exception ignore) {
// ignore
}
}
this.isolationLevel = isolationLevel;
this.commandPri = commandPri;
this.isReplicaRead = isReplicaRead;
}

private Region decodeRegion(Region region, boolean isRawRegion) {
Expand Down Expand Up @@ -89,7 +110,24 @@ private Region decodeRegion(Region region, boolean isRawRegion) {
}

public Peer getLeader() {
return peer;
return leader;
}

public Peer getCurrentFollower() {
return meta.getPeers(followerIdx);
}

public Peer getNextFollower() {
int cnt = meta.getPeersCount();
for (int retry = cnt - 1; retry > 0; retry--) {
followerIdx = (followerIdx + 1) % cnt;
Peer cur = meta.getPeers(followerIdx);
if (cur.getIsLearner()) {
continue;
}
return cur;
}
return leader;
}

public List<Peer> getLearnerList() {
Expand Down Expand Up @@ -130,7 +168,18 @@ public Kvrpcpb.Context getContext(Set<Long> resolvedLocks) {
Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder();
builder.setIsolationLevel(this.isolationLevel);
builder.setPriority(this.commandPri);
builder.setRegionId(meta.getId()).setPeer(this.peer).setRegionEpoch(this.meta.getRegionEpoch());
if (isReplicaRead) {
builder
.setRegionId(meta.getId())
.setPeer(getCurrentFollower())
.setReplicaRead(true)
.setRegionEpoch(this.meta.getRegionEpoch());
} else {
builder
.setRegionId(meta.getId())
.setPeer(this.leader)
.setRegionEpoch(this.meta.getRegionEpoch());
}
builder.addAllResolvedLocks(resolvedLocks);
return builder.build();
}
Expand All @@ -152,7 +201,7 @@ boolean switchPeer(long leaderStoreID) {
List<Peer> peers = meta.getPeersList();
for (Peer p : peers) {
if (p.getStoreId() == leaderStoreID) {
this.peer = p;
this.leader = p;
return true;
}
}
Expand Down Expand Up @@ -186,7 +235,7 @@ public boolean contains(ByteString key) {
}

public boolean isValid() {
return peer != null && meta != null;
return leader != null && meta != null;
}

public Metapb.RegionEpoch getRegionEpoch() {
Expand All @@ -204,14 +253,14 @@ public boolean equals(final Object another) {
}
TiRegion anotherRegion = ((TiRegion) another);
return anotherRegion.meta.equals(this.meta)
&& anotherRegion.peer.equals(this.peer)
&& anotherRegion.leader.equals(this.leader)
&& anotherRegion.commandPri.equals(this.commandPri)
&& anotherRegion.isolationLevel.equals(this.isolationLevel);
}

@Override
public int hashCode() {
return Objects.hash(meta, peer, isolationLevel, commandPri);
return Objects.hash(meta, leader, isolationLevel, commandPri);
}

@Override
Expand Down
7 changes: 3 additions & 4 deletions src/main/java/org/tikv/txn/AbstractLockResolverClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc;

public interface AbstractLockResolverClient {
Expand Down Expand Up @@ -67,7 +66,7 @@ static Lock extractLockFromKeyErr(Kvrpcpb.KeyError keyError) {
}

static AbstractLockResolverClient getInstance(
Metapb.Store store,
String storeVersion,
TiConfiguration conf,
TiRegion region,
TikvGrpc.TikvBlockingStub blockingStub,
Expand All @@ -76,10 +75,10 @@ static AbstractLockResolverClient getInstance(
RegionManager regionManager,
PDClient pdClient,
RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
if (StoreVersion.compareTo(store.getVersion(), Version.RESOLVE_LOCK_V3) < 0) {
if (StoreVersion.compareTo(storeVersion, Version.RESOLVE_LOCK_V3) < 0) {
return new LockResolverClientV2(
conf, region, blockingStub, asyncStub, channelFactory, regionManager);
} else if (StoreVersion.compareTo(store.getVersion(), Version.RESOLVE_LOCK_V4) < 0) {
} else if (StoreVersion.compareTo(storeVersion, Version.RESOLVE_LOCK_V4) < 0) {
return new LockResolverClientV3(
conf,
region,
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/org/tikv/raw/RawKVClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ private static ByteString getRandomValue() {
@Before
public void setup() throws IOException {
try {
session = TiSession.create(TiConfiguration.createRawDefault(DEFAULT_PD_ADDRESS));
TiConfiguration conf = TiConfiguration.createRawDefault(DEFAULT_PD_ADDRESS);
session = TiSession.create(conf);
initialized = false;
if (client == null) {
client = session.createRawClient();
Expand Down

0 comments on commit 0d15c52

Please sign in to comment.