From 298a6b9c4aec21ec37ecc84414e121d2e3f6b683 Mon Sep 17 00:00:00 2001 From: doleyzi <43397300+doleyzi@users.noreply.github.com> Date: Mon, 17 Jun 2024 21:04:02 +0800 Subject: [PATCH] [INLONG-10402][Audit] Audit Service supports the hourly Audit data one day ago (#10428) --- .../inlong/audit/cache/AbstractCache.java | 81 ++++++++++++++----- .../inlong/audit/config/ConfigConstants.java | 8 ++ .../inlong/audit/config/SqlConstants.java | 8 ++ .../inlong/audit/entities/CacheKeyEntity.java | 30 +++++++ .../apache/inlong/audit/sink/JdbcSink.java | 68 +++++++++++++++- 5 files changed, 172 insertions(+), 23 deletions(-) create mode 100644 inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/CacheKeyEntity.java diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java index 3ad1a43bff0..766f64f70dd 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java @@ -19,6 +19,7 @@ import org.apache.inlong.audit.config.Configuration; import org.apache.inlong.audit.entities.AuditCycle; +import org.apache.inlong.audit.entities.CacheKeyEntity; import org.apache.inlong.audit.entities.StatData; import org.apache.inlong.audit.utils.CacheUtils; @@ -27,9 +28,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Date; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Executors; @@ -57,6 +57,8 @@ public class AbstractCache { // According to the startTime and endTime of the request parameters, the maximum number of cache keys generated. private static final int MAX_CACHE_KEY_SIZE = 1440; + private final DateTimeFormatter FORMATTER_YYMMDDHHMMSS = DateTimeFormatter.ofPattern(DATE_FORMAT); + protected AbstractCache(AuditCycle auditCycle) { cache = Caffeine.newBuilder() .maximumSize(Configuration.getInstance().get(KEY_API_CACHE_MAX_SIZE, @@ -84,7 +86,6 @@ public Cache getCache() { } /** - * * @param startTime * @param endTime * @param inlongGroupId @@ -96,38 +97,46 @@ public Cache getCache() { public List getData(String startTime, String endTime, String inlongGroupId, String inlongStreamId, String auditId, String auditTag) { List result = new LinkedList<>(); - List keyList = buildCacheKeyList(startTime, endTime, inlongGroupId, + List keyList = buildCacheKeyList(startTime, endTime, inlongGroupId, inlongStreamId, auditId, auditTag); - for (String cacheKey : keyList) { - StatData statData = cache.getIfPresent(cacheKey); + for (CacheKeyEntity cacheKey : keyList) { + StatData statData = cache.getIfPresent(cacheKey.getCacheKey()); if (null == statData) { // Compatible with scenarios where the auditTag openapi parameter can be empty. - statData = cache.getIfPresent(cacheKey + DEFAULT_AUDIT_TAG); + statData = cache.getIfPresent(cacheKey.getCacheKey() + DEFAULT_AUDIT_TAG); } if (null != statData) { result.add(statData); + } else { + statData = fetchDataFromAuditStorage(cacheKey.getStartTime(), cacheKey.getEndTime(), inlongGroupId, + inlongStreamId, + auditId, auditTag); + result.add(statData); } + } return result; } - private List buildCacheKeyList(String startTime, String endTime, String inlongGroupId, + private List buildCacheKeyList(String startTime, String endTime, String inlongGroupId, String inlongStreamId, String auditId, String auditTag) { - List keyList = new LinkedList<>(); + List keyList = new LinkedList<>(); try { - SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); - Date startDate = dateFormat.parse(startTime); - Date endDate = dateFormat.parse(endTime); - for (int index = 0; index < MAX_CACHE_KEY_SIZE; index++) { - Calendar calendar = Calendar.getInstance(); - calendar.setTime(startDate); - calendar.add(Calendar.MINUTE, index * auditCycle.getValue()); - calendar.set(Calendar.SECOND, 0); - if (calendar.getTime().compareTo(endDate) > 0) { + LocalDateTime startDateTime = LocalDateTime.parse(startTime, FORMATTER_YYMMDDHHMMSS); + LocalDateTime endDateTime = LocalDateTime.parse(endTime, FORMATTER_YYMMDDHHMMSS); + LocalDateTime nowDateTime = LocalDateTime.now(); + LocalDateTime maxDateTime = endDateTime.isBefore(nowDateTime) ? endDateTime : nowDateTime; + + for (long index = 0; index < MAX_CACHE_KEY_SIZE; index++) { + LocalDateTime currentDateTime = startDateTime.plusMinutes(index * auditCycle.getValue()); + if (!currentDateTime.isBefore(maxDateTime)) { break; } - String time = dateFormat.format(calendar.getTime()); - keyList.add(CacheUtils.buildCacheKey(time, inlongGroupId, inlongStreamId, auditId, auditTag)); + String currentTime = currentDateTime.format(FORMATTER_YYMMDDHHMMSS); + String cacheKey = + CacheUtils.buildCacheKey(currentTime, inlongGroupId, inlongStreamId, auditId, auditTag); + keyList.add(new CacheKeyEntity(cacheKey, currentTime, + currentDateTime.plusMinutes(auditCycle.getValue()).format(FORMATTER_YYMMDDHHMMSS))); } } catch (Exception exception) { LOGGER.error("It has exception when build cache key list!", exception); @@ -149,4 +158,34 @@ public void destroy() { private void monitor() { LOGGER.info("{} api local cache size={}", auditCycle, cache.estimatedSize()); } + + private StatData fetchDataFromAuditStorage(String startTime, String endTime, String inlongGroupId, + String inlongStreamId, + String auditId, String auditTag) { + List allStatData = + RealTimeQuery.getInstance().queryLogTs(startTime, endTime, inlongGroupId, inlongStreamId, auditId); + + long totalCount = 0L; + long totalSize = 0L; + long totalDelay = 0L; + + for (StatData data : allStatData) { + if (auditTag.equals(data.getAuditTag()) || auditTag.equals(DEFAULT_AUDIT_TAG) || auditTag.isEmpty()) { + totalCount += data.getCount(); + totalSize += data.getSize(); + totalDelay += data.getDelay(); + } + } + + StatData statData = new StatData(); + statData.setLogTs(startTime); + statData.setInlongGroupId(inlongGroupId); + statData.setInlongStreamId(inlongStreamId); + statData.setAuditId(auditId); + statData.setAuditTag(auditTag); + statData.setCount(totalCount); + statData.setSize(totalSize); + statData.setDelay(totalDelay); + return statData; + } } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java index 1b4cb6b81ee..d5afb1306e5 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java @@ -58,6 +58,14 @@ public class ConfigConstants { public static final String KEY_CONFIG_UPDATE_INTERVAL_SECONDS = "config.update.interval.seconds"; public static final int DEFAULT_CONFIG_UPDATE_INTERVAL_SECONDS = 60; + public static final String KEY_ENABLE_MANAGE_PARTITIONS = "enable.manage.partitions"; + public static final boolean DEFAULT_ENABLE_MANAGE_PARTITIONS = true; + public static final String KEY_CHECK_PARTITION_INTERVAL_HOURS = "check.partition.interval.hours"; + public static final int DEFAULT_CHECK_PARTITION_INTERVAL_HOURS = 6; + + public static final String KEY_AUDIT_DATA_TEMP_STORAGE_DAYS = "audit.data.temp.storage.days"; + public static final int DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS = 3; + public static final String KEY_DATASOURCE_POOL_SIZE = "datasource.pool.size"; public static final int DEFAULT_DATASOURCE_POOL_SIZE = 2; diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java index 036781d773c..9eb3950ea52 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java @@ -225,4 +225,12 @@ public class SqlConstants { "replace into audit_proxy_heartbeat (component, host, port)\n" + "values (?, ?, ?)"; + public static final String KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL = "audit.data.temp.add.partition.sql"; + public static final String DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL = + "ALTER TABLE audit_data_temp ADD PARTITION (PARTITION %s VALUES LESS THAN (TO_DAYS('%s')))"; + + public static final String KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL = "audit.data.temp.delete.partition.sql"; + public static final String DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL = + "ALTER TABLE audit_data_temp DROP PARTITION %s"; + } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/CacheKeyEntity.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/CacheKeyEntity.java new file mode 100644 index 00000000000..c4a42fd7e8a --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/CacheKeyEntity.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.entities; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class CacheKeyEntity { + + private String cacheKey; + private String startTime; + private String endTime; +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java index db3e76a143c..d6d4781d3dd 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java @@ -31,22 +31,30 @@ import java.sql.Connection; import java.sql.PreparedStatement; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CHECK_PARTITION_INTERVAL_HOURS; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_ENABLE_MANAGE_PARTITIONS; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_QUEUE_PULL_TIMEOUT; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_BATCH; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_INTERVAL; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_AUDIT_DATA_TEMP_STORAGE_DAYS; import static org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_CHECK_PARTITION_INTERVAL_HOURS; import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT; import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_ENABLE_MANAGE_PARTITIONS; import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE; import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT; import static org.apache.inlong.audit.config.ConfigConstants.KEY_QUEUE_PULL_TIMEOUT; @@ -54,6 +62,10 @@ import static org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_INTERVAL; import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE; import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT; +import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL; +import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL; +import static org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL; +import static org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL; /** * Jdbc sink @@ -62,12 +74,16 @@ public class JdbcSink implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSink.class); private final ScheduledExecutorService sinkTimer = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService partitionManagerTimer = Executors.newSingleThreadScheduledExecutor(); private final DataQueue dataQueue; private final int insertBatch; private final int pullTimeOut; private final SinkConfig sinkConfig; private DataSource dataSource; + private final DateTimeFormatter FORMATTER_YYMMDDHH = DateTimeFormatter.ofPattern("yyyyMMdd"); + private final DateTimeFormatter FORMATTER_YY_MM_DD_HH = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + public JdbcSink(DataQueue dataQueue, SinkConfig sinkConfig) { this.dataQueue = dataQueue; this.sinkConfig = sinkConfig; @@ -90,6 +106,14 @@ public void start() { Configuration.getInstance().get(KEY_SOURCE_DB_SINK_INTERVAL, DEFAULT_SOURCE_DB_SINK_INTERVAL), TimeUnit.MILLISECONDS); + if (Configuration.getInstance().get(KEY_ENABLE_MANAGE_PARTITIONS, + DEFAULT_ENABLE_MANAGE_PARTITIONS)) { + partitionManagerTimer.scheduleWithFixedDelay(this::managePartitions, + 0, + Configuration.getInstance().get(KEY_CHECK_PARTITION_INTERVAL_HOURS, + DEFAULT_CHECK_PARTITION_INTERVAL_HOURS), + TimeUnit.HOURS); + } } /** @@ -125,8 +149,8 @@ private void process() { preparedStatement.executeBatch(); preparedStatement.clearBatch(); } - } catch (Exception e) { - LOGGER.error("Process exception! {}", e.getMessage()); + } catch (Exception exception) { + LOGGER.error("Process exception!", exception); } } @@ -153,6 +177,46 @@ protected void createDataSource() { dataSource = new HikariDataSource(config); } + private void managePartitions() { + addPartition(); + deletePartition(); + } + + private String formatPartitionName(LocalDate date) { + return "p" + date.format(FORMATTER_YYMMDDHH); + } + + private void addPartition() { + String partitionName = formatPartitionName(LocalDate.now().plusDays(1)); + String partitionValue = LocalDate.now().plusDays(2).format(FORMATTER_YY_MM_DD_HH); + String addPartitionSQL = String.format( + Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL, + DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL), + partitionName, partitionValue); + executeUpdate(addPartitionSQL); + } + + private void deletePartition() { + int daysToSubtract = Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_STORAGE_DAYS, + DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS); + String partitionName = formatPartitionName(LocalDate.now().minusDays(daysToSubtract)); + String deletePartitionSQL = String.format( + Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL, + DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL), + partitionName); + executeUpdate(deletePartitionSQL); + } + + private void executeUpdate(String updateSQL) { + try (Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(updateSQL)) { + preparedStatement.executeUpdate(); + LOGGER.info("Execute update [{}] success!", updateSQL); + } catch (Exception exception) { + LOGGER.error("Execute update [{}] has exception!", updateSQL, exception); + } + } + public void destroy() { sinkTimer.shutdown(); }