Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release 1.6.1 (#209) #229

Merged
merged 3 commits into from
Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .middleware-common/.travis.settings.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<repository>
<id>central</id>
<name>Central Repository</name>
<url>http://repo1.maven.org/maven2</url>
<url>https://repo1.maven.org/maven2</url>
<releases>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
Expand Down Expand Up @@ -108,7 +108,7 @@
<pluginRepository>
<id>central</id>
<!-- specify repo1 which support http -->
<url>http://repo1.maven.org/maven2</url>
<url>https://repo1.maven.org/maven2</url>
<releases>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
Expand Down
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ jdk:
- oraclejdk8
- openjdk7

dist: trusty

before_install:
- echo "Downloading Maven 3.2.5"
&& wget https://archive.apache.org/dist/maven/maven-3/3.2.5/binaries/apache-maven-3.2.5-bin.zip
Expand Down
14 changes: 13 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>1.6.0</version>
<version>1.6.1</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down Expand Up @@ -149,6 +149,18 @@
<version>1.8.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.20</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.20</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/alipay/remoting/AbstractLifeCycle.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,14 @@ public boolean isStarted() {
return isStarted.get();
}

/**
* ensure the component has been startup before providing service.
*/
protected void ensureStarted() {
if (!isStarted()) {
throw new LifeCycleException(String.format(
"Component(%s) has not been started yet, please startup first!", getClass()
.getSimpleName()));
}
}
}
16 changes: 15 additions & 1 deletion src/main/java/com/alipay/remoting/AbstractRemotingServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

/**
* Server template for remoting.
*
*
* @author jiangping
* @version $Id: AbstractRemotingServer.java, v 0.1 2015-9-5 PM7:37:48 tao Exp $
*/
Expand All @@ -56,6 +56,10 @@ public AbstractRemotingServer(int port) {
}

public AbstractRemotingServer(String ip, int port) {
if (port < 0 || port > 65535) {
throw new IllegalArgumentException(String.format(
"Illegal port value: %d, which should between 0 and 65535.", port));
}
this.ip = ip;
this.port = port;

Expand Down Expand Up @@ -123,6 +127,16 @@ public int port() {
return port;
}

/**
* override the random port zero with the actual binding port value.
* @param port local binding port
*/
protected void setLocalBindingPort(int port) {
if (port() == 0) {
this.port = port;
}
}

protected abstract void doInit();

protected abstract boolean doStart() throws InterruptedException;
Expand Down
84 changes: 45 additions & 39 deletions src/main/java/com/alipay/remoting/ConnectionEventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,35 +139,55 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
Attribute attr = ctx.channel().attr(Connection.CONNECTION);
if (null != attr) {
// add reconnect task
if (this.globalSwitch != null
&& this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
Connection conn = (Connection) attr.get();
if (reconnectManager != null) {
reconnectManager.reconnect(conn.getUrl());
}
Connection conn = (Connection) attr.get();
// if conn is null, means that channel has been inactive before binding with connection
// this situation will fire a CLOSE event in ConnectionFactory
if (conn != null) {
userEventTriggered(ctx, ConnectionEventType.CLOSE);
}
// trigger close connection event
onEvent((Connection) attr.get(), remoteAddress, ConnectionEventType.CLOSE);
}
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
if (event instanceof ConnectionEventType) {
switch ((ConnectionEventType) event) {
ConnectionEventType eventType = (ConnectionEventType) event;
Channel channel = ctx.channel();
if (channel == null) {
logger
.warn(
"channel null when handle user triggered event in ConnectionEventHandler! eventType: {}",
eventType.name());
return;
}

Connection connection = channel.attr(Connection.CONNECTION).get();
if (connection == null) {
logger
.error(
"[BUG]connection is null when handle user triggered event in ConnectionEventHandler! eventType: {}",
eventType.name());
return;
}

final String remoteAddress = RemotingUtil.parseRemoteAddress(ctx.channel());
final String localAddress = RemotingUtil.parseLocalAddress(ctx.channel());
logger.info("trigger user event, local[{}], remote[{}], event: {}", localAddress,
remoteAddress, eventType.name());

switch (eventType) {
case CONNECT:
Channel channel = ctx.channel();
if (null != channel) {
Connection connection = channel.attr(Connection.CONNECTION).get();
this.onEvent(connection, connection.getUrl().getOriginUrl(),
ConnectionEventType.CONNECT);
} else {
logger
.warn("channel null when handle user triggered event in ConnectionEventHandler!");
}
onEvent(connection, connection.getUrl().getOriginUrl(),
ConnectionEventType.CONNECT);
break;
case CONNECT_FAILED:
case CLOSE:
case EXCEPTION:
submitReconnectTaskIfNecessary(connection.getUrl());
onEvent(connection, connection.getUrl().getOriginUrl(), eventType);
break;
default:
logger.error("[BUG]unknown event: {}", eventType.name());
break;
}
} else {
Expand All @@ -186,6 +206,12 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
ctx.channel().close();
}

private void submitReconnectTaskIfNecessary(Url url) {
if (globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH) && reconnectManager != null) {
reconnectManager.reconnect(url);
}
}

private void onEvent(final Connection conn, final String remoteAddress,
final ConnectionEventType type) {
if (this.eventListener != null) {
Expand All @@ -198,20 +224,10 @@ public void run() {
}
}

/**
* Getter method for property <tt>listener</tt>.
*
* @return property value of listener
*/
public ConnectionEventListener getConnectionEventListener() {
return eventListener;
}

/**
* Setter method for property <tt>listener</tt>.
*
* @param listener value to be assigned to property listener
*/
public void setConnectionEventListener(ConnectionEventListener listener) {
if (listener != null) {
this.eventListener = listener;
Expand All @@ -221,20 +237,10 @@ public void setConnectionEventListener(ConnectionEventListener listener) {
}
}

/**
* Getter method for property <tt>connectionManager</tt>.
*
* @return property value of connectionManager
*/
public ConnectionManager getConnectionManager() {
return connectionManager;
}

/**
* Setter method for property <tt>connectionManager</tt>.
*
* @param connectionManager value to be assigned to property connectionManager
*/
public void setConnectionManager(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/alipay/remoting/ConnectionEventType.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@
* @version $Id: ConnectionEventType.java, v 0.1 Mar 4, 2016 8:03:27 PM tao Exp $
*/
public enum ConnectionEventType {
CONNECT, CLOSE, EXCEPTION;
CONNECT, CONNECT_FAILED, CLOSE, EXCEPTION;
}
18 changes: 12 additions & 6 deletions src/main/java/com/alipay/remoting/DefaultConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public DefaultConnectionManager() {
* @param connectionSelectStrategy connection selection strategy
*/
public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrategy) {
this();
this.connTasks = new ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>>();
this.healTasks = new ConcurrentHashMap<String, FutureTask<Integer>>();
this.connectionSelectStrategy = connectionSelectStrategy;
}

Expand Down Expand Up @@ -283,10 +284,8 @@ public List<Connection> getAll(String poolKey) {
@Override
public Map<String, List<Connection>> getAll() {
Map<String, List<Connection>> allConnections = new HashMap<String, List<Connection>>();
Iterator<Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>>> iterator = this
.getConnPools().entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>> entry = iterator.next();
for (Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>> entry : this
.getConnPools().entrySet()) {
ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(entry.getValue(), logger);
if (null != pool) {
allConnections.put(entry.getKey(), pool.getAll());
Expand Down Expand Up @@ -425,7 +424,14 @@ public void scan() {
Iterator<String> iter = this.connTasks.keySet().iterator();
while (iter.hasNext()) {
String poolKey = iter.next();
ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
RunStateRecordedFutureTask<ConnectionPool> task = this.connTasks.get(poolKey);
if (!task.isDone()) {
logger.info("task(poolKey={}) is not done, do not scan the connection pool",
poolKey);
continue;
}

ConnectionPool pool = this.getConnectionPool(task);
if (null != pool) {
pool.scan();
if (pool.isEmpty()) {
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/alipay/remoting/InvokeCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,29 @@

/**
* Invoke callback.
*
*
* @author jiangping
* @version $Id: InvokeCallback.java, v 0.1 2015-9-30 AM10:24:26 tao Exp $
*/
public interface InvokeCallback {

/**
* Response received.
*
*
* @param result
*/
void onResponse(final Object result);

/**
* Exception caught.
*
*
* @param e
*/
void onException(final Throwable e);

/**
* User defined executor.
*
*
* @return
*/
Executor getExecutor();
Expand Down
21 changes: 17 additions & 4 deletions src/main/java/com/alipay/remoting/ReconnectManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

/**
* Reconnect manager.
*
*
* @author yunliang.shi
* @version $Id: ReconnectManager.java, v 0.1 Mar 11, 2016 5:20:50 PM yunliang.shi Exp $
*/
Expand All @@ -47,29 +47,39 @@ public ReconnectManager(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
this.tasks = new LinkedBlockingQueue<ReconnectTask>();
this.canceled = new CopyOnWriteArrayList<Url>();
// call startup in the constructor to be compatible with version 1.5.x
startup();
}

@Override
public void reconnect(Url url) {
ensureStarted();
tasks.add(new ReconnectTask(url));
}

@Override
public void disableReconnect(Url url) {
ensureStarted();
canceled.add(url);
}

@Override
public void enableReconnect(Url url) {
ensureStarted();
canceled.remove(url);
}

@Override
public void startup() throws LifeCycleException {
super.startup();
// make the startup method idempotent to be compatible with version 1.5.x
synchronized (this) {
if (!isStarted()) {
super.startup();

this.healConnectionThreads = new Thread(new HealConnectionRunner());
this.healConnectionThreads.start();
this.healConnectionThreads = new Thread(new HealConnectionRunner());
this.healConnectionThreads.start();
}
}
}

@Override
Expand All @@ -86,6 +96,7 @@ public void shutdown() throws LifeCycleException {
*/
@Deprecated
public void addCancelUrl(Url url) {
ensureStarted();
disableReconnect(url);
}

Expand All @@ -94,6 +105,7 @@ public void addCancelUrl(Url url) {
*/
@Deprecated
public void removeCancelUrl(Url url) {
ensureStarted();
enableReconnect(url);
}

Expand All @@ -102,6 +114,7 @@ public void removeCancelUrl(Url url) {
*/
@Deprecated
public void addReconnectTask(Url url) {
ensureStarted();
reconnect(url);
}

Expand Down
Loading