Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-10535][Agent] Support minute level tasks #10536

Merged
merged 5 commits into from
Jun 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ public class CycleUnitType {

// Day cycle unit; the time utilities in this change pair it with the "yyyyMMdd" format.
public static final String DAY = "D";
// Hour cycle unit; paired with the "yyyyMMddHH" format.
public static final String HOUR = "h";
// Minute cycle unit; paired with the "yyyyMMddHHmm" format.
public static final String MINUTE = "m";
// Presumably the real-time (non-periodic) cycle unit; no usage is visible here — TODO confirm.
public static final String REAL_TIME = "R";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.agent.utils;

import org.apache.inlong.agent.constant.CycleUnitType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -45,15 +47,11 @@ public static long timeStrConvertToMillSec(String time, String cycleUnit, TimeZo
throws ParseException {
long retTime = 0;
SimpleDateFormat df = null;
if (cycleUnit.equals("Y") && time.length() == 4) {
df = new SimpleDateFormat("yyyy");
} else if (cycleUnit.equals("M") && time.length() == 6) {
df = new SimpleDateFormat("yyyyMM");
} else if (cycleUnit.equals("D") && time.length() == 8) {
if (cycleUnit.equals(CycleUnitType.DAY) && time.length() == 8) {
df = new SimpleDateFormat("yyyyMMdd");
} else if (cycleUnit.equalsIgnoreCase("h") && time.length() == 10) {
} else if (cycleUnit.equalsIgnoreCase(CycleUnitType.HOUR) && time.length() == 10) {
df = new SimpleDateFormat("yyyyMMddHH");
} else if (cycleUnit.contains("m") && time.length() == 12) {
} else if (cycleUnit.equals(CycleUnitType.MINUTE) && time.length() == 12) {
df = new SimpleDateFormat("yyyyMMddHHmm");
} else {
logger.error("time {}, cycleUnit {} can't parse!", time, cycleUnit);
Expand All @@ -77,37 +75,18 @@ public static String millSecConvertToTimeStr(long time, String cycleUnit, TimeZo

Date dateTime = calendarInstance.getTime();
SimpleDateFormat df = null;
if ("Y".equalsIgnoreCase(cycleUnit)) {
df = new SimpleDateFormat("yyyy");
} else if ("M".equals(cycleUnit)) {
df = new SimpleDateFormat("yyyyMM");
} else if ("D".equalsIgnoreCase(cycleUnit)) {
if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) {
df = new SimpleDateFormat("yyyyMMdd");
} else if ("h".equalsIgnoreCase(cycleUnit)) {
} else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) {
df = new SimpleDateFormat("yyyyMMddHH");
} else if (cycleUnit.contains("m")) {
} else if (CycleUnitType.MINUTE.equals(cycleUnit)) {
df = new SimpleDateFormat("yyyyMMddHHmm");
} else {
logger.error("cycleUnit {} can't parse!", cycleUnit);
df = new SimpleDateFormat("yyyyMMddHH");
}
df.setTimeZone(tz);
retTime = df.format(dateTime);

if (cycleUnit.contains("m")) {
int cycleNum = Integer.parseInt(cycleUnit.substring(0,
cycleUnit.length() - 1));
int mmTime = Integer.parseInt(retTime.substring(
retTime.length() - 2, retTime.length()));
String realMMTime = "";
if (cycleNum * (mmTime / cycleNum) <= 0) {
realMMTime = "0" + cycleNum * (mmTime / cycleNum);
} else {
realMMTime = "" + cycleNum * (mmTime / cycleNum);
}
retTime = retTime.substring(0, retTime.length() - 2) + realMMTime;
}

return retTime;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.ProfileFetcher;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.agent.utils.AgentUtils;
Expand Down Expand Up @@ -217,7 +218,7 @@ private TaskResult getTestConfig(String testDir, int normalTaskId, int retryTask
String endStr = "2023-07-22 00:00:00";
Long start = 0L;
Long end = 0L;
String normalPattern = testDir + "YYYY/YYYYMMDD_2.log_[0-9]+";
String normalPattern = testDir + "YYYY/YYYYMMDDhhmm_2.log_[0-9]+";
String retryPattern = testDir + "YYYY/YYYYMMDD_1.log_[0-9]+";
try {
Date parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(startStr);
Expand All @@ -227,28 +228,31 @@ private TaskResult getTestConfig(String testDir, int normalTaskId, int retryTask
} catch (ParseException e) {
e.printStackTrace();
}
configs.add(getTestDataConfig(normalTaskId, normalPattern, false, start, end, state));
configs.add(getTestDataConfig(retryTaskId, retryPattern, true, start, end, state));
configs.add(getTestDataConfig(normalTaskId, normalPattern, false, start, end, CycleUnitType.MINUTE, state));
configs.add(getTestDataConfig(retryTaskId, retryPattern, true, start, end, CycleUnitType.DAY, state));
return TaskResult.builder().dataConfigs(configs).build();
}

/**
 * Builds a {@link DataConfig} for a test task: fills in the group/stream ids, report type,
 * task type, task id, state and time zone, then serializes a {@link FileTaskConfig}
 * (pattern, time offset, max file count, cycle unit, retry flag, start/end time) to JSON
 * and stores it as the ext params.
 *
 * NOTE(review): this listing appears to interleave the pre- and post-change lines of a diff
 * (two parameter-list endings, duplicated setter calls with different arguments) — confirm
 * which side applies before relying on the exact values.
 */
private DataConfig getTestDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
int state) {
String cycleUnit, int state) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("testGroupId"); // legacy field: groupId
dataConfig.setInlongStreamId("testStreamId"); // legacy field: streamId
dataConfig.setDataReportType(1); // legacy field: reportType
dataConfig.setInlongGroupId("devcloud_group_id"); // legacy field: groupId
dataConfig.setInlongStreamId("devcloud_stream_id"); // legacy field: streamId
dataConfig.setDataReportType(0); // legacy field: reportType
dataConfig.setTaskType(3); // legacy field: task type, 3 means file collection
dataConfig.setTaskId(taskId); // legacy field: task id
dataConfig.setState(state); // new! task state: 1 normal, 2 paused
dataConfig.setTimeZone("GMT+8:00");
FileTaskConfig fileTaskConfig = new FileTaskConfig();
fileTaskConfig.setPattern(pattern);// regular expression
fileTaskConfig.setTimeOffset("0d"); // legacy field: time offset; "-1d" collects data from one day earlier, "-2h" from 2 hours earlier
fileTaskConfig.setMaxFileCount(100); // maximum number of files
fileTaskConfig.setCycleUnit("D"); // new! task cycle: "D" day, "h" hour
fileTaskConfig.setCycleUnit(cycleUnit); // new! task cycle: "D" day, "h" hour
fileTaskConfig.setRetry(retry); // new! whether this is a retry (backfill) task; true for a retry task
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
fileTaskConfig.setDataContentStyle("CSV");
fileTaskConfig.setDataSeparator("|");
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
return dataConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ public static List<BasicFileInfo> scanTaskBetweenTimes(String cycleUnit, String
for (Long time : dateRegion) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
String filename = NewDateUtils.replaceDateExpression(calendar, originPattern);
ArrayList<String> allPaths = FilePathUtil.cutDirectory(filename);
String fileName = NewDateUtils.replaceDateExpression(calendar, originPattern);
ArrayList<String> allPaths = FilePathUtil.cutDirectoryByWildcard(fileName);
String firstDir = allPaths.get(0);
String secondDir = allPaths.get(0) + File.separator + allPaths.get(1);
ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir, secondDir, filename, 3,
ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir, secondDir, fileName, 3,
DEFAULT_FILE_MAX_NUM);
for (String file : fileList) {
// TODO the time is not YYYYMMDDHH
Expand All @@ -111,17 +111,6 @@ public static List<BasicFileInfo> scanTaskBetweenTimes(String cycleUnit, String
return infos;
}

/**
 * Scans for files matching a pattern at a given data time.
 *
 * <p>A calendar set to {@code dataTime} is handed, together with {@code originPattern}, to
 * {@code NewDateUtils.replaceDateExpression}; the result is split by
 * {@code FilePathUtil.cutDirectory}, and the first two path pieces plus the resolved name
 * are forwarded to {@code getUpdatedOrNewFiles} with a depth of 3.
 *
 * @param maxFileNum    upper bound forwarded to {@code getUpdatedOrNewFiles}
 * @param originPattern file pattern that is passed through date-expression replacement
 * @param dataTime      data time in epoch milliseconds used to initialise the calendar
 * @return the list produced by {@code getUpdatedOrNewFiles}
 */
public static ArrayList<String> scanFile(int maxFileNum, String originPattern, long dataTime) {
    final Calendar dataCalendar = Calendar.getInstance();
    dataCalendar.setTimeInMillis(dataTime);
    final String resolvedName = NewDateUtils.replaceDateExpression(dataCalendar, originPattern);

    final ArrayList<String> pathPieces = FilePathUtil.cutDirectory(resolvedName);
    final String baseDir = pathPieces.get(0);
    final String nestedDir = baseDir + File.separator + pathPieces.get(1);

    return getUpdatedOrNewFiles(baseDir, nestedDir, resolvedName, 3, maxFileNum);
}

private static ArrayList<String> getUpdatedOrNewFiles(String firstDir, String secondDir,
String fileName, long depth, int maxFileNum) {
ArrayList<String> ret = new ArrayList<String>();
Expand Down Expand Up @@ -151,7 +140,7 @@ private static ArrayList<String> getUpdatedOrNewFiles(String logFileName,
int maxFileNum) {
ArrayList<String> ret = new ArrayList<String>();
ArrayList<String> directories = FilePathUtil
.getDirectoryLayers(logFileName);
.cutDirectoryByWildcardAndDateExpression(logFileName);
String parentDir = directories.get(0) + File.separator
+ directories.get(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private void watchInit() {
}

public void addPathPattern(String originPattern) {
ArrayList<String> directories = FilePathUtil.getDirectoryLayers(originPattern);
ArrayList<String> directories = FilePathUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
String basicStaticPath = directories.get(0);
LOGGER.info("dataName {} watchPath {}", new Object[]{originPattern, basicStaticPath});
/* Remember the failed watcher creations. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ public WatchEntity(WatchService watchService,
String cycleUnit) {
this.watchService = watchService;
this.originPattern = originPattern;
ArrayList<String> directoryLayers = FilePathUtil.getDirectoryLayers(originPattern);
ArrayList<String> directoryLayers = FilePathUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
this.basicStaticPath = directoryLayers.get(0);
this.regexPattern = NewDateUtils.replaceDateExpressionWithRegex(originPattern);
pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
ArrayList<String> directories = FilePathUtil.cutDirectory(originPattern);
ArrayList<String> directories = FilePathUtil.cutDirectoryByWildcard(originPattern);
this.originPatternWithoutFileName = directories.get(0);
this.patternWithoutFileName = Pattern
.compile(NewDateUtils.replaceDateExpressionWithRegex(originPatternWithoutFileName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class FilePathUtil {
private static final String DAY = "DD";
private static final String HOUR = "hh";

public static ArrayList<String> cutDirectory(String directory) {
public static ArrayList<String> cutDirectoryByWildcard(String directory) {
String baseDirectory;
String regixDirecotry;
String fileName;
Expand Down Expand Up @@ -81,7 +81,7 @@ public static ArrayList<String> cutDirectory(String directory) {
return ret;
}

public static ArrayList<String> getDirectoryLayers(String directory) {
public static ArrayList<String> cutDirectoryByWildcardAndDateExpression(String directory) {
String baseDirectory;
String regixDirectory;
String fileName;
Expand Down Expand Up @@ -158,8 +158,8 @@ public static ArrayList<String> getDirectoryLayers(String directory) {
}

/**
 * Reports whether two file names resolve to the same leading path element: each name is
 * split by FilePathUtil and the first elements of the two results are compared with equals.
 *
 * NOTE(review): both the cutDirectory and the cutDirectoryByWildcard calls appear in this
 * listing, which looks like the old and new sides of a rename in a diff — confirm which applies.
 *
 * @param fileName1 first file name to split
 * @param fileName2 second file name to split
 * @return true when element 0 of both split results is equal
 */
public static boolean isSameDir(String fileName1, String fileName2) {
ArrayList<String> ret1 = FilePathUtil.cutDirectory(fileName1);
ArrayList<String> ret2 = FilePathUtil.cutDirectory(fileName2);
ArrayList<String> ret1 = FilePathUtil.cutDirectoryByWildcard(fileName1);
ArrayList<String> ret2 = FilePathUtil.cutDirectoryByWildcard(fileName2);
return ret1.get(0).equals(ret2.get(0));
}

Expand Down
Loading
Loading