Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize retry for FailbackRegistry. #2763

Merged
merged 11 commits into from
Dec 11, 2018
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.timer.Timer;
import org.apache.dubbo.common.timer.TimerTask;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.support.FailbackRegistry;

import java.util.concurrent.TimeUnit;

/**
* AbstractRetryTask
*/
public abstract class AbstractRetryTask implements TimerTask {

protected final Logger logger = LoggerFactory.getLogger(getClass());

/**
* url for retry task
*/
protected final URL url;

/**
* registry for this task
*/
protected final FailbackRegistry registry;

/**
* retry period
*/
protected final long retryPeriod;

/**
* task name for this task
*/
protected final String taskName;

private volatile boolean cancel;

AbstractRetryTask(URL url, FailbackRegistry registry, String taskName) {
if (url == null || StringUtils.isBlank(taskName)) {
throw new IllegalArgumentException();
}
this.url = url;
this.registry = registry;
this.taskName = taskName;
cancel = false;
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
}

public void cancel() {
cancel = true;
}

public boolean isCancel() {
return cancel;
}

protected void reput(Timeout timeout, long tick) {
if (timeout == null) {
throw new IllegalArgumentException();
}

Timer timer = timeout.timer();
if (timer.isStop() || timeout.isCancelled() || isCancel()) {
return;
}

timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);
}

@Override
public void run(Timeout timeout) throws Exception {
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) {
// other thread cancel this timeout or stop the timer.
return;
}
if (logger.isInfoEnabled()) {
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
logger.info(taskName + " : " + url);
}
try {
doRetry(url, registry, timeout);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t);
// reput this task when catch exception.
reput(timeout, retryPeriod);
}
}

protected abstract void doRetry(URL url, FailbackRegistry registry, Timeout timeout);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* FailedNotifiedTask
*/
public final class FailedNotifiedTask extends AbstractRetryTask {

private static final String NAME = "retry subscribe";

private final NotifyListener listener;

private final List<URL> urls = new CopyOnWriteArrayList<>();

public FailedNotifiedTask(URL url, NotifyListener listener) {
super(url, null, NAME);
if (listener == null) {
throw new IllegalArgumentException();
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
}
this.listener = listener;
}

public void addUrlToRetry(List<URL> urls) {
if (CollectionUtils.isEmpty(urls)) {
return;
}
this.urls.addAll(urls);
}

public void removeRetryUrl(List<URL> urls) {
this.urls.removeAll(urls);
}

@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
if (CollectionUtils.isNotEmpty(urls)) {
listener.notify(urls);
urls.clear();
}
reput(timeout, retryPeriod);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.registry.support.FailbackRegistry;

/**
* FailedRegisteredTask
*/
public final class FailedRegisteredTask extends AbstractRetryTask {

private static final String NAME = "retry register";

public FailedRegisteredTask(URL url, FailbackRegistry registry) {
super(url, registry, NAME);
}

@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
registry.doRegister(url);
registry.removeFailedRegisteredTask(url);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;

/**
* FailedSubscribedTask
*/
public final class FailedSubscribedTask extends AbstractRetryTask {

private static final String NAME = "retry subscribe";

private final NotifyListener listener;

public FailedSubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) {
super(url, registry, NAME);
if (listener == null) {
throw new IllegalArgumentException();
}
this.listener = listener;
}

@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
registry.doSubscribe(url, listener);
registry.removeFailedSubscribedTask(url, listener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.registry.support.FailbackRegistry;

/**
* FailedUnregisteredTask
*/
public final class FailedUnregisteredTask extends AbstractRetryTask {

private static final String NAME = "retry unregister";

public FailedUnregisteredTask(URL url, FailbackRegistry registry) {
super(url, registry, NAME);
}

@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
registry.doUnregister(url);
registry.removeFailedUnregisteredTask(url);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;

/**
* FailedUnsubscribedTask
*/
public final class FailedUnsubscribedTask extends AbstractRetryTask {

private static final String NAME = "retry unsubscribe";

private final NotifyListener listener;

public FailedUnsubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) {
super(url, registry, NAME);
if (listener == null) {
throw new IllegalArgumentException();
}
this.listener = listener;
}

@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
registry.unsubscribe(url, listener);
registry.removeFailedUnsubscribedTask(url, listener);
}
}
Loading