Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

code optimization #3297

Merged
merged 1 commit into from
Jan 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ public class Constants {

public static final int DEFAULT_FAILBACK_TIMES = 3;

public static final int MAX_PROXY_COUNT = 65535;

// default buffer size is 8k.
public static final int DEFAULT_BUFFER_SIZE = 8 * 1024;

Expand Down Expand Up @@ -480,16 +482,19 @@ public class Constants {

/**
* simple the registry for provider.
*
* @since 2.7.0
*/
public static final String SIMPLE_PROVIDER_CONFIG_KEY = "simple.provider.config";
/**
* simple the registry for consumer.
*
* @since 2.7.0
*/
public static final String SIMPLE_CONSUMER_CONFIG_KEY = "simple.consumer.config";
/**
* After simplify the registry, should add some parameter individually for provider.
*
* @since 2.7.0
*/
public static final String EXTRA_PROVIDER_CONFIG_KEYS_KEY = "extra.provider.keys";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.dubbo.common.bytecode;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.utils.ClassHelper;
import org.apache.dubbo.common.utils.ReflectUtils;

Expand Down Expand Up @@ -77,7 +78,7 @@ public static Proxy getProxy(Class<?>... ics) {
* @return Proxy instance.
*/
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
if (ics.length > 65535) {
if (ics.length > Constants.MAX_PROXY_COUNT) {
throw new IllegalArgumentException("interface limit exceeded");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.lang.reflect.Proxy;

/**
* JavaassistRpcProxyFactory
* JdkRpcProxyFactory
*/
public class JdkProxyFactory extends AbstractProxyFactory {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,61 +73,25 @@ public void encode(Channel channel, OutputStream output, Object message) throws
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);

byte flag = in.readByte();
switch (flag) {
case DubboCodec.RESPONSE_NULL_VALUE:
break;
case DubboCodec.RESPONSE_VALUE:
try {
Type[] returnType = RpcUtils.getReturnTypes(invocation);
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
setResponseResult(in, true, false, false);
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION:
try {
Object obj = in.readObject();
if (obj instanceof Throwable == false) {
throw new IOException("Response data error, expect Throwable, but get " + obj);
}
setException((Throwable) obj);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
setResponseResult(in, false, true, false);
break;
case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
try {
setAttachments((Map<String, String>) in.readObject(Map.class));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
setResponseResult(in, false, false, true);
break;
case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
try {
Type[] returnType = RpcUtils.getReturnTypes(invocation);
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
setAttachments((Map<String, String>) in.readObject(Map.class));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
setResponseResult(in, true, false, true);
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
try {
Object obj = in.readObject();
if (obj instanceof Throwable == false) {
throw new IOException("Response data error, expect Throwable, but get " + obj);
}
setException((Throwable) obj);
setAttachments((Map<String, String>) in.readObject(Map.class));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
setResponseResult(in, false, true, false);
break;
default:
throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
Expand Down Expand Up @@ -155,4 +119,28 @@ public void decode() throws Exception {
}
}

private void setResponseResult(ObjectInput in, boolean hasValue, boolean hasException, boolean hasAttachments) throws IOException {
try {
if (hasValue) {
Type[] returnType = RpcUtils.getReturnTypes(invocation);
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
}
if (hasException) {
Object obj = in.readObject();
if (obj instanceof Throwable == false) {
throw new IOException("Response data error, expect Throwable, but get " + obj);
}
setException((Throwable) obj);
}
if (hasAttachments) {
setAttachments((Map<String, String>) in.readObject(Map.class));
}
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,22 @@ public class DubboProtocol extends AbstractProtocol {
public static final int DEFAULT_PORT = 20880;
private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke";
private static DubboProtocol INSTANCE;
private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // <host:port,Exchanger>
private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>
private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap = new ConcurrentHashMap<String, LazyConnectExchangeClient>();
private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
private final Set<String> optimizers = new ConcurrentHashSet<String>();
//consumer side export a stub service for dispatching event
//servicekey-stubmethods
private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<String, String>();
/**
* <host:port,Exchanger>
*/
private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<>();
/**
* <host:port,Exchanger>
*/
private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();
private final Set<String> optimizers = new ConcurrentHashSet<>();
/**
* consumer side export a stub service for dispatching event
* servicekey-stubmethods
*/
private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<>();
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

@Override
Expand Down Expand Up @@ -180,7 +188,8 @@ public DubboProtocol() {

public static DubboProtocol getDubboProtocol() {
if (INSTANCE == null) {
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(DubboProtocol.NAME); // load
// load
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(DubboProtocol.NAME);
}
return INSTANCE;
}
Expand Down Expand Up @@ -218,15 +227,16 @@ Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException
//callback
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path = inv.getAttachments().get(Constants.PATH_KEY) + "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
path += "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " +
exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
}

return exporter.getInvoker();
Expand Down Expand Up @@ -447,7 +457,7 @@ private ExchangeClient initClient(URL url) {

@Override
public void destroy() {
for (String key : new ArrayList<String>(serverMap.keySet())) {
for (String key : new ArrayList<>(serverMap.keySet())) {
ExchangeServer server = serverMap.remove(key);
if (server != null) {
try {
Expand All @@ -461,7 +471,7 @@ public void destroy() {
}
}

for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
for (String key : new ArrayList<>(referenceClientMap.keySet())) {
ExchangeClient client = referenceClientMap.remove(key);
if (client != null) {
try {
Expand All @@ -475,7 +485,7 @@ public void destroy() {
}
}

for (String key : new ArrayList<String>(ghostClientMap.keySet())) {
for (String key : new ArrayList<>(ghostClientMap.keySet())) {
ExchangeClient client = ghostClientMap.remove(key);
if (client != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,19 @@
@SuppressWarnings("deprecation")
final class LazyConnectExchangeClient implements ExchangeClient {

// when this warning rises from invocation, program probably have bug.
static final String REQUEST_WITH_WARNING_KEY = "lazyclient_request_with_warning";
/**
* when this warning rises from invocation, program probably have bug.
*/
protected static final String REQUEST_WITH_WARNING_KEY = "lazyclient_request_with_warning";
private final static Logger logger = LoggerFactory.getLogger(LazyConnectExchangeClient.class);
protected final boolean requestWithWarning;
private final URL url;
private final ExchangeHandler requestHandler;
private final Lock connectLock = new ReentrantLock();
// lazy connect, initial state for connection
private final int warning_period = 5000;
/**
* lazy connect, initial state for connection
*/
private final boolean initialState;
private volatile ExchangeClient client;
private AtomicLong warningcount = new AtomicLong(0);
Expand Down Expand Up @@ -81,7 +86,7 @@ private void initClient() throws RemotingException {

@Override
public ResponseFuture request(Object request) throws RemotingException {
warning(request);
warning();
initClient();
return client.request(request);
}
Expand All @@ -102,19 +107,17 @@ public InetSocketAddress getRemoteAddress() {

@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
warning(request);
warning();
initClient();
return client.request(request, timeout);
}

/**
* If {@link #REQUEST_WITH_WARNING_KEY} is configured, then warn once every 5000 invocations.
*
* @param request
*/
private void warning(Object request) {
private void warning() {
if (requestWithWarning) {
if (warningcount.get() % 5000 == 0) {
if (warningcount.get() % warning_period == 0) {
logger.warn(new IllegalStateException("safe guard client , should not be called ,must have a bug."));
}
warningcount.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ public class TraceFilter implements Filter {

private static final String TRACE_COUNT = "trace.count";

private static final ConcurrentMap<String, Set<Channel>> tracers = new ConcurrentHashMap<String, Set<Channel>>();
private static final ConcurrentMap<String, Set<Channel>> tracers = new ConcurrentHashMap<>();

public static void addTracer(Class<?> type, String method, Channel channel, int max) {
channel.setAttribute(TRACE_MAX, max);
channel.setAttribute(TRACE_COUNT, new AtomicInteger());
String key = method != null && method.length() > 0 ? type.getName() + "." + method : type.getName();
Set<Channel> channels = tracers.get(key);
if (channels == null) {
tracers.putIfAbsent(key, new ConcurrentHashSet<Channel>());
tracers.putIfAbsent(key, new ConcurrentHashSet<>());
channels = tracers.get(key);
}
channels.add(channel);
Expand Down Expand Up @@ -87,13 +87,13 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
channels = tracers.get(key);
}
if (CollectionUtils.isNotEmpty(channels)) {
for (Channel channel : new ArrayList<Channel>(channels)) {
for (Channel channel : new ArrayList<>(channels)) {
if (channel.isConnected()) {
try {
int max = 1;
Integer m = (Integer) channel.getAttribute(TRACE_MAX);
if (m != null) {
max = (int) m;
max = m;
}
int count = 0;
AtomicInteger c = (AtomicInteger) channel.getAttribute(TRACE_COUNT);
Expand Down