From 793e5c1d51d41c6fcb635961d5b145f66b0d92ab Mon Sep 17 00:00:00 2001 From: HanDong Date: Mon, 3 May 2021 13:54:53 +0800 Subject: [PATCH] Fix compile error --- .../broker/client/AdjustQueueNumStrategy.java | 22 +++++++++---------- .../broker/client/DeFiConsumerManager.java | 16 +++++++++----- .../consumer/DeFiBusPullMessageService.java | 15 ++++++++----- 3 files changed, 30 insertions(+), 23 deletions(-) diff --git a/defibus-broker/src/main/java/com/webank/defibus/broker/client/AdjustQueueNumStrategy.java b/defibus-broker/src/main/java/com/webank/defibus/broker/client/AdjustQueueNumStrategy.java index a6d3354..7a666dd 100644 --- a/defibus-broker/src/main/java/com/webank/defibus/broker/client/AdjustQueueNumStrategy.java +++ b/defibus-broker/src/main/java/com/webank/defibus/broker/client/AdjustQueueNumStrategy.java @@ -17,16 +17,8 @@ package com.webank.defibus.broker.client; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; - import com.webank.defibus.broker.DeFiBrokerController; -import io.netty.channel.Channel; +import com.webank.defibus.common.DeFiBusConstant; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.common.MixAll; @@ -37,7 +29,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.webank.defibus.common.DeFiBusConstant; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; + +import io.netty.channel.Channel; public class AdjustQueueNumStrategy { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -234,7 +234,7 @@ private int scaleQueueSize(Set cidList) { int scaleQueueSize = 0; long nearbyClients = nearbyClients(cidList); if (nearbyClients != 0) { - scaleQueueSize = new Long(nearbyClients).intValue(); + scaleQueueSize = Long.valueOf(nearbyClients).intValue(); } else if (isAllClientsHaveNotIDCSurffix(cidList)) { scaleQueueSize = cidList.size(); } diff --git a/defibus-broker/src/main/java/com/webank/defibus/broker/client/DeFiConsumerManager.java b/defibus-broker/src/main/java/com/webank/defibus/broker/client/DeFiConsumerManager.java index 769280f..cecd755 100644 --- a/defibus-broker/src/main/java/com/webank/defibus/broker/client/DeFiConsumerManager.java +++ b/defibus-broker/src/main/java/com/webank/defibus/broker/client/DeFiConsumerManager.java @@ -18,12 +18,7 @@ package com.webank.defibus.broker.client; import com.webank.defibus.common.util.ReflectUtil; -import io.netty.channel.Channel; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; + import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupEvent; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; @@ -39,6 +34,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import io.netty.channel.Channel; + public class DeFiConsumerManager extends ConsumerManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; @@ -47,6 +50,7 @@ public class DeFiConsumerManager extends ConsumerManager { private final ConsumerIdsChangeListener consumerIdsChangeListener; private final AdjustQueueNumStrategy adjustQueueNumStrategy; + @SuppressWarnings("unchecked") public DeFiConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener, final AdjustQueueNumStrategy strategy) { super(consumerIdsChangeListener); diff --git a/defibus-client/src/main/java/com/webank/defibus/client/impl/consumer/DeFiBusPullMessageService.java b/defibus-client/src/main/java/com/webank/defibus/client/impl/consumer/DeFiBusPullMessageService.java index 992fcf9..da31e84 100644 --- a/defibus-client/src/main/java/com/webank/defibus/client/impl/consumer/DeFiBusPullMessageService.java +++ b/defibus-client/src/main/java/com/webank/defibus/client/impl/consumer/DeFiBusPullMessageService.java @@ -19,12 +19,7 @@ import com.webank.defibus.client.impl.factory.DeFiBusClientInstance; import com.webank.defibus.common.util.ReflectUtil; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.MQConsumerInner; import org.apache.rocketmq.client.impl.consumer.PullMessageService; @@ -33,6 +28,13 @@ import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.InternalLogger; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + public class DeFiBusPullMessageService extends PullMessageService { private final InternalLogger log = ClientLogger.getLog(); private final DeFiBusClientInstance mQClientFactory; @@ -47,6 +49,7 @@ public Thread newThread(Runnable r) { } }); + @SuppressWarnings("unchecked") public DeFiBusPullMessageService(DeFiBusClientInstance deFiBusClientInstance) { super(deFiBusClientInstance); this.mQClientFactory = deFiBusClientInstance;