Skip to content

Commit

Permalink
1. upgrade version to 1.5.6 (#168)
Browse files Browse the repository at this point in the history
2. fix NPE in channelInactive
3. handling reconnection situation with no connection binding with channel
4. du not scan connection pool associated with task that has not been don
5. add CONNECT_FAILED event type and print user event log
  • Loading branch information
dbl-x authored and yuemingliang committed Jun 27, 2019
1 parent 124fbf4 commit 811618a
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 48 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ jdk:
- oraclejdk8
- openjdk7

dist: trusty

before_install:
- echo "Downloading Maven 3.2.5"
&& wget https://archive.apache.org/dist/maven/maven-3/3.2.5/binaries/apache-maven-3.2.5-bin.zip
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down
86 changes: 46 additions & 40 deletions src/main/java/com/alipay/remoting/ConnectionEventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,36 +139,56 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
Attribute attr = ctx.channel().attr(Connection.CONNECTION);
if (null != attr) {
// add reconnect task
if (this.globalSwitch != null
&& this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
Connection conn = (Connection) attr.get();
if (reconnectManager != null) {
reconnectManager.addReconnectTask(conn.getUrl());
}
Connection conn = (Connection) attr.get();
// if conn is null, means that channel has been inactive before binding with connection
// this situation will fire a CLOSE event in ConnectionFactory
if (conn != null) {
userEventTriggered(ctx, ConnectionEventType.CLOSE);
}
// trigger close connection event
onEvent((Connection) attr.get(), remoteAddress, ConnectionEventType.CLOSE);
}
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
if (event instanceof ConnectionEventType) {
switch ((ConnectionEventType) event) {
ConnectionEventType eventType = (ConnectionEventType) event;
Channel channel = ctx.channel();
if (channel == null) {
logger
.warn(
"channel null when handle user triggered event in ConnectionEventHandler! eventType: {}",
eventType.name());
return;
}

Connection connection = channel.attr(Connection.CONNECTION).get();
if (connection == null) {
logger
.error(
"[BUG]connection is null when handle user triggered event in ConnectionEventHandler! eventType: {}",
eventType.name());
return;
}

final String remoteAddress = RemotingUtil.parseRemoteAddress(ctx.channel());
final String localAddress = RemotingUtil.parseLocalAddress(ctx.channel());
logger.info("trigger user event, local[{}], remote[{}], event: {}", localAddress,
remoteAddress, eventType.name());

switch (eventType) {
case CONNECT:
Channel channel = ctx.channel();
if (null != channel) {
Connection connection = channel.attr(Connection.CONNECTION).get();
this.onEvent(connection, connection.getUrl().getOriginUrl(),
ConnectionEventType.CONNECT);
} else {
logger
.warn("channel null when handle user triggered event in ConnectionEventHandler!");
}
onEvent(connection, connection.getUrl().getOriginUrl(),
ConnectionEventType.CONNECT);
break;
case CONNECT_FAILED:
case CLOSE:
case EXCEPTION:
submitReconnectTaskIfNecessary(connection.getUrl());
onEvent(connection, connection.getUrl().getOriginUrl(), eventType);
break;
default:
return;
logger.error("[BUG]unknown event: {}", eventType.name());
break;
}
} else {
super.userEventTriggered(ctx, event);
Expand All @@ -186,12 +206,12 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
ctx.channel().close();
}

/**
*
* @param conn
* @param remoteAddress
* @param type
*/
private void submitReconnectTaskIfNecessary(Url url) {
if (globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH) && reconnectManager != null) {
reconnectManager.addReconnectTask(url);
}
}

private void onEvent(final Connection conn, final String remoteAddress,
final ConnectionEventType type) {
if (this.eventListener != null) {
Expand All @@ -204,20 +224,10 @@ public void run() {
}
}

/**
* Getter method for property <tt>listener</tt>.
*
* @return property value of listener
*/
public ConnectionEventListener getConnectionEventListener() {
return eventListener;
}

/**
* Setter method for property <tt>listener</tt>.
*
* @param listener value to be assigned to property listener
*/
public void setConnectionEventListener(ConnectionEventListener listener) {
if (listener != null) {
this.eventListener = listener;
Expand Down Expand Up @@ -268,8 +278,6 @@ public class ConnectionEventExecutor {

/**
* Process event.
*
* @param event
*/
public void onEvent(Runnable event) {
try {
Expand All @@ -282,8 +290,6 @@ public void onEvent(Runnable event) {

/**
* print info log
* @param format
* @param addr
*/
private void infoLog(String format, String addr) {
if (logger.isInfoEnabled()) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/alipay/remoting/ConnectionEventType.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@
* @version $Id: ConnectionEventType.java, v 0.1 Mar 4, 2016 8:03:27 PM tao Exp $
*/
public enum ConnectionEventType {
CONNECT, CLOSE, EXCEPTION;
CONNECT, CONNECT_FAILED, CLOSE, EXCEPTION;
}
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,14 @@ public void scan() {
Iterator<String> iter = this.connTasks.keySet().iterator();
while (iter.hasNext()) {
String poolKey = iter.next();
ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
RunStateRecordedFutureTask<ConnectionPool> task = this.connTasks.get(poolKey);
if (!task.isDone()) {
logger.info("task(poolKey={}) is not done, do not scan the connection pool",
poolKey);
continue;
}

ConnectionPool pool = this.getConnectionPool(task);
if (null != pool) {
pool.scan();
if (pool.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ public Connection createConnection(Url url) throws Exception {
Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
Connection conn = new Connection(channel, ProtocolCode.fromBytes(url.getProtocol()),
url.getVersion(), url);
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
if (channel.isActive()) {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
} else {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED);
}
return conn;
}

Expand All @@ -142,7 +146,11 @@ public Connection createConnection(String targetIP, int targetPort, int connectT
Connection conn = new Connection(channel,
ProtocolCode.fromBytes(RpcProtocol.PROTOCOL_CODE), RpcProtocolV2.PROTOCOL_VERSION_1,
new Url(targetIP, targetPort));
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
if (channel.isActive()) {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
} else {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED);
}
return conn;
}

Expand All @@ -153,7 +161,11 @@ public Connection createConnection(String targetIP, int targetPort, byte version
Connection conn = new Connection(channel,
ProtocolCode.fromBytes(RpcProtocolV2.PROTOCOL_CODE), version, new Url(targetIP,
targetPort));
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
if (channel.isActive()) {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
} else {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED);
}
return conn;
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/alipay/remoting/util/FutureTaskUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public static <T> T getFutureTaskResult(RunStateRecordedFutureTask<T> task, Logg
} catch (ExecutionException e) {
logger.error("Future task execute failed!", e);
} catch (FutureTaskNotRunYetException e) {
logger.error("Future task has not run yet!", e);
logger.warn("Future task has not run yet!", e);
} catch (FutureTaskNotCompleted e) {
logger.error("Future task has not completed!", e);
logger.warn("Future task has not completed!", e);
}
}
return t;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.remoting.rpc.connectionmanage;

import com.alipay.remoting.Connection;
import com.alipay.remoting.ConnectionEventProcessor;
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.exception.RemotingException;
import com.alipay.remoting.rpc.RpcClient;
import com.alipay.remoting.rpc.common.BoltServer;
import com.alipay.remoting.rpc.common.CONNECTEventProcessor;
import org.junit.Assert;
import org.junit.Test;

/**
* @author chengyi (mark.lx@antfin.com) 2019-06-27 10:51
*/
public class ConnectionExceptionTest {

@Test
public void testConnectionException() throws RemotingException, InterruptedException {
CONNECTEventProcessor serverConnectProcessor = new CONNECTEventProcessor();

BoltServer boltServer = new BoltServer(1024);
boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT, serverConnectProcessor);
boltServer.start();

final String[] closedUrl = new String[1];
RpcClient client = new RpcClient();
client.enableReconnectSwitch();
client.addConnectionEventProcessor(ConnectionEventType.CLOSE,
new ConnectionEventProcessor() {
@Override
public void onEvent(String remoteAddr, Connection conn) {
closedUrl[0] = remoteAddr;
}
});
client.init();

Connection connection = client.getConnection("127.0.0.1:1024", 1000);
Thread.sleep(10);
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());

connection.getChannel().close();

Thread.sleep(100);
Assert.assertTrue("127.0.0.1:1024".equals(closedUrl[0]));

// connection has been created by ReconnectManager
Thread.sleep(1000 * 2);
Assert.assertEquals(2, serverConnectProcessor.getConnectTimes());
connection = client.getConnection("127.0.0.1:1024", 1000);
Assert.assertTrue(connection.isFine());
Assert.assertEquals(2, serverConnectProcessor.getConnectTimes());

boltServer.stop();
}
}

0 comments on commit 811618a

Please sign in to comment.