diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java index d12e825b858..4572464fbfd 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java @@ -19,7 +19,8 @@ public class CycleUnitType { - public static final String DAY = "D"; - public static final String HOUR = "h"; + public static final String DAY = "d"; + public static final String HOUR = "H"; + public static final String MINUTE = "m"; public static final String REAL_TIME = "R"; } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java index fe6257d64ee..cd63d743184 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java @@ -17,6 +17,8 @@ package org.apache.inlong.agent.utils; +import org.apache.inlong.agent.constant.CycleUnitType; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,15 +47,11 @@ public static long timeStrConvertToMillSec(String time, String cycleUnit, TimeZo throws ParseException { long retTime = 0; SimpleDateFormat df = null; - if (cycleUnit.equals("Y") && time.length() == 4) { - df = new SimpleDateFormat("yyyy"); - } else if (cycleUnit.equals("M") && time.length() == 6) { - df = new SimpleDateFormat("yyyyMM"); - } else if (cycleUnit.equals("D") && time.length() == 8) { + if (cycleUnit.equalsIgnoreCase(CycleUnitType.DAY) && time.length() == 8) { df = new SimpleDateFormat("yyyyMMdd"); - } else if (cycleUnit.equalsIgnoreCase("h") && time.length() == 10) { + } else if (cycleUnit.equalsIgnoreCase(CycleUnitType.HOUR) && time.length() == 10) { df = new SimpleDateFormat("yyyyMMddHH"); - } else if (cycleUnit.contains("m") && time.length() == 12) { + } else if (cycleUnit.equals(CycleUnitType.MINUTE) && time.length() == 12) { df = new SimpleDateFormat("yyyyMMddHHmm"); } else { logger.error("time {}, cycleUnit {} can't parse!", time, cycleUnit); @@ -77,15 +75,11 @@ public static String millSecConvertToTimeStr(long time, String cycleUnit, TimeZo Date dateTime = calendarInstance.getTime(); SimpleDateFormat df = null; - if ("Y".equalsIgnoreCase(cycleUnit)) { - df = new SimpleDateFormat("yyyy"); - } else if ("M".equals(cycleUnit)) { - df = new SimpleDateFormat("yyyyMM"); - } else if ("D".equalsIgnoreCase(cycleUnit)) { + if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) { df = new SimpleDateFormat("yyyyMMdd"); - } else if ("h".equalsIgnoreCase(cycleUnit)) { + } else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) { df = new SimpleDateFormat("yyyyMMddHH"); - } else if (cycleUnit.contains("m")) { + } else if (CycleUnitType.MINUTE.equals(cycleUnit)) { df = new SimpleDateFormat("yyyyMMddHHmm"); } else { logger.error("cycleUnit {} can't parse!", cycleUnit); @@ -93,21 +87,6 @@ public static String millSecConvertToTimeStr(long time, String cycleUnit, TimeZo } df.setTimeZone(tz); retTime = df.format(dateTime); - - if (cycleUnit.contains("m")) { - int cycleNum = Integer.parseInt(cycleUnit.substring(0, - cycleUnit.length() - 1)); - int mmTime = Integer.parseInt(retTime.substring( - retTime.length() - 2, retTime.length())); - String realMMTime = ""; - if (cycleNum * (mmTime / cycleNum) <= 0) { - realMMTime = "0" + cycleNum * (mmTime / cycleNum); - } else { - realMMTime = "" + cycleNum * (mmTime / cycleNum); - } - retTime = retTime.substring(0, retTime.length() - 2) + realMMTime; - } - return retTime; } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java index 812e4c25133..d31126860eb 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java @@ -21,6 +21,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.ProfileFetcher; import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.core.AgentManager; import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig; import org.apache.inlong.agent.utils.AgentUtils; @@ -217,7 +218,7 @@ private TaskResult getTestConfig(String testDir, int normalTaskId, int retryTask String endStr = "2023-07-22 00:00:00"; Long start = 0L; Long end = 0L; - String normalPattern = testDir + "YYYY/YYYYMMDD_2.log_[0-9]+"; + String normalPattern = testDir + "YYYY/YYYYMMDDhhmm_2.log_[0-9]+"; String retryPattern = testDir + "YYYY/YYYYMMDD_1.log_[0-9]+"; try { Date parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(startStr); @@ -227,28 +228,31 @@ private TaskResult getTestConfig(String testDir, int normalTaskId, int retryTask } catch (ParseException e) { e.printStackTrace(); } - configs.add(getTestDataConfig(normalTaskId, normalPattern, false, start, end, state)); - configs.add(getTestDataConfig(retryTaskId, retryPattern, true, start, end, state)); + configs.add(getTestDataConfig(normalTaskId, normalPattern, false, start, end, CycleUnitType.MINUTE, state)); + configs.add(getTestDataConfig(retryTaskId, retryPattern, true, start, end, CycleUnitType.DAY, state)); return TaskResult.builder().dataConfigs(configs).build(); } private DataConfig getTestDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime, - int state) { + String cycleUnit, int state) { DataConfig dataConfig = new DataConfig(); - dataConfig.setInlongGroupId("testGroupId"); // 老字段 groupId - dataConfig.setInlongStreamId("testStreamId"); // 老字段 streamId - dataConfig.setDataReportType(1); // 老字段 reportType - dataConfig.setTaskType(3); // 老字段 任务类型,3 代表文件采集 - dataConfig.setTaskId(taskId); // 老字段 任务 id - dataConfig.setState(state); // 新增! 任务状态 1 正常 2 暂停 + dataConfig.setInlongGroupId("devcloud_group_id"); + dataConfig.setInlongStreamId("devcloud_stream_id"); + dataConfig.setDataReportType(0); + dataConfig.setTaskType(3); + dataConfig.setTaskId(taskId); + dataConfig.setState(state); + dataConfig.setTimeZone("GMT+8:00"); FileTaskConfig fileTaskConfig = new FileTaskConfig(); - fileTaskConfig.setPattern(pattern);// 正则 - fileTaskConfig.setTimeOffset("0d"); // 老字段 时间偏移 "-1d" 采一天前的 "-2h" 采 2 小时前的 - fileTaskConfig.setMaxFileCount(100); // 最大文件数 - fileTaskConfig.setCycleUnit("D"); // 新增! 任务周期 "D" 天 "h" 小时 - fileTaskConfig.setRetry(retry); // 新增! 是否补录,如果是补录任务则为 true + fileTaskConfig.setPattern(pattern); + fileTaskConfig.setTimeOffset("0d"); + fileTaskConfig.setMaxFileCount(100); + fileTaskConfig.setCycleUnit(cycleUnit); + fileTaskConfig.setRetry(retry); fileTaskConfig.setStartTime(startTime); fileTaskConfig.setEndTime(endTime); + fileTaskConfig.setDataContentStyle("CSV"); + fileTaskConfig.setDataSeparator("|"); dataConfig.setExtParams(GSON.toJson(fileTaskConfig)); return dataConfig; } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java index 58328540d40..e37b6deb896 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java @@ -94,11 +94,11 @@ public static List scanTaskBetweenTimes(String cycleUnit, String for (Long time : dateRegion) { Calendar calendar = Calendar.getInstance(); calendar.setTimeInMillis(time); - String filename = NewDateUtils.replaceDateExpression(calendar, originPattern); - ArrayList allPaths = FilePathUtil.cutDirectory(filename); + String fileName = NewDateUtils.replaceDateExpression(calendar, originPattern); + ArrayList allPaths = FilePathUtil.cutDirectoryByWildcard(fileName); String firstDir = allPaths.get(0); String secondDir = allPaths.get(0) + File.separator + allPaths.get(1); - ArrayList fileList = getUpdatedOrNewFiles(firstDir, secondDir, filename, 3, + ArrayList fileList = getUpdatedOrNewFiles(firstDir, secondDir, fileName, 3, DEFAULT_FILE_MAX_NUM); for (String file : fileList) { // TODO the time is not YYYYMMDDHH @@ -111,17 +111,6 @@ public static List scanTaskBetweenTimes(String cycleUnit, String return infos; } - public static ArrayList scanFile(int maxFileNum, String originPattern, long dataTime) { - Calendar calendar = Calendar.getInstance(); - calendar.setTimeInMillis(dataTime); - - String filename = NewDateUtils.replaceDateExpression(calendar, originPattern); - ArrayList allPaths = FilePathUtil.cutDirectory(filename); - String firstDir = allPaths.get(0); - String secondDir = allPaths.get(0) + File.separator + allPaths.get(1); - return getUpdatedOrNewFiles(firstDir, secondDir, filename, 3, maxFileNum); - } - private static ArrayList getUpdatedOrNewFiles(String firstDir, String secondDir, String fileName, long depth, int maxFileNum) { ArrayList ret = new ArrayList(); @@ -151,7 +140,7 @@ private static ArrayList getUpdatedOrNewFiles(String logFileName, int maxFileNum) { ArrayList ret = new ArrayList(); ArrayList directories = FilePathUtil - .getDirectoryLayers(logFileName); + .cutDirectoryByWildcardAndDateExpression(logFileName); String parentDir = directories.get(0) + File.separator + directories.get(1); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java index 44495b6059e..44cff75f2fe 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java @@ -182,7 +182,7 @@ private void watchInit() { } public void addPathPattern(String originPattern) { - ArrayList directories = FilePathUtil.getDirectoryLayers(originPattern); + ArrayList directories = FilePathUtil.cutDirectoryByWildcardAndDateExpression(originPattern); String basicStaticPath = directories.get(0); LOGGER.info("dataName {} watchPath {}", new Object[]{originPattern, basicStaticPath}); /* Remember the failed watcher creations. */ @@ -530,7 +530,7 @@ private boolean checkFileNameForTime(String newFileName, WatchEntity entity) { PathDateExpression dateExpression = entity.getDateExpression(); if (dateExpression.getLongestDatePattern().length() != 0) { String dataTime = getDataTimeFromFileName(newFileName, entity.getOriginPattern(), dateExpression); - LOGGER.info("file {} ,fileTime {}", newFileName, dataTime); + LOGGER.info("file {}, fileTime {}", newFileName, dataTime); if (!NewDateUtils.isValidCreationTime(dataTime, entity.getCycleUnit(), taskProfile.getTimeOffset())) { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java index cac872182c7..a62a602784d 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java @@ -65,11 +65,11 @@ public WatchEntity(WatchService watchService, String cycleUnit) { this.watchService = watchService; this.originPattern = originPattern; - ArrayList directoryLayers = FilePathUtil.getDirectoryLayers(originPattern); + ArrayList directoryLayers = FilePathUtil.cutDirectoryByWildcardAndDateExpression(originPattern); this.basicStaticPath = directoryLayers.get(0); this.regexPattern = NewDateUtils.replaceDateExpressionWithRegex(originPattern); pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE); - ArrayList directories = FilePathUtil.cutDirectory(originPattern); + ArrayList directories = FilePathUtil.cutDirectoryByWildcard(originPattern); this.originPatternWithoutFileName = directories.get(0); this.patternWithoutFileName = Pattern .compile(NewDateUtils.replaceDateExpressionWithRegex(originPatternWithoutFileName), diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java index 2b7b98dfabd..912b0356425 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java @@ -29,7 +29,7 @@ public class FilePathUtil { private static final String DAY = "DD"; private static final String HOUR = "hh"; - public static ArrayList cutDirectory(String directory) { + public static ArrayList cutDirectoryByWildcard(String directory) { String baseDirectory; String regixDirecotry; String fileName; @@ -81,7 +81,7 @@ public static ArrayList cutDirectory(String directory) { return ret; } - public static ArrayList getDirectoryLayers(String directory) { + public static ArrayList cutDirectoryByWildcardAndDateExpression(String directory) { String baseDirectory; String regixDirectory; String fileName; @@ -158,8 +158,8 @@ public static ArrayList getDirectoryLayers(String directory) { } public static boolean isSameDir(String fileName1, String fileName2) { - ArrayList ret1 = FilePathUtil.cutDirectory(fileName1); - ArrayList ret2 = FilePathUtil.cutDirectory(fileName2); + ArrayList ret1 = FilePathUtil.cutDirectoryByWildcard(fileName1); + ArrayList ret2 = FilePathUtil.cutDirectoryByWildcard(fileName2); return ret1.get(0).equals(ret2.get(0)); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java index 529e348fc52..563a168885e 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java @@ -17,6 +17,7 @@ package org.apache.inlong.agent.plugin.utils.file; +import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.utils.DateTransUtils; import hirondelle.date4j.DateTime; @@ -30,7 +31,6 @@ import java.util.Calendar; import java.util.Date; import java.util.List; -import java.util.StringTokenizer; import java.util.TimeZone; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -170,34 +170,34 @@ private static Calendar getDateTime(Calendar calendar, String cycleUnit, String calendar.set(Calendar.MINUTE, minTime); /* Calculate the offset. */ - if ("D".equalsIgnoreCase(offsetUnit)) { + if (CycleUnitType.DAY.equalsIgnoreCase(offsetUnit)) { calendar.add(Calendar.DAY_OF_YEAR, offsetNumber); } - if ("H".equalsIgnoreCase(offsetUnit)) { + if (CycleUnitType.HOUR.equalsIgnoreCase(offsetUnit)) { calendar.add(Calendar.HOUR_OF_DAY, offsetNumber); } } else if (cycleUnit.length() == 1) { - if ("D".equalsIgnoreCase(cycleUnit)) { + if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) { calendar.set(Calendar.HOUR_OF_DAY, 0); calendar.set(Calendar.MINUTE, 0); calendar.set(Calendar.SECOND, 0); - } else if ("h".equalsIgnoreCase(cycleUnit)) { + } else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) { calendar.set(Calendar.MINUTE, 0); calendar.set(Calendar.SECOND, 0); } } /* Calculate the offset. */ - if ("D".equalsIgnoreCase(offsetUnit)) { + if (CycleUnitType.DAY.equalsIgnoreCase(offsetUnit)) { calendar.add(Calendar.DAY_OF_YEAR, offsetNumber); } - if ("h".equalsIgnoreCase(offsetUnit)) { + if (CycleUnitType.HOUR.equalsIgnoreCase(offsetUnit)) { calendar.add(Calendar.HOUR_OF_DAY, offsetNumber); } - if ("m".equals(offsetUnit)) { + if (CycleUnitType.MINUTE.equals(offsetUnit)) { calendar.add(Calendar.MINUTE, offsetNumber); } @@ -207,15 +207,11 @@ private static Calendar getDateTime(Calendar calendar, String cycleUnit, String public static boolean isValidCreationTime(String dataTime, String cycleUnit, String timeOffset) { long timeInterval = 0; - if ("Y".equalsIgnoreCase(cycleUnit)) { + if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) { timeInterval = DAY_TIMEOUT_INTERVAL; - } else if ("M".equals(cycleUnit)) { + } else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) { timeInterval = HOUR_TIMEOUT_INTERVAL; - } else if ("D".equalsIgnoreCase(cycleUnit)) { - timeInterval = DAY_TIMEOUT_INTERVAL; - } else if ("h".equalsIgnoreCase(cycleUnit)) { - timeInterval = HOUR_TIMEOUT_INTERVAL; - } else if (cycleUnit.contains("m")) { + } else if (cycleUnit.endsWith(CycleUnitType.MINUTE)) { timeInterval = HOUR_TIMEOUT_INTERVAL; } else { logger.error("cycleUnit {} can't parse!", cycleUnit); @@ -265,23 +261,20 @@ public static boolean isBraceContain(String dataName) { return matcher.find(); } - public static String getDateTime(String fileName, String dataName, - PathDateExpression dateExpression) { + public static String getDateTime(String fileName, String dataName, PathDateExpression dateExpression) { String dataTime = null; if (isBraceContain(dataName)) { String fullRegx = replaceDateExpressionWithRegex(dataName, "dataTime"); - Pattern fullPatt = Pattern.compile(fullRegx); - Matcher matcher = fullPatt.matcher(fileName); + Pattern fullPattern = Pattern.compile(fullRegx); + Matcher matcher = fullPattern.matcher(fileName); if (matcher.find()) { dataTime = matcher.group("dataTime"); } } else { dataTime = getDateTime(fileName, dateExpression); } - return dataTime; - } public static String getDateTime(String fileName, PathDateExpression dateExpression) { @@ -423,8 +416,7 @@ public static String replaceDateExpressionWithRegex(String dataPath, String date return sb.toString(); } - public static String replaceDateExpression(Calendar dateTime, - String dataPath) { + public static String replaceDateExpression(Calendar dateTime, String dataPath) { if (dataPath == null) { return null; } @@ -439,7 +431,6 @@ public static String replaceDateExpression(Calendar dateTime, // find longest DATEPATTERN ArrayList mp = extractAllTimeRegex(dataPath); - if (mp == null || mp.size() == 0) { return dataPath; } @@ -471,35 +462,7 @@ public static String replaceDateExpression(Calendar dateTime, return sb.toString(); } - public static String replaceDateExpression1(Calendar dateTime, - String logFileName) { - if (dateTime == null || logFileName == null) { - return null; - } - - String year = String.valueOf(dateTime.get(Calendar.YEAR)); - String month = String.valueOf(dateTime.get(Calendar.MONTH) + 1); - String day = String.valueOf(dateTime.get(Calendar.DAY_OF_MONTH)); - String hour = String.valueOf(dateTime.get(Calendar.HOUR_OF_DAY)); - String minute = String.valueOf(dateTime.get(Calendar.MINUTE)); - - int hhIndex = logFileName.indexOf("hh"); - int mmIndex = logFileName.indexOf("mm"); - - logFileName = logFileName.replaceAll("YYYY", year); - logFileName = logFileName.replaceAll("MM", externDate(month)); - logFileName = logFileName.replaceAll("DD", externDate(day)); - logFileName = logFileName.replaceAll("hh", externDate(hour)); - - if (hhIndex != -1 && mmIndex != -1 && mmIndex >= hhIndex + 2 - && mmIndex < hhIndex + 4) { - logFileName = logFileName.replaceAll("mm", externDate(minute)); - } - - return logFileName; - } - - private static String externDate(String time) { + public static String externDate(String time) { if (time.length() == 1) { return "0" + time; } else { @@ -507,51 +470,12 @@ private static String externDate(String time) { } } - public static String parseCycleUnit(String scheduleTime) { - String cycleUnit = "D"; - - StringTokenizer st = new StringTokenizer(scheduleTime, " "); - - if (st.countTokens() <= 0) { - return "D"; - } - - int index = 0; - while (st.hasMoreElements()) { - String currentString = st.nextToken(); - if (currentString.contains("/")) { - if (index == 1) { - cycleUnit = "10m"; - } else if (index == 2) { - cycleUnit = "h"; - } - break; - } - - if (currentString.equals("*")) { - if (index == 3) { - cycleUnit = "D"; - } - break; - } - - index++; - } - - logger.info("ScheduleTime: " + scheduleTime + ", cycleUnit: " - + cycleUnit); - - return cycleUnit; - } - public static List getDateRegion(long startTime, long endTime, String cycleUnit) { List ret = new ArrayList(); DateTime dtStart = DateTime.forInstant(startTime, TimeZone.getDefault()); DateTime dtEnd = DateTime.forInstant(endTime, TimeZone.getDefault()); - if (cycleUnit.equals("M")) { - dtEnd = dtEnd.getEndOfMonth(); - } else if (cycleUnit.equals("D")) { + if (cycleUnit.equals(CycleUnitType.DAY)) { dtEnd = dtEnd.getEndOfDay(); } @@ -561,22 +485,12 @@ public static List getDateRegion(long startTime, long endTime, String cycl int hour = 0; int minute = 0; int second = 0; - if (cycleUnit.equalsIgnoreCase("Y")) { - year = 1; - } else if (cycleUnit.equals("M")) { - month = 1; - } else if (cycleUnit.equalsIgnoreCase("D")) { + if (cycleUnit.equalsIgnoreCase(CycleUnitType.DAY)) { day = 1; - } else if (cycleUnit.equalsIgnoreCase("h")) { + } else if (cycleUnit.equalsIgnoreCase(CycleUnitType.HOUR)) { hour = 1; - } else if (cycleUnit.equals("10m")) { - minute = 10; - } else if (cycleUnit.equals("15m")) { - minute = 15; - } else if (cycleUnit.equals("30m")) { - minute = 30; - } else if (cycleUnit.equalsIgnoreCase("s")) { - second = 1; + } else if (cycleUnit.equals(CycleUnitType.MINUTE)) { + minute = 1; } else { logger.error("cycleUnit {} is error: ", cycleUnit); return ret; diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java index dd9c3205763..10e4532be2c 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java @@ -108,8 +108,7 @@ private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long fileTaskConfig.setEndTime(endTime); // mix: login|87601|968|67826|23579 or login|a=b&c=d&x=y&asdf fileTaskConfig.setDataContentStyle("mix"); - // 124 is the ASCII code of '|' - fileTaskConfig.setDataSeparator("124"); + fileTaskConfig.setDataSeparator("|"); dataConfig.setExtParams(GSON.toJson(fileTaskConfig)); return dataConfig; } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java index 3a87eac388a..cf8128a0be7 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java @@ -19,13 +19,13 @@ import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.core.task.TaskManager; import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; import org.apache.inlong.agent.plugin.task.file.LogFileTask; import org.apache.inlong.common.enums.TaskStateEnum; -import com.google.gson.Gson; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -41,7 +41,10 @@ import java.io.File; import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; +import java.util.List; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -55,15 +58,9 @@ public class TestLogFileTask { private static final Logger LOGGER = LoggerFactory.getLogger(TestLogFileTask.class); private static final ClassLoader LOADER = TestLogFileTask.class.getClassLoader(); - private static LogFileTask task; private static AgentBaseTestsHelper helper; - private static final Gson GSON = new Gson(); private static TaskManager manager; - private static MockInstanceManager instanceManager = new MockInstanceManager(); - private static String tempResourceName; - private static String resourceName; - private static String fileName; - private static String dataTime; + private static String resourceParentPath; private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS, @@ -71,52 +68,76 @@ public class TestLogFileTask { new AgentThreadFactory("TestLogfileCollectTask")); @BeforeClass - public static void setup() { + public static void setup() throws Exception { helper = new AgentBaseTestsHelper(TestLogFileTask.class.getName()).setupAgentHome(); - resourceName = LOADER.getResource("testScan/20230928_1/test_1.txt").getPath(); - tempResourceName = LOADER.getResource("testScan/temp.txt").getPath(); - File f = new File(tempResourceName); - String pattern = f.getParent() + "/YYYYMMDD_[0-9]+/test_[0-9]+.txt"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, true, 0L, 0L, TaskStateEnum.RUNNING, "D", + resourceParentPath = new File(LOADER.getResource("testScan/temp.txt").getPath()).getParent(); + manager = new TaskManager(); + } + + @AfterClass + public static void teardown() throws Exception { + helper.teardownAgentHome(); + } + + @Test + public void testScan() throws Exception { + doTest(1, Arrays.asList("testScan/20230928_1/test_1.txt"), + resourceParentPath + "/YYYYMMDD_[0-9]+/test_[0-9]+.txt", CycleUnitType.DAY, Arrays.asList("20230928"), + "2023-09-28 00:00:00", "2023-09-30 23:00:00"); + doTest(2, Arrays.asList("testScan/2023092810_1/test_1.txt"), + resourceParentPath + "/YYYYMMDDhh_[0-9]+/test_[0-9]+.txt", + CycleUnitType.HOUR, Arrays.asList("2023092810"), "2023-09-28 00:00:00", "2023-09-30 23:00:00"); + doTest(3, Arrays.asList("testScan/202309281030_1/test_1.txt", "testScan/202309301059_1/test_1.txt"), + resourceParentPath + "/YYYYMMDDhhmm_[0-9]+/test_[0-9]+.txt", + CycleUnitType.MINUTE, Arrays.asList("202309281030", "202309301059"), "2023-09-28 00:00:00", + "2023-09-30 23:00:00"); + doTest(4, Arrays.asList("testScan/20241030/23/59.txt"), + resourceParentPath + "/YYYYMMDD/hh/mm.txt", + CycleUnitType.MINUTE, Arrays.asList("202410302359"), "2024-10-30 00:00:00", "2024-10-31 00:00:00"); + } + + private void doTest(int taskId, List resources, String pattern, String cycle, List srcDataTimes, + String startTime, String endTime) + throws Exception { + List resourceName = new ArrayList<>(); + for (int i = 0; i < resources.size(); i++) { + resourceName.add(LOADER.getResource(resources.get(i)).getPath()); + } + TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, true, 0L, 0L, TaskStateEnum.RUNNING, cycle, "GMT+8:00"); + LogFileTask dayTask = null; + final List fileName = new ArrayList(); + final List dataTime = new ArrayList(); try { - String startStr = "2023-09-20 00:00:00"; - String endStr = "2023-09-30 00:00:00"; - Date parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(startStr); + + Date parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(startTime); long start = parse.getTime(); - parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(endStr); + parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(endTime); long end = parse.getTime(); taskProfile.setLong(TaskConstants.TASK_START_TIME, start); taskProfile.setLong(TaskConstants.TASK_END_TIME, end); - manager = new TaskManager(); - task = PowerMockito.spy(new LogFileTask()); + dayTask = PowerMockito.spy(new LogFileTask()); PowerMockito.doAnswer(invocation -> { - fileName = invocation.getArgument(0); - dataTime = invocation.getArgument(1); + fileName.add(invocation.getArgument(0)); + dataTime.add(invocation.getArgument(1)); return null; - }).when(task, "addToEvenMap", Mockito.anyString(), Mockito.anyString()); - Assert.assertTrue(task.isProfileValid(taskProfile)); + }).when(dayTask, "addToEvenMap", Mockito.anyString(), Mockito.anyString()); + Assert.assertTrue(dayTask.isProfileValid(taskProfile)); manager.getTaskStore().storeTask(taskProfile); - task.init(manager, taskProfile, manager.getInstanceBasicStore()); - EXECUTOR_SERVICE.submit(task); + dayTask.init(manager, taskProfile, manager.getInstanceBasicStore()); + EXECUTOR_SERVICE.submit(dayTask); } catch (Exception e) { LOGGER.error("source init error {}", e); Assert.assertTrue("source init error", false); } - } - - @AfterClass - public static void teardown() throws Exception { - task.destroy(); - helper.teardownAgentHome(); - } - - @Test - public void testTaskManager() throws Exception { - await().atMost(2, TimeUnit.SECONDS).until(() -> fileName != null && dataTime != null); - Assert.assertTrue(fileName.compareTo(resourceName) == 0); - Assert.assertTrue(dataTime.compareTo("20230928") == 0); - PowerMockito.verifyPrivate(task, Mockito.times(1)) + await().atMost(10, TimeUnit.SECONDS) + .until(() -> fileName.size() == resources.size() && dataTime.size() == resources.size()); + for (int i = 0; i < fileName.size(); i++) { + Assert.assertEquals(0, fileName.get(i).compareTo(resourceName.get(i))); + Assert.assertEquals(0, dataTime.get(i).compareTo(srcDataTimes.get(i))); + } + PowerMockito.verifyPrivate(dayTask, Mockito.times(resources.size())) .invoke("addToEvenMap", Mockito.anyString(), Mockito.anyString()); + dayTask.destroy(); } } \ No newline at end of file diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java index ea575e613d3..cf1b7c8f958 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java @@ -17,6 +17,8 @@ package org.apache.inlong.agent.plugin.utils; +import org.apache.inlong.agent.plugin.utils.file.FilePathUtil; +import org.apache.inlong.agent.plugin.utils.file.NewDateUtils; import org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.common.metric.MetricRegister; @@ -34,7 +36,10 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; +import java.text.ParseException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; import java.util.List; import static org.mockito.ArgumentMatchers.any; @@ -54,37 +59,67 @@ public void testCalcOffset() { Assert.assertTrue(DateTransUtils.calcOffset("") == 0); } - public static String getTestTriggerProfile() { - return "{\n" - + " \"job\": {\n" - + " \"fileJob\": {\n" - + " \"additionStr\": \"m=15&file=test\",\n" - + " \"trigger\": \"org.apache.inlong.agent.plugin.trigger.DirectoryTrigger\",\n" - + " \"dir\": {\n" - + " \"path\": \"\",\n" - + " \"patterns\": \"/AgentBaseTestsHelper/" - + "org.apache.tubemq.inlong.plugin.fetcher.TestTdmFetcher/test.dat\"\n" - + " },\n" - + " \"thread\" : {\n" - + "\"running\": {\n" - + "\"core\": \"4\"\n" - + "}\n" - + "} \n" - + " },\n" - + " \"id\": 1,\n" - + " \"op\": 0,\n" - + " \"ip\": \"127.0.0.1\",\n" - + " \"groupId\": \"groupId\",\n" - + " \"streamId\": \"streamId\",\n" - + " \"name\": \"fileAgentTest\",\n" - + " \"source\": \"org.apache.inlong.agent.plugin.sources.TextFileSource\",\n" - + " \"sink\": \"org.apache.inlong.agent.plugin.sinks.MockSink\",\n" - + " \"channel\": \"org.apache.inlong.agent.plugin.channel.MemoryChannel\",\n" - + " \"standalone\": true,\n" - + " \"deliveryTime\": \"1231313\",\n" - + " \"splitter\": \"&\"\n" - + " }\n" - + " }"; + @Test + public void testPattern() throws ParseException { + /* + * Condition: YYYY(?:.MM|MM)?(?:.DD|DD)?(?:.hh|hh)?(?:.mm|mm)?(?:.ss|ss)? The date expression as a whole must + * meet this condition in order to match Need to start with YYYY, and month, day, hour, minute can only be + * separated by one character + */ + testReplaceDateExpression("/YYYYMMDDhhmm.log", "/202406251007.log"); + testReplaceDateExpression("/YYYY.log", "/2024.log"); + testReplaceDateExpression("/YYYYhhmm.log", "/20241007.log"); + testReplaceDateExpression("/YYYY/YYYYMMDDhhmm.log", "/2024/202406251007.log"); + testReplaceDateExpression("/YYYY/MMDD/hhmm.log", "/2024/0625/1007.log"); + testReplaceDateExpression("/data/YYYYMMDD.hh/mm.log_[0-9]+", "/data/20240625.10/07.log_[0-9]+"); + // error cases + testReplaceDateExpression("/YYY.log", "/YYY.log"); + testReplaceDateExpression("/MMDDhhmm.log", "/MMDDhhmm.log"); + testReplaceDateExpression("/MMDD/hhmm.log", "/MMDD/hhmm.log"); + testReplaceDateExpression("/data/YYYYMMDD..hh/mm.log_[0-9]+", "/data/20240625..hh/mm.log_[0-9]+"); + + /* + * 1 cut the file name 2 cut the path contains wildcard + */ + testCutDirectoryByWildcard("/data/123/YYYYMMDDhhmm.log", + Arrays.asList("/data/123", "", "YYYYMMDDhhmm.log")); + testCutDirectoryByWildcard("/data/YYYYMMDDhhmm/test.log", + Arrays.asList("/data/YYYYMMDDhhmm", "", "test.log")); + testCutDirectoryByWildcard("/data/YYYYMMDDhhmm*/test.log", + Arrays.asList("/data", "YYYYMMDDhhmm*", "test.log")); + testCutDirectoryByWildcard("/data/log_minute/minute_YYYYMMDDhh*/mm.log_[0-9]+", + Arrays.asList("/data/log_minute", "minute_YYYYMMDDhh*", "mm.log_[0-9]+")); + testCutDirectoryByWildcard("/data/123+/YYYYMMDDhhmm.log", + Arrays.asList("/data", "123+", "YYYYMMDDhhmm.log")); + + /* + * 1 cut the file name 2 cut the path contains wildcard or date expression + */ + testCutDirectoryByWildcardAndDateExpression("/data/YYYYMM/YYYaaMM/YYYYMMDDhhmm.log", + Arrays.asList("/data", "YYYYMM/YYYaaMM", "YYYYMMDDhhmm.log")); + testCutDirectoryByWildcardAndDateExpression("/data/YYYY/test.log", + Arrays.asList("/data", "YYYY", "test.log")); + testCutDirectoryByWildcardAndDateExpression("/data/123*/MMDD/test.log", + Arrays.asList("/data", "123*/MMDD", "test.log")); + testCutDirectoryByWildcardAndDateExpression("/data/YYYYMMDD/123*/test.log", + Arrays.asList("/data", "YYYYMMDD/123*", "test.log")); + } + + private void testReplaceDateExpression(String src, String dst) throws ParseException { + Calendar calendar = Calendar.getInstance(); + Long dataTime = DateTransUtils.timeStrConvertToMillSec("202406251007", "m"); + calendar.setTimeInMillis(dataTime); + Assert.assertEquals(NewDateUtils.replaceDateExpression(calendar, src), dst); + } + + private void testCutDirectoryByWildcard(String src, List dst) { + ArrayList directories = FilePathUtil.cutDirectoryByWildcard(src); + Assert.assertEquals(directories, dst); + } + + private void testCutDirectoryByWildcardAndDateExpression(String src, List dst) { + ArrayList directoryLayers = FilePathUtil.cutDirectoryByWildcardAndDateExpression(src); + Assert.assertEquals(directoryLayers, dst); } public static void createHugeFiles(String fileName, String rootDir, String record) throws Exception { diff --git a/inlong-agent/agent-plugins/src/test/resources/testScan/202309281030_1/test_1.txt b/inlong-agent/agent-plugins/src/test/resources/testScan/202309281030_1/test_1.txt new file mode 100644 index 00000000000..780b09709f5 --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/resources/testScan/202309281030_1/test_1.txt @@ -0,0 +1,3 @@ +hello line-end-symbol aa +world line-end-symbol +agent line-end-symbol diff --git a/inlong-agent/agent-plugins/src/test/resources/testScan/2023092810_1/test_1.txt b/inlong-agent/agent-plugins/src/test/resources/testScan/2023092810_1/test_1.txt new file mode 100644 index 00000000000..780b09709f5 --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/resources/testScan/2023092810_1/test_1.txt @@ -0,0 +1,3 @@ +hello line-end-symbol aa +world line-end-symbol +agent line-end-symbol diff --git a/inlong-agent/agent-plugins/src/test/resources/testScan/202309301059_1/test_1.txt b/inlong-agent/agent-plugins/src/test/resources/testScan/202309301059_1/test_1.txt new file mode 100644 index 00000000000..780b09709f5 --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/resources/testScan/202309301059_1/test_1.txt @@ -0,0 +1,3 @@ +hello line-end-symbol aa +world line-end-symbol +agent line-end-symbol diff --git a/inlong-agent/agent-plugins/src/test/resources/testScan/20241030/23/59.txt b/inlong-agent/agent-plugins/src/test/resources/testScan/20241030/23/59.txt new file mode 100644 index 00000000000..780b09709f5 --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/resources/testScan/20241030/23/59.txt @@ -0,0 +1,3 @@ +hello line-end-symbol aa +world line-end-symbol +agent line-end-symbol