diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java index 36d7272ddc7..bb9012cff16 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java @@ -47,7 +47,7 @@ public class AgentConstants { public static final String DEFAULT_LOCAL_IP = "127.0.0.1"; public static final String DEFAULT_LOCAL_HOST = "localhost"; public static final String AGENT_STORE_CLASSNAME = "agent.store.classname"; - public static final String DEFAULT_AGENT_STORE_CLASSNAME = "org.apache.inlong.agent.store.RocksDBStoreImpl"; + public static final String DEFAULT_AGENT_STORE_CLASSNAME = "org.apache.inlong.agent.plugin.store.RocksDBStoreImpl"; // default use local ip as uniq id for agent. public static final String DEFAULT_AGENT_UNIQ_ID = AgentUtils.getLocalIp(); diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/TestMemoryManager.java similarity index 97% rename from inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java rename to inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/TestMemoryManager.java index e203e0d760c..7729c165f8f 100644 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/TestMemoryManager.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.inlong.agent.core.task; +package org.apache.inlong.agent.core; import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.core.task.MemoryManager; import org.junit.Assert; import org.junit.BeforeClass; diff --git a/inlong-agent/agent-installer/conf/installer.properties b/inlong-agent/agent-installer/conf/installer.properties index dc52a0a1468..d74e2d23e47 100755 --- a/inlong-agent/agent-installer/conf/installer.properties +++ b/inlong-agent/agent-installer/conf/installer.properties @@ -39,6 +39,5 @@ agent.cluster.name=default_agent ############################ # whether to enable audit audit.enable=true -# audit proxy address # Use the audit proxy address if the audit proxy address is configured; otherwise get the proxy address from the manager -audit.proxys=127.0.0.1:10081 +audit.proxys=127.0.0.1:10081 \ No newline at end of file diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index 40a1f46c522..a477d840934 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -252,7 +252,7 @@ private void putIntoQueue(SourceData sourceData) { if (!offerSuc) { MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length); } - LOGGER.debug("Read {} from source {}", sourceData.getData(), inlongGroupId); + LOGGER.debug("Put in source queue {} {}", new String(sourceData.getData()), inlongGroupId); } catch (InterruptedException e) { MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length); LOGGER.error("fetchData offer failed", e); @@ -314,8 +314,10 @@ public Message read() { LOGGER.warn("poll {} data get interrupted.", instanceId); } if (sourceData == null) { + LOGGER.debug("Read from source queue null {}", inlongGroupId); return null; } + LOGGER.debug("Read from source queue {} {}", new String(sourceData.getData()), inlongGroupId); MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length); return createMessage(sourceData); } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksDBStoreImpl.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/store/RocksDBStoreImpl.java similarity index 98% rename from inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksDBStoreImpl.java rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/store/RocksDBStoreImpl.java index b9be413f04d..0bc56ffe514 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksDBStoreImpl.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/store/RocksDBStoreImpl.java @@ -15,10 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.agent.store; +package org.apache.inlong.agent.plugin.store; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.store.KeyValueEntity; +import org.apache.inlong.agent.store.Store; import com.google.gson.Gson; import org.rocksdb.AbstractImmutableNativeReference; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java index bcb04342ea6..4baa41eb2ac 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java @@ -21,7 +21,7 @@ import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.constant.TaskConstants; -import org.apache.inlong.agent.store.RocksDBStoreImpl; +import org.apache.inlong.agent.plugin.store.RocksDBStoreImpl; import org.apache.inlong.agent.store.Store; import org.apache.inlong.agent.store.TaskStore; diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java index 32234574ee7..dd9c3205763 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java @@ -81,21 +81,21 @@ public void teardownAgentHome() { } public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Long startTime, Long endTime, - TaskStateEnum state, String cycleUnit) { - DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state, cycleUnit); + TaskStateEnum state, String cycleUnit, String timeZone) { + DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state, cycleUnit, timeZone); TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig); return profile; } private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime, - TaskStateEnum state, String cycleUnit) { + TaskStateEnum state, String cycleUnit, String timeZone) { DataConfig dataConfig = new DataConfig(); dataConfig.setInlongGroupId("testGroupId"); dataConfig.setInlongStreamId("testStreamId"); dataConfig.setDataReportType(1); dataConfig.setTaskType(3); dataConfig.setTaskId(taskId); - dataConfig.setTimeZone("GMT-8:00"); + dataConfig.setTimeZone(timeZone); dataConfig.setState(state.ordinal()); FileTaskConfig fileTaskConfig = new FileTaskConfig(); fileTaskConfig.setPattern(pattern); diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java similarity index 92% rename from inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java rename to inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java index ea5a260b421..d3dc67df5ca 100644 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java @@ -15,9 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.agent.core.instance; +package org.apache.inlong.agent.plugin.instance; import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.core.instance.ActionType; +import org.apache.inlong.agent.core.instance.InstanceAction; +import org.apache.inlong.agent.core.instance.InstanceManager; import org.apache.inlong.agent.plugin.Instance; import org.apache.inlong.agent.utils.AgentUtils; diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java similarity index 92% rename from inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java rename to inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java index 13c52bd5052..ce9a77acf6e 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java @@ -15,13 +15,17 @@ * limitations under the License. */ -package org.apache.inlong.agent.core.instance; +package org.apache.inlong.agent.plugin.instance; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.AgentConstants; -import org.apache.inlong.agent.core.AgentBaseTestsHelper; +import org.apache.inlong.agent.constant.CycleUnitType; +import org.apache.inlong.agent.core.instance.ActionType; +import org.apache.inlong.agent.core.instance.InstanceAction; +import org.apache.inlong.agent.core.instance.InstanceManager; import org.apache.inlong.agent.core.task.TaskManager; +import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; import org.apache.inlong.agent.store.InstanceStore; import org.apache.inlong.agent.store.Store; import org.apache.inlong.agent.store.TaskStore; @@ -53,9 +57,10 @@ public class TestInstanceManager { @BeforeClass public static void setup() { helper = new AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome(); - String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt"; + String pattern = helper.getTestRootDir() + "/YYYYMMDDhh_[0-9]+.txt"; Store basicInstanceStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE); - taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "GMT+6:00"); + taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, CycleUnitType.HOUR, + "GMT+6:00"); Store taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK); TaskStore taskStore = new TaskStore(taskBasicStore); taskStore.storeTask(taskProfile); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java index b07126d9463..26e06c13261 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java @@ -47,7 +47,8 @@ public static void setUp() throws Exception { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D"); + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D", + "GMT+8:00"); profile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); kafkaSink = new MockSink(); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java index c12d0db7bb1..39f6ec8e716 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java @@ -47,7 +47,8 @@ public static void setUp() throws Exception { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D"); + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D", + "GMT+8:00"); profile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); pulsarSink = new MockSink(); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java index 897d9968675..bc0aeedbc9d 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java @@ -70,7 +70,8 @@ public static void setup() { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D"); + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D", + "GMT+8:00"); profile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index 2779dc6b3e8..1049bebb1a9 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -77,7 +77,8 @@ public static void setup() { private LogFileSource getSource(int taskId, long offset) { try { String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D"); + TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D", + "GMT+8:00"); String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); InstanceProfile instanceProfile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230928", AgentUtils.getCurrentTime()); @@ -123,15 +124,20 @@ private void testFullRead() { srcLen += check[i].getBytes(StandardCharsets.UTF_8).length; } LogFileSource source = getSource(1, 0); - int cnt = 0; Message msg = source.read(); int readLen = 0; - while (msg != null) { - readLen += msg.getBody().length; - String record = new String(msg.getBody()); - Assert.assertTrue(record.compareTo(check[cnt]) == 0); + int cnt = 0; + while (cnt < check.length) { + if (msg != null) { + readLen += msg.getBody().length; + String record = new String(msg.getBody()); + Assert.assertEquals(0, record.compareTo(check[cnt])); + cnt++; + } else { + AgentUtils.silenceSleepInSeconds(1); + } + MemoryManager.getInstance().printAll(); msg = source.read(); - cnt++; } await().atMost(30, TimeUnit.SECONDS).until(() -> source.sourceFinish()); source.destroy(); diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksDBStoreImpl.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/store/TestRocksDBStoreImpl.java similarity index 92% rename from inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksDBStoreImpl.java rename to inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/store/TestRocksDBStoreImpl.java index 5d1c2918157..749eecc930b 100644 --- a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksDBStoreImpl.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/store/TestRocksDBStoreImpl.java @@ -15,9 +15,11 @@ * limitations under the License. */ -package org.apache.inlong.agent.store; +package org.apache.inlong.agent.plugin.store; -import org.apache.inlong.agent.AgentBaseTestsHelper; +import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; +import org.apache.inlong.agent.store.KeyValueEntity; +import org.apache.inlong.agent.store.StateSearchKey; import org.junit.AfterClass; import org.junit.Assert; diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/store/TestRocksDbKey.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/store/TestRocksDbKey.java similarity index 96% rename from inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/store/TestRocksDbKey.java rename to inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/store/TestRocksDbKey.java index b854296872a..327db5868dd 100644 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/store/TestRocksDbKey.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/store/TestRocksDbKey.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package org.apache.inlong.agent.core.store; +package org.apache.inlong.agent.plugin.store; import org.apache.inlong.agent.constant.AgentConstants; -import org.apache.inlong.agent.core.AgentBaseTestsHelper; import org.apache.inlong.agent.core.task.TaskManager; +import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; import org.apache.inlong.agent.store.InstanceStore; import org.apache.inlong.agent.store.OffsetStore; import org.apache.inlong.agent.store.TaskStore; diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java similarity index 95% rename from inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java rename to inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java index a5a572dc656..acfce5bb9a7 100644 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.inlong.agent.core.task; +package org.apache.inlong.agent.plugin.task; import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.core.task.TaskManager; import org.apache.inlong.agent.plugin.file.Task; import org.apache.inlong.agent.store.Store; diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java index c1b8fe5c0b6..3a87eac388a 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java @@ -77,7 +77,8 @@ public static void setup() { tempResourceName = LOADER.getResource("testScan/temp.txt").getPath(); File f = new File(tempResourceName); String pattern = f.getParent() + "/YYYYMMDD_[0-9]+/test_[0-9]+.txt"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, true, 0L, 0L, TaskStateEnum.RUNNING, "D"); + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, true, 0L, 0L, TaskStateEnum.RUNNING, "D", + "GMT+8:00"); try { String startStr = "2023-09-20 00:00:00"; String endStr = "2023-09-30 00:00:00"; diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java similarity index 94% rename from inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java rename to inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java index 61c255bbdf1..c5370fc69d5 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java @@ -15,10 +15,11 @@ * limitations under the License. */ -package org.apache.inlong.agent.core.task; +package org.apache.inlong.agent.plugin.task; import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.core.AgentBaseTestsHelper; +import org.apache.inlong.agent.core.task.TaskManager; +import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; import org.apache.inlong.agent.store.TaskStore; import org.apache.inlong.common.enums.TaskStateEnum; @@ -58,7 +59,7 @@ public void testTaskManager() { TaskStore taskStore = manager.getTaskStore(); for (int i = 1; i <= 10; i++) { TaskProfile taskProfile = helper.getTaskProfile(i, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, - "GMT+8:00"); + "D", "GMT+8:00"); taskProfile.setTaskClass(MockTask.class.getCanonicalName()); taskStore.storeTask(taskProfile); } @@ -74,7 +75,7 @@ public void testTaskManager() { } TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, - "GMT+8:00"); + "D", "GMT+8:00"); String taskId1 = taskProfile1.getTaskId(); taskProfile1.setTaskClass(MockTask.class.getCanonicalName()); List taskProfiles1 = new ArrayList<>(); @@ -99,7 +100,7 @@ public void testTaskManager() { // test delete TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, - "GMT+8:00"); + "D", "GMT+8:00"); taskProfile2.setTaskClass(MockTask.class.getCanonicalName()); List taskProfiles2 = new ArrayList<>(); taskProfiles2.add(taskProfile2); diff --git a/inlong-agent/conf/agent.properties b/inlong-agent/conf/agent.properties index 5ae2f64560d..679b2987a57 100755 --- a/inlong-agent/conf/agent.properties +++ b/inlong-agent/conf/agent.properties @@ -18,10 +18,10 @@ # Agent config values can overwrite the values in the Constants class. ######################## -# bdb config +# Offset storage type, default is org.apache.inlong.agent.plugin.store.RocksDBStoreImpl, means store to local RocksDB +# If you choose to store in zk, it can be configured as org.apache.inlong.agent.plugin.store.ZooKeeperImpl ######################## -# bdb data readonly -agent.localStore.readonly=false +agent.store.classname = org.apache.inlong.agent.plugin.store.RocksDBStoreImpl ###################### # fetch center @@ -98,6 +98,5 @@ agent.prometheus.exporter.port=9080 ############################ # whether to enable audit audit.enable=true -# audit proxy address # Use the audit proxy address if the audit proxy address is configured; otherwise get the proxy address from the manager -audit.proxys=127.0.0.1:10081 \ No newline at end of file +audit.proxys=127.0.0.1:10081