From 257684a0b8da98c127c0256fa052e4bc0ada008d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Thu, 16 Nov 2023 19:50:24 +0800 Subject: [PATCH] Change the policy of the queue(DataCarrier) L1 in the metric aggregate worker to IF_POSSIBLE mode (#11554) --- docs/en/changes/changes.md | 3 + .../worker/MetricsAggregateWorker.java | 13 +++- .../buffer/ArrayBlockingQueueBuffer.java | 4 +- .../library/datacarrier/buffer/Buffer.java | 76 ------------------- .../library/datacarrier/buffer/Channels.java | 6 +- .../datacarrier/consumer/ConsumerThread.java | 4 - 6 files changed, 18 insertions(+), 88 deletions(-) delete mode 100644 oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/Buffer.java diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 5c360c2e14e4..bef119bce22b 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -39,6 +39,9 @@ * Fix `limit` doesn't work for `findEndpoint` API in ES storage. * Isolate MAL CounterWindow cache by metric name. * Fix JDBC Log query order. +* Change the DataCarrier IF_POSSIBLE strategy to use ArrayBlockingQueue implementation. +* Change the policy of the queue(DataCarrier) in the L1 metric aggregate worker to IF_POSSIBLE mode. +* Add self-observability metric `metrics_aggregator_abandon` to count the number of abandon metrics. #### UI diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java index 698341144fde..255aa44306b9 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java @@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; +import org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy; import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory; import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; @@ -46,6 +47,7 @@ public class MetricsAggregateWorker extends AbstractWorker { private AbstractWorker nextWorker; private final DataCarrier dataCarrier; private final MergableBufferedData mergeDataCache; + private CounterMetrics abandonCounter; private CounterMetrics aggregationCounter; private long lastSendTime = 0; @@ -68,7 +70,7 @@ public class MetricsAggregateWorker extends AbstractWorker { queueBufferSize = 1_000; } this.dataCarrier = new DataCarrier<>( - "MetricsAggregateWorker." + modelName, name, queueChannelSize, queueBufferSize); + "MetricsAggregateWorker." + modelName, name, queueChannelSize, queueBufferSize, BufferStrategy.IF_POSSIBLE); BulkConsumePool.Creator creator = new BulkConsumePool.Creator( name, BulkConsumePool.Creator.recommendMaxSize() * 2, 20); @@ -82,6 +84,11 @@ public class MetricsAggregateWorker extends AbstractWorker { MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME) .provider() .getService(MetricsCreator.class); + abandonCounter = metricsCreator.createCounter( + "metrics_aggregator_abandon", "The abandon number of rows received in aggregation", + new MetricsTag.Keys("metricName", "level", "dimensionality"), + new MetricsTag.Values(modelName, "1", "minute") + ); aggregationCounter = metricsCreator.createCounter( "metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level", "dimensionality"), @@ -95,7 +102,9 @@ public class MetricsAggregateWorker extends AbstractWorker { */ @Override public final void in(Metrics metrics) { - dataCarrier.produce(metrics); + if (!dataCarrier.produce(metrics)) { + abandonCounter.inc(); + } } /** diff --git a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/ArrayBlockingQueueBuffer.java b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/ArrayBlockingQueueBuffer.java index 5601d12528ad..502bc2fe3c83 100644 --- a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/ArrayBlockingQueueBuffer.java +++ b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/ArrayBlockingQueueBuffer.java @@ -40,8 +40,10 @@ public class ArrayBlockingQueueBuffer implements QueueBuffer { @Override public boolean save(T data) { - //only BufferStrategy.BLOCKING try { + if (BufferStrategy.IF_POSSIBLE.equals(strategy)) { + return queue.offer(data); + } queue.put(data); } catch (InterruptedException e) { // Ignore the error diff --git a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/Buffer.java b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/Buffer.java deleted file mode 100644 index 93c8e39a4c17..000000000000 --- a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/Buffer.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.library.datacarrier.buffer; - -import java.util.List; -import org.apache.skywalking.oap.server.library.datacarrier.common.AtomicRangeInteger; - -/** - * Self implementation ring queue. - */ -public class Buffer implements QueueBuffer { - private final Object[] buffer; - private BufferStrategy strategy; - private AtomicRangeInteger index; - - Buffer(int bufferSize, BufferStrategy strategy) { - buffer = new Object[bufferSize]; - this.strategy = strategy; - index = new AtomicRangeInteger(0, bufferSize); - } - - @Override - public void setStrategy(BufferStrategy strategy) { - this.strategy = strategy; - } - - @Override - public boolean save(T data) { - int i = index.getAndIncrement(); - if (buffer[i] != null) { - switch (strategy) { - case IF_POSSIBLE: - return false; - default: - } - } - buffer[i] = data; - return true; - } - - @Override - public int getBufferSize() { - return buffer.length; - } - - @Override - public void obtain(List consumeList) { - this.obtain(consumeList, 0, buffer.length); - } - - void obtain(List consumeList, int start, int end) { - for (int i = start; i < end; i++) { - if (buffer[i] != null) { - consumeList.add((T) buffer[i]); - buffer[i] = null; - } - } - } - -} diff --git a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/Channels.java b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/Channels.java index da323d180da6..d834287cc6ba 100644 --- a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/Channels.java +++ b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/Channels.java @@ -35,11 +35,7 @@ public Channels(int channelSize, int bufferSize, IDataPartitioner partitioner this.strategy = strategy; bufferChannels = new QueueBuffer[channelSize]; for (int i = 0; i < channelSize; i++) { - if (BufferStrategy.BLOCKING.equals(strategy)) { - bufferChannels[i] = new ArrayBlockingQueueBuffer<>(bufferSize, strategy); - } else { - bufferChannels[i] = new Buffer<>(bufferSize, strategy); - } + bufferChannels[i] = new ArrayBlockingQueueBuffer<>(bufferSize, strategy); } // noinspection PointlessArithmeticExpression size = 1L * channelSize * bufferSize; // it's not pointless, it prevents numeric overflow before assigning an integer to a long diff --git a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerThread.java b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerThread.java index d5013f1c7b02..30caa739b5b7 100644 --- a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerThread.java +++ b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerThread.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.List; -import org.apache.skywalking.oap.server.library.datacarrier.buffer.Buffer; import org.apache.skywalking.oap.server.library.datacarrier.buffer.QueueBuffer; public class ConsumerThread extends Thread { @@ -88,9 +87,6 @@ void shutdown() { running = false; } - /** - * DataSource is a refer to {@link Buffer}. - */ class DataSource { private QueueBuffer sourceBuffer;