Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReadOnlyOption 支持请求级别 #867

Merged
merged 7 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.alipay.sofa.jraft.error.LogNotFoundException;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReadOnlyOption;
import com.alipay.sofa.jraft.util.Describer;

/**
Expand Down Expand Up @@ -119,9 +120,10 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
* [Thread-safe and wait-free]
*
* Starts a linearizable read-only query request with request context(optional,
* such as request id etc.) and closure. The closure will be called when the
* request is completed, and user can read data from state machine if the result
* status is OK.
* such as request id etc.) and closure.
* The default value of ReadOnlyOption is {@link ReadOnlyOption#ReadOnlySafe}.
* The closure will be called when the request is completed, and user can read
* data from state machine if the result status is OK.
*
* @param requestContext the context of request
* @param done callback
Expand All @@ -130,6 +132,19 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
*/
void readIndex(final byte[] requestContext, final ReadIndexClosure done);

/**
* [Thread-safe and wait-free]
*
* Specify the ReadOnlyOption to execute a linearizable read-only query request.
*
* @param readOnlyOptions how the read only request is processed
* @param requestContext the context of request
* @param done callback
*
* @since 1.3.13
*/
void readIndex(final ReadOnlyOption readOnlyOptions, final byte[] requestContext, final ReadIndexClosure done);

/**
* List peers of this raft group, only leader returns.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.option.ReadOnlyOption;
import com.alipay.sofa.jraft.option.ReadOnlyServiceOptions;

/**
Expand All @@ -36,6 +37,15 @@ public interface ReadOnlyService extends Lifecycle<ReadOnlyServiceOptions> {
*/
void addRequest(final byte[] reqCtx, final ReadIndexClosure closure);

/**
* Adds a ReadIndex request.
*
* @param readOnlyOptions how the read only request is processed
* @param reqCtx request context of readIndex
* @param closure callback
*/
void addRequest(final ReadOnlyOption readOnlyOptions, final byte[] reqCtx, final ReadIndexClosure closure);

/**
* Waits for service shutdown.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1435,13 +1435,18 @@ public JRaftServiceFactory getServiceFactory() {

@Override
public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
readIndex(this.raftOptions.getReadOnlyOptions(), requestContext, done);
}

@Override
public void readIndex(ReadOnlyOption readOnlyOptions, byte[] requestContext, ReadIndexClosure done) {
if (this.shutdownLatch != null) {
ThreadPoolsFactory.runClosureInThread(this.groupId, done, new Status(RaftError.ENODESHUTDOWN,
"Node is shutting down."));
throw new IllegalStateException("Node is shutting down");
}
Requires.requireNonNull(done, "Null closure");
this.readOnlyService.addRequest(requestContext, done);
this.readOnlyService.addRequest(readOnlyOptions, requestContext, done);
}

/**
Expand Down Expand Up @@ -1580,7 +1585,8 @@ private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.
}
}

ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
ReadOnlyOption readOnlyOpt = ReadOnlyOption.valueOfWithDefault(request.getReadOnlyOptions(),
this.raftOptions.getReadOnlyOptions());
if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
// If leader lease timeout, we must change option to ReadOnlySafe
readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import com.alipay.sofa.jraft.entity.EnumOutter;
import com.alipay.sofa.jraft.option.ReadOnlyOption;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -92,12 +95,14 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
.getLogger(ReadOnlyServiceImpl.class);

private static class ReadIndexEvent {
ReadOnlyOption readOnlyOptions;
Bytes requestContext;
ReadIndexClosure done;
CountDownLatch shutdownLatch;
long startTime;

private void reset() {
this.readOnlyOptions = null;
this.requestContext = null;
this.done = null;
this.shutdownLatch = null;
Expand Down Expand Up @@ -221,25 +226,34 @@ private void notifyFail(final Status status) {
}
}

private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
if (events.isEmpty()) {
return;
}
private void handleReadIndex(final ReadOnlyOption option, final List<ReadIndexEvent> events) {
final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
.setGroupId(this.node.getGroupId()) //
.setServerId(this.node.getServerId().toString());

final List<ReadIndexState> states = new ArrayList<>(events.size());

for (final ReadIndexEvent event : events) {
rb.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
.setGroupId(this.node.getGroupId()) //
.setServerId(this.node.getServerId().toString())
.setReadOnlyOptions(ReadOnlyOption.convertMsgType(option));
final List<ReadIndexState> states = events.stream()
.filter(it -> option.equals(it.readOnlyOptions))
.map(it -> {
rb.addEntries(ZeroByteStringHelper.wrap(it.requestContext.get()));
return new ReadIndexState(it.requestContext, it.done, it.startTime);
})
.collect(Collectors.toList());

if (states.isEmpty()) {
return;
}
final ReadIndexRequest request = rb.build();

this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}

private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
if (events.isEmpty()) {
return;
}
handleReadIndex(ReadOnlyOption.ReadOnlySafe, events);
handleReadIndex(ReadOnlyOption.ReadOnlyLeaseBased, events);
}

private void resetPendingStatusError(final Status st) {
this.lock.lock();
try {
Expand Down Expand Up @@ -318,35 +332,41 @@ public void join() throws InterruptedException {
}

@Override
public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
public void addRequest(byte[] reqCtx, ReadIndexClosure closure) {
addRequest(this.node.getRaftOptions().getReadOnlyOptions(), reqCtx, closure);
}

@Override
public void addRequest(final ReadOnlyOption readOnlyOptions, final byte[] reqCtx, final ReadIndexClosure closure) {
if (this.shutdownLatch != null) {
ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
throw new IllegalStateException("Service already shutdown.");
}
try {
EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
event.readOnlyOptions = readOnlyOptions;
event.done = closure;
event.requestContext = new Bytes(reqCtx);
event.startTime = Utils.monotonicMs();
};

switch(this.node.getOptions().getApplyTaskMode()) {
case Blocking:
this.readIndexQueue.publishEvent(translator);
break;
case NonBlocking:
switch (this.node.getOptions().getApplyTaskMode()) {
case Blocking:
this.readIndexQueue.publishEvent(translator);
break;
case NonBlocking:
default:
if (!this.readIndexQueue.tryPublishEvent(translator)) {
final String errorMsg = "Node is busy, has too many read-index requests, queue is full and bufferSize="+ this.readIndexQueue.getBufferSize();
ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), closure,
new Status(RaftError.EBUSY, errorMsg));
this.nodeMetrics.recordTimes("read-index-overload-times", 1);
LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
if(closure == null) {
throw new OverloadException(errorMsg);
if (!this.readIndexQueue.tryPublishEvent(translator)) {
final String errorMsg = "Node is busy, has too many read-index requests, queue is full and bufferSize=" + this.readIndexQueue.getBufferSize();
ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), closure,
new Status(RaftError.EBUSY, errorMsg));
this.nodeMetrics.recordTimes("read-index-overload-times", 1);
LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
if (closure == null) {
throw new OverloadException(errorMsg);
}
shihuili1218 marked this conversation as resolved.
Show resolved Hide resolved
}
}
break;
break;
}
} catch (final Exception e) {
ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), closure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,92 @@ private ErrorType(int value) {
// @@protoc_insertion_point(enum_scope:jraft.ErrorType)
}

/**
* Protobuf enum {@code jraft.ReadOnlyType}
*/
public enum ReadOnlyType implements com.google.protobuf.ProtocolMessageEnum {
/**
* <code>READ_ONLY_SAFE = 0;</code>
*/
READ_ONLY_SAFE(0),
/**
* <code>READ_ONLY_LEASE_BASED = 1;</code>
*/
READ_ONLY_LEASE_BASED(1), ;

/**
* <code>READ_ONLY_SAFE = 0;</code>
*/
public static final int READ_ONLY_SAFE_VALUE = 0;
/**
* <code>READ_ONLY_LEASE_BASED = 1;</code>
*/
public static final int READ_ONLY_LEASE_BASED_VALUE = 1;

public final int getNumber() {
return value;
}

/**
* @deprecated Use {@link #forNumber(int)} instead.
*/
@java.lang.Deprecated
public static ReadOnlyType valueOf(int value) {
return forNumber(value);
}

public static ReadOnlyType forNumber(int value) {
switch (value) {
case 0:
return READ_ONLY_SAFE;
case 1:
return READ_ONLY_LEASE_BASED;
default:
return null;
}
}

public static com.google.protobuf.Internal.EnumLiteMap<ReadOnlyType> internalGetValueMap() {
return internalValueMap;
}

private static final com.google.protobuf.Internal.EnumLiteMap<ReadOnlyType> internalValueMap = new com.google.protobuf.Internal.EnumLiteMap<ReadOnlyType>() {
public ReadOnlyType findValueByNumber(int number) {
return ReadOnlyType
.forNumber(number);
}
};

public final com.google.protobuf.Descriptors.EnumValueDescriptor getValueDescriptor() {
return getDescriptor().getValues().get(ordinal());
}

public final com.google.protobuf.Descriptors.EnumDescriptor getDescriptorForType() {
return getDescriptor();
}

public static final com.google.protobuf.Descriptors.EnumDescriptor getDescriptor() {
return com.alipay.sofa.jraft.entity.EnumOutter.getDescriptor().getEnumTypes().get(2);
}

private static final ReadOnlyType[] VALUES = values();

public static ReadOnlyType valueOf(com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
if (desc.getType() != getDescriptor()) {
throw new java.lang.IllegalArgumentException("EnumValueDescriptor is not for this type.");
}
return VALUES[desc.getIndex()];
}

private final int value;

private ReadOnlyType(int value) {
this.value = value;
}

// @@protoc_insertion_point(enum_scope:jraft.ReadOnlyType)
}

public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() {
return descriptor;
}
Expand All @@ -275,7 +361,9 @@ public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() {
+ "NE\020\000\022\022\n\016ERROR_TYPE_LOG\020\001\022\025\n\021ERROR_TYPE_S"
+ "TABLE\020\002\022\027\n\023ERROR_TYPE_SNAPSHOT\020\003\022\034\n\030ERRO"
+ "R_TYPE_STATE_MACHINE\020\004\022\023\n\017ERROR_TYPE_MET"
+ "A\020\005B*\n\034com.alipay.sofa.jraft.entityB\nEnu" + "mOutter" };
+ "A\020\005*=\n\014ReadOnlyType\022\022\n\016READ_ONLY_SAFE\020\000\022"
+ "\031\n\025READ_ONLY_LEASE_BASED\020\001B*\n\034com.alipay"
+ ".sofa.jraft.entityB\nEnumOutter" };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.alipay.sofa.jraft.option;

import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.util.Copiable;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;

Expand Down Expand Up @@ -70,6 +72,8 @@ public class RaftOptions implements Copiable<RaftOptions> {

/**
* ReadOnlyOption specifies how the read only request is processed.
* This is a global configuration, you can use {@link Node#readIndex(ReadOnlyOption, byte[], ReadIndexClosure)}
* to specify how individual requests are processed.
*
* {@link ReadOnlyOption#ReadOnlySafe} guarantees the linearizability of the read only request by
* communicating with the quorum. It is the default and suggested option.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package com.alipay.sofa.jraft.option;

import org.apache.commons.lang.StringUtils;

import com.alipay.sofa.jraft.entity.EnumOutter;

/**
* Read only options.
*
Expand All @@ -31,5 +35,19 @@ public enum ReadOnlyOption {
// If the clock drift is unbounded, leader might keep the lease longer than it
// should (clock can move backward/pause without any bound). ReadIndex is not safe
// in that case.
ReadOnlyLeaseBased
ReadOnlyLeaseBased;

public static EnumOutter.ReadOnlyType convertMsgType(ReadOnlyOption option) {
return ReadOnlyOption.ReadOnlyLeaseBased.equals(option) ? EnumOutter.ReadOnlyType.READ_ONLY_LEASE_BASED
: EnumOutter.ReadOnlyType.READ_ONLY_SAFE;
}

public static ReadOnlyOption valueOfWithDefault(EnumOutter.ReadOnlyType readOnlyType, ReadOnlyOption defaultOption) {
if (readOnlyType == null) {
// for old version of messages
return defaultOption;
}
return EnumOutter.ReadOnlyType.READ_ONLY_SAFE.equals(readOnlyType) ? ReadOnlyOption.ReadOnlySafe
: ReadOnlyOption.ReadOnlyLeaseBased;
}
}
Loading