Skip to content

Commit

Permalink
[INLONG-10402][Audit] Audit Service supports the hourly Audit data on…
Browse files Browse the repository at this point in the history
…e day ago (#10428)
  • Loading branch information
doleyzi authored Jun 17, 2024
1 parent 5c16b6d commit 298a6b9
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.AuditCycle;
import org.apache.inlong.audit.entities.CacheKeyEntity;
import org.apache.inlong.audit.entities.StatData;
import org.apache.inlong.audit.utils.CacheUtils;

Expand All @@ -27,9 +28,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -57,6 +57,8 @@ public class AbstractCache {
// According to the startTime and endTime of the request parameters, the maximum number of cache keys generated.
private static final int MAX_CACHE_KEY_SIZE = 1440;

private final DateTimeFormatter FORMATTER_YYMMDDHHMMSS = DateTimeFormatter.ofPattern(DATE_FORMAT);

protected AbstractCache(AuditCycle auditCycle) {
cache = Caffeine.newBuilder()
.maximumSize(Configuration.getInstance().get(KEY_API_CACHE_MAX_SIZE,
Expand Down Expand Up @@ -84,7 +86,6 @@ public Cache<String, StatData> getCache() {
}

/**
*
* @param startTime
* @param endTime
* @param inlongGroupId
Expand All @@ -96,38 +97,46 @@ public Cache<String, StatData> getCache() {
// Returns one StatData for every key produced by buildCacheKeyList: the cached entry when one is
// present, otherwise a value aggregated on demand from audit storage.
public List<StatData> getData(String startTime, String endTime, String inlongGroupId,
String inlongStreamId, String auditId, String auditTag) {
List<StatData> result = new LinkedList<>();
List<String> keyList = buildCacheKeyList(startTime, endTime, inlongGroupId,
List<CacheKeyEntity> keyList = buildCacheKeyList(startTime, endTime, inlongGroupId,
inlongStreamId, auditId, auditTag);
for (String cacheKey : keyList) {
StatData statData = cache.getIfPresent(cacheKey);
for (CacheKeyEntity cacheKey : keyList) {
StatData statData = cache.getIfPresent(cacheKey.getCacheKey());
if (null == statData) {
// Compatible with scenarios where the auditTag openapi parameter can be empty.
statData = cache.getIfPresent(cacheKey + DEFAULT_AUDIT_TAG);
statData = cache.getIfPresent(cacheKey.getCacheKey() + DEFAULT_AUDIT_TAG);
}
if (null != statData) {
result.add(statData);
} else {
// Cache miss under both key forms: query audit storage for this key's own
// [startTime, endTime) window so the slot is still represented in the result.
statData = fetchDataFromAuditStorage(cacheKey.getStartTime(), cacheKey.getEndTime(), inlongGroupId,
inlongStreamId,
auditId, auditTag);
result.add(statData);
}

}
return result;
}

private List<String> buildCacheKeyList(String startTime, String endTime, String inlongGroupId,
private List<CacheKeyEntity> buildCacheKeyList(String startTime, String endTime, String inlongGroupId,
String inlongStreamId, String auditId, String auditTag) {
List<String> keyList = new LinkedList<>();
List<CacheKeyEntity> keyList = new LinkedList<>();
try {
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
Date startDate = dateFormat.parse(startTime);
Date endDate = dateFormat.parse(endTime);
for (int index = 0; index < MAX_CACHE_KEY_SIZE; index++) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(startDate);
calendar.add(Calendar.MINUTE, index * auditCycle.getValue());
calendar.set(Calendar.SECOND, 0);
if (calendar.getTime().compareTo(endDate) > 0) {
LocalDateTime startDateTime = LocalDateTime.parse(startTime, FORMATTER_YYMMDDHHMMSS);
LocalDateTime endDateTime = LocalDateTime.parse(endTime, FORMATTER_YYMMDDHHMMSS);
LocalDateTime nowDateTime = LocalDateTime.now();
LocalDateTime maxDateTime = endDateTime.isBefore(nowDateTime) ? endDateTime : nowDateTime;

for (long index = 0; index < MAX_CACHE_KEY_SIZE; index++) {
LocalDateTime currentDateTime = startDateTime.plusMinutes(index * auditCycle.getValue());
if (!currentDateTime.isBefore(maxDateTime)) {
break;
}
String time = dateFormat.format(calendar.getTime());
keyList.add(CacheUtils.buildCacheKey(time, inlongGroupId, inlongStreamId, auditId, auditTag));
String currentTime = currentDateTime.format(FORMATTER_YYMMDDHHMMSS);
String cacheKey =
CacheUtils.buildCacheKey(currentTime, inlongGroupId, inlongStreamId, auditId, auditTag);
keyList.add(new CacheKeyEntity(cacheKey, currentTime,
currentDateTime.plusMinutes(auditCycle.getValue()).format(FORMATTER_YYMMDDHHMMSS)));
}
} catch (Exception exception) {
LOGGER.error("It has exception when build cache key list!", exception);
Expand All @@ -149,4 +158,34 @@ public void destroy() {
/**
 * Logs the audit cycle of this cache together with the cache's estimated entry count.
 */
private void monitor() {
    long estimatedEntries = cache.estimatedSize();
    LOGGER.info("{} api local cache size={}", auditCycle, estimatedEntries);
}

/**
 * Queries audit storage for one time window and folds the matching rows into a single
 * {@link StatData}.
 *
 * <p>A row is included when its audit tag equals {@code auditTag}. When {@code auditTag} is
 * {@code null}, empty, or equal to {@code DEFAULT_AUDIT_TAG}, every row is included.
 *
 * @param startTime      start of the window; also used as the {@code logTs} of the result
 * @param endTime        end of the window, passed through to the storage query
 * @param inlongGroupId  group id to query and to set on the result
 * @param inlongStreamId stream id to query and to set on the result
 * @param auditId        audit id to query and to set on the result
 * @param auditTag       tag filter; may be {@code null} or empty to disable filtering
 * @return a new {@link StatData} carrying the summed count, size and delay of the matching rows
 */
private StatData fetchDataFromAuditStorage(String startTime, String endTime, String inlongGroupId,
        String inlongStreamId,
        String auditId, String auditTag) {
    List<StatData> allStatData =
            RealTimeQuery.getInstance().queryLogTs(startTime, endTime, inlongGroupId, inlongStreamId, auditId);

    // The "no tag filter" decision does not depend on the row, so take it once outside the loop.
    // The null check prevents a NullPointerException when the caller supplies no tag at all.
    boolean matchAnyTag = auditTag == null || auditTag.isEmpty() || auditTag.equals(DEFAULT_AUDIT_TAG);

    long totalCount = 0L;
    long totalSize = 0L;
    long totalDelay = 0L;

    for (StatData data : allStatData) {
        if (matchAnyTag || auditTag.equals(data.getAuditTag())) {
            totalCount += data.getCount();
            totalSize += data.getSize();
            totalDelay += data.getDelay();
        }
    }

    StatData statData = new StatData();
    statData.setLogTs(startTime);
    statData.setInlongGroupId(inlongGroupId);
    statData.setInlongStreamId(inlongStreamId);
    statData.setAuditId(auditId);
    statData.setAuditTag(auditTag);
    statData.setCount(totalCount);
    statData.setSize(totalSize);
    statData.setDelay(totalDelay);
    return statData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ public class ConfigConstants {
public static final String KEY_CONFIG_UPDATE_INTERVAL_SECONDS = "config.update.interval.seconds";
public static final int DEFAULT_CONFIG_UPDATE_INTERVAL_SECONDS = 60;

// Whether JdbcSink schedules its periodic partition management task (add + drop) on start.
public static final String KEY_ENABLE_MANAGE_PARTITIONS = "enable.manage.partitions";
public static final boolean DEFAULT_ENABLE_MANAGE_PARTITIONS = true;
// Delay, in hours, between two consecutive partition management runs.
public static final String KEY_CHECK_PARTITION_INTERVAL_HOURS = "check.partition.interval.hours";
public static final int DEFAULT_CHECK_PARTITION_INTERVAL_HOURS = 6;

// Number of days subtracted from the current date to pick the audit_data_temp partition to drop.
public static final String KEY_AUDIT_DATA_TEMP_STORAGE_DAYS = "audit.data.temp.storage.days";
public static final int DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS = 3;

public static final String KEY_DATASOURCE_POOL_SIZE = "datasource.pool.size";
public static final int DEFAULT_DATASOURCE_POOL_SIZE = 2;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,12 @@ public class SqlConstants {
"replace into audit_proxy_heartbeat (component, host, port)\n" +
"values (?, ?, ?)";

// String.format template for adding a partition; the configured value replaces the default.
// Placeholders, in order: partition name, then the exclusive upper-bound date (JdbcSink passes
// it formatted as yyyy-MM-dd).
public static final String KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL = "audit.data.temp.add.partition.sql";
public static final String DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL =
"ALTER TABLE audit_data_temp ADD PARTITION (PARTITION %s VALUES LESS THAN (TO_DAYS('%s')))";

// String.format template for dropping a partition; the single placeholder is the partition name.
public static final String KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL = "audit.data.temp.delete.partition.sql";
public static final String DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL =
"ALTER TABLE audit_data_temp DROP PARTITION %s";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.entities;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * Pairs a cache key with the time window it stands for, so that a caller that misses the cache
 * can still query the underlying storage for exactly that window.
 */
@Data
@AllArgsConstructor
public class CacheKeyEntity {

// Key used to look the entry up in the local cache.
private String cacheKey;
// Formatted start of the window covered by this key.
private String startTime;
// Formatted end of the window covered by this key.
private String endTime;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,41 @@

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CHECK_PARTITION_INTERVAL_HOURS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_ENABLE_MANAGE_PARTITIONS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_QUEUE_PULL_TIMEOUT;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_BATCH;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_INTERVAL;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_AUDIT_DATA_TEMP_STORAGE_DAYS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_CHECK_PARTITION_INTERVAL_HOURS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_ENABLE_MANAGE_PARTITIONS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_QUEUE_PULL_TIMEOUT;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_BATCH;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_INTERVAL;
import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL;
import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL;
import static org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL;
import static org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL;

/**
* Jdbc sink
Expand All @@ -62,12 +74,16 @@ public class JdbcSink implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSink.class);
private final ScheduledExecutorService sinkTimer = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService partitionManagerTimer = Executors.newSingleThreadScheduledExecutor();
private final DataQueue dataQueue;
private final int insertBatch;
private final int pullTimeOut;
private final SinkConfig sinkConfig;
private DataSource dataSource;

// DateTimeFormatter is immutable and thread-safe, so one shared instance per pattern is enough;
// static avoids rebuilding both formatters for every JdbcSink instance.
// NOTE: despite the HH suffix in the names, both patterns have day granularity.
private static final DateTimeFormatter FORMATTER_YYMMDDHH = DateTimeFormatter.ofPattern("yyyyMMdd");
private static final DateTimeFormatter FORMATTER_YY_MM_DD_HH = DateTimeFormatter.ofPattern("yyyy-MM-dd");

public JdbcSink(DataQueue dataQueue, SinkConfig sinkConfig) {
this.dataQueue = dataQueue;
this.sinkConfig = sinkConfig;
Expand All @@ -90,6 +106,14 @@ public void start() {
Configuration.getInstance().get(KEY_SOURCE_DB_SINK_INTERVAL,
DEFAULT_SOURCE_DB_SINK_INTERVAL),
TimeUnit.MILLISECONDS);
if (Configuration.getInstance().get(KEY_ENABLE_MANAGE_PARTITIONS,
DEFAULT_ENABLE_MANAGE_PARTITIONS)) {
partitionManagerTimer.scheduleWithFixedDelay(this::managePartitions,
0,
Configuration.getInstance().get(KEY_CHECK_PARTITION_INTERVAL_HOURS,
DEFAULT_CHECK_PARTITION_INTERVAL_HOURS),
TimeUnit.HOURS);
}
}

/**
Expand Down Expand Up @@ -125,8 +149,8 @@ private void process() {
preparedStatement.executeBatch();
preparedStatement.clearBatch();
}
} catch (Exception e) {
LOGGER.error("Process exception! {}", e.getMessage());
} catch (Exception exception) {
LOGGER.error("Process exception!", exception);
}
}

Expand All @@ -153,6 +177,46 @@ protected void createDataSource() {
dataSource = new HikariDataSource(config);
}

/**
 * Periodic partition maintenance: adds the upcoming partition, then drops the expired one.
 *
 * <p>This method runs as a {@code scheduleWithFixedDelay} task, and the executor cancels every
 * future run as soon as one execution throws. Each step is therefore guarded on its own, so a
 * failure (for example a malformed configured SQL template rejected by {@code String.format})
 * neither stops the schedule nor prevents the other step from running.
 */
private void managePartitions() {
    try {
        addPartition();
    } catch (RuntimeException exception) {
        LOGGER.error("Add partition has exception!", exception);
    }
    try {
        deletePartition();
    } catch (RuntimeException exception) {
        LOGGER.error("Delete partition has exception!", exception);
    }
}

/**
 * Builds the partition name for a day: the letter {@code p} followed by the date rendered with
 * the {@code yyyyMMdd} formatter.
 *
 * @param date day the partition is named after
 * @return the partition name
 */
private String formatPartitionName(LocalDate date) {
    String daySuffix = FORMATTER_YYMMDDHH.format(date);
    return "p" + daySuffix;
}

/**
 * Adds the partition for tomorrow. The partition is named after tomorrow's date and its
 * exclusive upper bound is the day after tomorrow, formatted as {@code yyyy-MM-dd}.
 *
 * <p>The SQL template is read from configuration and filled with the partition name and the
 * bound, in that order.
 */
private void addPartition() {
    // Read the clock once: two separate now() calls could straddle midnight and pair a
    // partition name with an upper bound that is one day too far ahead.
    LocalDate today = LocalDate.now();
    String partitionName = formatPartitionName(today.plusDays(1));
    String partitionValue = today.plusDays(2).format(FORMATTER_YY_MM_DD_HH);
    String addPartitionSQL = String.format(
            Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL,
                    DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL),
            partitionName, partitionValue);
    executeUpdate(addPartitionSQL);
}

/**
 * Drops the partition named after the day that lies the configured number of storage days
 * before today. The SQL template is read from configuration and filled with that partition name.
 */
private void deletePartition() {
    int retentionDays = Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_STORAGE_DAYS,
            DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS);
    LocalDate expiredDay = LocalDate.now().minusDays(retentionDays);
    String expiredPartition = formatPartitionName(expiredDay);
    String sqlTemplate = Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL,
            DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL);
    executeUpdate(String.format(sqlTemplate, expiredPartition));
}

/**
 * Runs one update statement on a connection taken from the data source and logs the outcome.
 * Any exception is logged together with the statement and is not rethrown.
 *
 * @param updateSQL complete SQL text to execute
 */
private void executeUpdate(String updateSQL) {
    try (Connection conn = dataSource.getConnection();
            PreparedStatement statement = conn.prepareStatement(updateSQL)) {
        statement.executeUpdate();
        LOGGER.info("Execute update [{}] success!", updateSQL);
    } catch (Exception e) {
        LOGGER.error("Execute update [{}] has exception!", updateSQL, e);
    }
}

/**
 * Stops the background work of this sink by shutting down both schedulers.
 *
 * <p>The partition manager scheduler is created together with the sink timer, so it must be
 * shut down here as well; otherwise its worker thread outlives the sink.
 */
public void destroy() {
    sinkTimer.shutdown();
    partitionManagerTimer.shutdown();
}
Expand Down

0 comments on commit 298a6b9

Please sign in to comment.