diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java index 9ffe88ffff4..f7cc229fc9e 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java @@ -85,6 +85,9 @@ public class StatConstants { public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_MISSING = "default.topic.empty"; public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_USED = "default.topic.used"; public static final java.lang.String EVENT_SINK_PRODUCER_NULL = "sink.producer.null"; + public static final java.lang.String EVENT_SINK_CLUSTER_EMPTY = "sink.cluster.empty"; + public static final java.lang.String EVENT_SINK_CLUSTER_UNMATCHED = "sink.cluster.unmatched"; + public static final java.lang.String EVENT_SINK_CPRODUCER_NULL = "sink.cluster.producer.null"; public static final java.lang.String EVENT_SINK_SEND_EXCEPTION = "sink.send.exception"; public static final java.lang.String EVENT_SINK_FAILRETRY = "sink.retry"; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java index be20c21ccf0..358165a3af7 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java @@ -19,6 +19,7 @@ import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig; +import org.apache.inlong.dataproxy.consts.StatConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,8 +31,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @@ -46,7 +45,6 @@ public class MessageQueueZoneProducer { private final CacheClusterSelector cacheClusterSelector; private final AtomicInteger clusterIndex = new AtomicInteger(0); - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private List currentClusterNames = new ArrayList<>(); private final ConcurrentHashMap usingTimeMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap usingClusterMap = new ConcurrentHashMap<>(); @@ -150,24 +148,29 @@ public void clearExpiredProducers() { */ public boolean send(PackProfile profile) { String clusterName; + List tmpClusters; MessageQueueClusterProducer clusterProducer; - readWriteLock.readLock().lock(); - try { - do { - clusterName = currentClusterNames.get( - Math.abs(clusterIndex.getAndIncrement()) % currentClusterNames.size()); - if (clusterName == null) { - continue; - } - clusterProducer = usingClusterMap.get(clusterName); - if (clusterProducer == null) { - continue; - } - return clusterProducer.send(profile); - } while (true); - } finally { - readWriteLock.readLock().unlock(); - } + do { + tmpClusters = currentClusterNames; + if (tmpClusters == null || tmpClusters.isEmpty()) { + context.fileMetricIncSumStats(StatConstants.EVENT_SINK_CLUSTER_EMPTY); + sleepSomeTime(100); + continue; + } + clusterName = tmpClusters.get(Math.abs(clusterIndex.getAndIncrement()) % tmpClusters.size()); + if (clusterName == null) { + context.fileMetricIncSumStats(StatConstants.EVENT_SINK_CLUSTER_UNMATCHED); + sleepSomeTime(100); + continue; + } + clusterProducer = usingClusterMap.get(clusterName); + if (clusterProducer == null) { + context.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_CPRODUCER_NULL, clusterName); + sleepSomeTime(100); + continue; + } + return clusterProducer.send(profile); + } while (true); } private void checkAndReloadClusterInfo() { @@ -225,14 +228,9 @@ private void checkAndReloadClusterInfo() { } } // replace cluster names - readWriteLock.writeLock().lock(); - try { - if (!lastClusterNames.equals(currentClusterNames)) { - changed = true; - currentClusterNames = lastClusterNames; - } - } finally { - readWriteLock.writeLock().unlock(); + if (!lastClusterNames.equals(currentClusterNames)) { + currentClusterNames = lastClusterNames; + changed = true; } // filter removed records Set needRmvs = new HashSet<>(); @@ -293,4 +291,12 @@ private void checkAndPublishTopics() { clusterProducer.publishTopic(curTopicSet); } } + + private void sleepSomeTime(long millis) { + try { + Thread.sleep(millis); + } catch (Throwable e) { + // + } + } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java index 6edb9f5bc08..76e50d697e3 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java @@ -46,6 +46,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * MessageQueueZoneSink @@ -72,7 +74,8 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable, private MessageQueueZoneProducer zoneProducer; // configure change notify - private final Object syncLock = new Object(); + private final ReentrantLock reentrantLock = new ReentrantLock(); + private final Condition condition = reentrantLock.newCondition(); private final AtomicLong lastNotifyTime = new AtomicLong(0); // changeListerThread private Thread configListener; @@ -295,8 +298,13 @@ public void update() { if (zoneProducer == null) { return; } - lastNotifyTime.set(System.currentTimeMillis()); - syncLock.notifyAll(); + reentrantLock.lock(); + try { + lastNotifyTime.set(System.currentTimeMillis()); + condition.signal(); + } finally { + reentrantLock.unlock(); + } } /** @@ -311,14 +319,16 @@ private class ConfigChangeProcessor implements Runnable { @Override public void run() { long lastCheckTime; + logger.info("{} config-change processor start!", getName()); while (!isShutdown) { + reentrantLock.lock(); try { - syncLock.wait(); - } catch (InterruptedException e) { - logger.error("{} config-change processor meet interrupt, exit!", getName()); + condition.await(); + } catch (InterruptedException e1) { + logger.info("{} config-change processor meet interrupt, break!", getName()); break; - } catch (Throwable e2) { - // + } finally { + reentrantLock.unlock(); } if (zoneProducer == null) { continue; @@ -328,6 +338,7 @@ public void run() { zoneProducer.reloadMetaConfig(); } while (lastCheckTime != lastNotifyTime.get()); } + logger.info("{} config-change processor exit!", getName()); } } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java index 9f31e7f97e6..cd2353f01e0 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java @@ -229,7 +229,7 @@ public Event encEventPackage(BaseSource source, Channel channel) { - BIN_MSG_ATTRLEN_SIZE - BIN_MSG_MAGIC_SIZE - origAttr.length(), (short) origAttr.length()); if (origAttr.length() > 0) { System.arraycopy(origAttr.getBytes(StandardCharsets.UTF_8), 0, dataBuf.array(), - totalPkgLength - BIN_MSG_MAGIC_SIZE - origAttr.length(), bodyData.length); + totalPkgLength - BIN_MSG_MAGIC_SIZE - origAttr.length(), origAttr.length()); } dataBuf.putShort(totalPkgLength - BIN_MSG_MAGIC_SIZE, (short) BIN_MSG_MAGIC); // build InLong message