Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat(security): add negotiation class #132

Merged
merged 4 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* .falconPerfCounterTags("")
* .falconPushInterval(Duration.ofSeconds(10))
* .metaQueryTimeout(Duration.ofMillis(5000))
* .enableAuth(false)
* .build();
* }</pre>
*/
Expand All @@ -44,6 +45,7 @@ public class ClientOptions {
public static final String PEGASUS_PERF_COUNTER_TAGS_KEY = "perf_counter_tags";
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs";
public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout";
public static final String PEGASUS_ENABLE_AUTH_KEY = "enable_auth";

public static final String DEFAULT_META_SERVERS =
"127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603";
Expand All @@ -54,6 +56,7 @@ public class ClientOptions {
public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10);
public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true;
public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000);
public static final boolean DEFAULT_ENABLE_AUTH = false;

private final String metaServers;
private final Duration operationTimeout;
Expand All @@ -63,6 +66,7 @@ public class ClientOptions {
private final Duration falconPushInterval;
private final boolean enableWriteLimit;
private final Duration metaQueryTimeout;
private final boolean enableAuth;

protected ClientOptions(Builder builder) {
this.metaServers = builder.metaServers;
Expand All @@ -73,6 +77,7 @@ protected ClientOptions(Builder builder) {
this.falconPushInterval = builder.falconPushInterval;
this.enableWriteLimit = builder.enableWriteLimit;
this.metaQueryTimeout = builder.metaQueryTimeout;
this.enableAuth = builder.enableAuth;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -84,6 +89,7 @@ protected ClientOptions(ClientOptions original) {
this.falconPushInterval = original.getFalconPushInterval();
this.enableWriteLimit = original.isWriteLimitEnabled();
this.metaQueryTimeout = original.getMetaQueryTimeout();
this.enableAuth = original.enableAuth;
}

/**
Expand Down Expand Up @@ -143,6 +149,7 @@ public static ClientOptions create(String configPath) throws PException {
Duration metaQueryTimeout =
Duration.ofMillis(
config.getLong(PEGASUS_META_QUERY_TIMEOUT_KEY, DEFAULT_META_QUERY_TIMEOUT.toMillis()));
boolean enableAuth = config.getBoolean(PEGASUS_ENABLE_AUTH_KEY, DEFAULT_ENABLE_AUTH);

return ClientOptions.builder()
.metaServers(metaList)
Expand All @@ -152,6 +159,7 @@ public static ClientOptions create(String configPath) throws PException {
.falconPerfCounterTags(perfCounterTags)
.falconPushInterval(pushIntervalSecs)
.metaQueryTimeout(metaQueryTimeout)
.enableAuth(enableAuth)
.build();
}

Expand All @@ -169,7 +177,8 @@ public boolean equals(Object options) {
&& this.falconPerfCounterTags.equals(clientOptions.falconPerfCounterTags)
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis()
&& this.enableWriteLimit == clientOptions.enableWriteLimit
&& this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis();
&& this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis()
&& this.enableAuth == clientOptions.enableAuth;
}
return false;
}
Expand All @@ -195,6 +204,8 @@ public String toString() {
+ enableWriteLimit
+ ", metaQueryTimeout(ms)="
+ metaQueryTimeout.toMillis()
+ ", enableAuth="
+ enableAuth
+ '}';
}

Expand All @@ -208,6 +219,7 @@ public static class Builder {
private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL;
private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT;
private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT;
private boolean enableAuth = DEFAULT_ENABLE_AUTH;

protected Builder() {}

Expand Down Expand Up @@ -310,6 +322,18 @@ public Builder metaQueryTimeout(Duration metaQueryTimeout) {
return this;
}

/**
* Whether to enable authentication. Defaults to {@literal false}, see {@link
* #DEFAULT_ENABLE_AUTH}.
*
* @param enableAuth
* @return {@code this}
*/
public Builder enableAuth(boolean enableAuth) {
this.enableAuth = enableAuth;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand Down Expand Up @@ -337,7 +361,8 @@ public ClientOptions.Builder mutate() {
.falconPerfCounterTags(getFalconPerfCounterTags())
.falconPushInterval(getFalconPushInterval())
.enableWriteLimit(isWriteLimitEnabled())
.metaQueryTimeout(getMetaQueryTimeout());
.metaQueryTimeout(getMetaQueryTimeout())
.enableAuth(isEnableAuth());
return builder;
}

Expand Down Expand Up @@ -417,4 +442,13 @@ public boolean isWriteLimitEnabled() {
public Duration getMetaQueryTimeout() {
return metaQueryTimeout;
}

/**
* Whether to enable authentication. Defaults to {@literal false}.
*
* @return whether to enable authentication.
*/
public boolean isEnableAuth() {
return enableAuth;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class ClusterManager extends Cluster {
private EventLoopGroup tableGroup; // group used for handle table logic
private String[] metaList;
private MetaSession metaSession;
private boolean enableAuth;

private static final String osName;

Expand Down Expand Up @@ -61,6 +62,8 @@ public ClusterManager(ClientOptions opts) throws IllegalArgumentException {
// so the replicaSessions should be initialized earlier
metaSession =
new MetaSession(this, metaList, (int) opts.getMetaQueryTimeout().toMillis(), 10, metaGroup);

this.enableAuth = opts.isEnableAuth();
}

public EventExecutor getExecutor() {
Expand All @@ -82,7 +85,10 @@ public ReplicaSession getReplicaSession(rpc_address address) {
if (ss != null) return ss;
ss =
new ReplicaSession(
address, replicaGroup, max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT));
address,
replicaGroup,
max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT),
enableAuth);
replicaSessions.put(address, ss);
return ss;
}
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.xiaomi.infra.pegasus.rpc.async;

public class Negotiation {
public Negotiation(ReplicaSession session) {
this.session = session;
}

public void start() {
// TBD(zlw)
}

private ReplicaSession session;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ public enum ConnState {
DISCONNECTED
}

public ReplicaSession(rpc_address address, EventLoopGroup rpcGroup, int socketTimeout) {
public ReplicaSession(
rpc_address address, EventLoopGroup rpcGroup, int socketTimeout, boolean enableAuth) {
this.address = address;
this.rpcGroup = rpcGroup;
this.enableAuth = enableAuth;

final ReplicaSession this_ = this;
boot = new Bootstrap();
Expand Down Expand Up @@ -72,7 +74,7 @@ public ReplicaSession(
EventLoopGroup rpcGroup,
int socketTimeout,
MessageResponseFilter filter) {
this(address, rpcGroup, socketTimeout);
this(address, rpcGroup, socketTimeout, false);
this.filter = filter;
}

Expand Down Expand Up @@ -207,6 +209,17 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
}
}

private void startNegotiation(Channel activeChannel) {
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
logger.info("{}: mark session state negotiation");
if (enableAuth) {
negotiation = new Negotiation(this);
negotiation.start();
} else {
logger.info("{}: mark session state connected");
markSessionConnected(activeChannel);
}
}

private void markSessionConnected(Channel activeChannel) {
VolatileFields newCache = new VolatileFields();
newCache.state = ConnState.CONNECTED;
Expand Down Expand Up @@ -368,7 +381,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("Channel {} for session {} is active", ctx.channel().toString(), name());
markSessionConnected(ctx.channel());
startNegotiation(ctx.channel());
}

@Override
Expand Down Expand Up @@ -421,6 +434,8 @@ static final class VolatileFields {
private final rpc_address address;
private Bootstrap boot;
private EventLoopGroup rpcGroup;
private boolean enableAuth;
private Negotiation negotiation;

// Session will be actively closed if all the rpcs across `sessionResetTimeWindowMs`
// are timed out, in that case we suspect that the server is unavailable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.xiaomi.infra.pegasus.rpc.InternalTableOptions;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.Table;
import com.xiaomi.infra.pegasus.rpc.interceptor.InterceptorManger;
import com.xiaomi.infra.pegasus.rpc.interceptor.InterceptorManager;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.EventExecutor;
import java.util.ArrayList;
Expand Down Expand Up @@ -51,7 +51,7 @@ static final class TableConfiguration {
AtomicBoolean inQuerying_;
long lastQueryTime_;
int backupRequestDelayMs;
private InterceptorManger interceptorManger;
private InterceptorManager interceptorManager;

public TableHandler(ClusterManager mgr, String name, InternalTableOptions internalTableOptions)
throws ReplicationException {
Expand Down Expand Up @@ -109,7 +109,7 @@ public TableHandler(ClusterManager mgr, String name, InternalTableOptions intern
inQuerying_ = new AtomicBoolean(false);
lastQueryTime_ = 0;

this.interceptorManger = new InterceptorManger(internalTableOptions.tableOptions());
this.interceptorManager = new InterceptorManager(internalTableOptions.tableOptions());
}

public ReplicaConfiguration getReplicaConfig(int index) {
Expand Down Expand Up @@ -255,7 +255,7 @@ public void onRpcReply(ClientRequestRound round, long cachedConfigVersion, Strin
}

client_operator operator = round.getOperator();
interceptorManger.after(round, operator.rpc_error.errno, this);
interceptorManager.after(round, operator.rpc_error.errno, this);
boolean needQueryMeta = false;
switch (operator.rpc_error.errno) {
case ERR_OK:
Expand Down Expand Up @@ -363,7 +363,7 @@ void call(final ClientRequestRound round) {
tableConfig.replicas.get(round.getOperator().get_gpid().get_pidx());

if (handle.primarySession != null) {
interceptorManger.before(round, this);
interceptorManager.before(round, this);
// send request to primary
handle.primarySession.asyncSend(
round.getOperator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import java.util.ArrayList;
import java.util.List;

public class InterceptorManger {
public class InterceptorManager {

private List<TableInterceptor> interceptors = new ArrayList<>();

public InterceptorManger(TableOptions options) {
public InterceptorManager(TableOptions options) {
if (options.enableBackupRequest()) {
interceptors.add(new BackupRequestInterceptor(options.backupRequestDelayMs()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public void testSessionConnectTimeout() throws InterruptedException {

long start = System.currentTimeMillis();
EventLoopGroup rpcGroup = new NioEventLoopGroup(4);
ReplicaSession rs = new ReplicaSession(addr, rpcGroup, 1000);
ReplicaSession rs = new ReplicaSession(addr, rpcGroup, 1000, false);
rs.tryConnect().awaitUninterruptibly();
long end = System.currentTimeMillis();
Assert.assertEquals((end - start) / 1000, 1); // ensure connect failed within 1sec
Expand Down