Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-10443][Agent] Put Rocksdb into the plugins module #10444

Merged
merged 8 commits into from
Jun 19, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class AgentConstants {
public static final String DEFAULT_LOCAL_IP = "127.0.0.1";
public static final String DEFAULT_LOCAL_HOST = "localhost";
public static final String AGENT_STORE_CLASSNAME = "agent.store.classname";
public static final String DEFAULT_AGENT_STORE_CLASSNAME = "org.apache.inlong.agent.store.RocksDBStoreImpl";
public static final String DEFAULT_AGENT_STORE_CLASSNAME = "org.apache.inlong.agent.plugin.store.RocksDBStoreImpl";

// default use local ip as uniq id for agent.
public static final String DEFAULT_AGENT_UNIQ_ID = AgentUtils.getLocalIp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
* limitations under the License.
*/

package org.apache.inlong.agent.core.task;
package org.apache.inlong.agent.core;

import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.core.task.MemoryManager;

import org.junit.Assert;
import org.junit.BeforeClass;
Expand Down
3 changes: 1 addition & 2 deletions inlong-agent/agent-installer/conf/installer.properties
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,5 @@ agent.cluster.name=default_agent
############################
# whether to enable audit
audit.enable=true
# audit proxy address
# Use the audit proxy address if the audit proxy address is configured; otherwise get the proxy address from the manager
audit.proxys=127.0.0.1:10081
audit.proxys=127.0.0.1:10081
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ private void putIntoQueue(SourceData sourceData) {
if (!offerSuc) {
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length);
}
LOGGER.debug("Read {} from source {}", sourceData.getData(), inlongGroupId);
LOGGER.debug("Put in source queue {} {}", new String(sourceData.getData()), inlongGroupId);
} catch (InterruptedException e) {
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length);
LOGGER.error("fetchData offer failed", e);
Expand Down Expand Up @@ -314,8 +314,10 @@ public Message read() {
LOGGER.warn("poll {} data get interrupted.", instanceId);
}
if (sourceData == null) {
LOGGER.debug("Read from source queue null {}", inlongGroupId);
return null;
}
LOGGER.debug("Read from source queue {} {}", new String(sourceData.getData()), inlongGroupId);
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length);
return createMessage(sourceData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
* limitations under the License.
*/

package org.apache.inlong.agent.store;
package org.apache.inlong.agent.plugin.store;

import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.store.KeyValueEntity;
import org.apache.inlong.agent.store.Store;

import com.google.gson.Gson;
import org.rocksdb.AbstractImmutableNativeReference;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.store.RocksDBStoreImpl;
import org.apache.inlong.agent.plugin.store.RocksDBStoreImpl;
import org.apache.inlong.agent.store.Store;
import org.apache.inlong.agent.store.TaskStore;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,21 @@ public void teardownAgentHome() {
}

public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
TaskStateEnum state, String cycleUnit) {
DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state, cycleUnit);
TaskStateEnum state, String cycleUnit, String timeZone) {
DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state, cycleUnit, timeZone);
TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
return profile;
}

private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
TaskStateEnum state, String cycleUnit) {
TaskStateEnum state, String cycleUnit, String timeZone) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("testGroupId");
dataConfig.setInlongStreamId("testStreamId");
dataConfig.setDataReportType(1);
dataConfig.setTaskType(3);
dataConfig.setTaskId(taskId);
dataConfig.setTimeZone("GMT-8:00");
dataConfig.setTimeZone(timeZone);
dataConfig.setState(state.ordinal());
FileTaskConfig fileTaskConfig = new FileTaskConfig();
fileTaskConfig.setPattern(pattern);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
* limitations under the License.
*/

package org.apache.inlong.agent.core.instance;
package org.apache.inlong.agent.plugin.instance;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.core.instance.ActionType;
import org.apache.inlong.agent.core.instance.InstanceAction;
import org.apache.inlong.agent.core.instance.InstanceManager;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.utils.AgentUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
* limitations under the License.
*/

package org.apache.inlong.agent.core.instance;
package org.apache.inlong.agent.plugin.instance;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.AgentBaseTestsHelper;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.core.instance.ActionType;
import org.apache.inlong.agent.core.instance.InstanceAction;
import org.apache.inlong.agent.core.instance.InstanceManager;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.store.InstanceStore;
import org.apache.inlong.agent.store.Store;
import org.apache.inlong.agent.store.TaskStore;
Expand Down Expand Up @@ -53,9 +57,10 @@ public class TestInstanceManager {
@BeforeClass
public static void setup() {
helper = new AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt";
String pattern = helper.getTestRootDir() + "/YYYYMMDDhh_[0-9]+.txt";
Store basicInstanceStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "GMT+6:00");
taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, CycleUnitType.HOUR,
"GMT+6:00");
Store taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
TaskStore taskStore = new TaskStore(taskBasicStore);
taskStore.storeTask(taskProfile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public static void setUp() throws Exception {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D");
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00");
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime());
kafkaSink = new MockSink();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public static void setUp() throws Exception {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D");
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00");
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime());
pulsarSink = new MockSink();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public static void setup() {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D");
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00");
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public static void setup() {
private LogFileSource getSource(int taskId, long offset) {
try {
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D");
TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00");
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
InstanceProfile instanceProfile = taskProfile.createInstanceProfile("",
fileName, taskProfile.getCycleUnit(), "20230928", AgentUtils.getCurrentTime());
Expand Down Expand Up @@ -123,15 +124,20 @@ private void testFullRead() {
srcLen += check[i].getBytes(StandardCharsets.UTF_8).length;
}
LogFileSource source = getSource(1, 0);
int cnt = 0;
Message msg = source.read();
int readLen = 0;
while (msg != null) {
readLen += msg.getBody().length;
String record = new String(msg.getBody());
Assert.assertTrue(record.compareTo(check[cnt]) == 0);
int cnt = 0;
while (cnt < check.length) {
if (msg != null) {
readLen += msg.getBody().length;
String record = new String(msg.getBody());
Assert.assertEquals(0, record.compareTo(check[cnt]));
cnt++;
} else {
AgentUtils.silenceSleepInSeconds(1);
}
MemoryManager.getInstance().printAll();
msg = source.read();
cnt++;
}
await().atMost(30, TimeUnit.SECONDS).until(() -> source.sourceFinish());
source.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
* limitations under the License.
*/

package org.apache.inlong.agent.store;
package org.apache.inlong.agent.plugin.store;

import org.apache.inlong.agent.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.store.KeyValueEntity;
import org.apache.inlong.agent.store.StateSearchKey;

import org.junit.AfterClass;
import org.junit.Assert;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
* limitations under the License.
*/

package org.apache.inlong.agent.core.store;
package org.apache.inlong.agent.plugin.store;

import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.AgentBaseTestsHelper;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.store.InstanceStore;
import org.apache.inlong.agent.store.OffsetStore;
import org.apache.inlong.agent.store.TaskStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
* limitations under the License.
*/

package org.apache.inlong.agent.core.task;
package org.apache.inlong.agent.plugin.task;

import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.plugin.file.Task;
import org.apache.inlong.agent.store.Store;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public static void setup() {
tempResourceName = LOADER.getResource("testScan/temp.txt").getPath();
File f = new File(tempResourceName);
String pattern = f.getParent() + "/YYYYMMDD_[0-9]+/test_[0-9]+.txt";
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, true, 0L, 0L, TaskStateEnum.RUNNING, "D");
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, true, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00");
try {
String startStr = "2023-09-20 00:00:00";
String endStr = "2023-09-30 00:00:00";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
* limitations under the License.
*/

package org.apache.inlong.agent.core.task;
package org.apache.inlong.agent.plugin.task;

import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.core.AgentBaseTestsHelper;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.store.TaskStore;
import org.apache.inlong.common.enums.TaskStateEnum;

Expand Down Expand Up @@ -58,7 +59,7 @@ public void testTaskManager() {
TaskStore taskStore = manager.getTaskStore();
for (int i = 1; i <= 10; i++) {
TaskProfile taskProfile = helper.getTaskProfile(i, pattern, false, 0L, 0L, TaskStateEnum.RUNNING,
"GMT+8:00");
"D", "GMT+8:00");
taskProfile.setTaskClass(MockTask.class.getCanonicalName());
taskStore.storeTask(taskProfile);
}
Expand All @@ -74,7 +75,7 @@ public void testTaskManager() {
}

TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, false, 0L, 0L, TaskStateEnum.RUNNING,
"GMT+8:00");
"D", "GMT+8:00");
String taskId1 = taskProfile1.getTaskId();
taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
List<TaskProfile> taskProfiles1 = new ArrayList<>();
Expand All @@ -99,7 +100,7 @@ public void testTaskManager() {

// test delete
TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, false, 0L, 0L, TaskStateEnum.RUNNING,
"GMT+8:00");
"D", "GMT+8:00");
taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
List<TaskProfile> taskProfiles2 = new ArrayList<>();
taskProfiles2.add(taskProfile2);
Expand Down
9 changes: 4 additions & 5 deletions inlong-agent/conf/agent.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
# Agent config values can overwrite the values in the Constants class.

########################
# bdb config
# Offset storage type, default is org.apache.inlong.agent.plugin.store.RocksDBStoreImpl, means store to local RocksDB
# If you choose to store in zk, it can be configured as org.apache.inlong.agent.plugin.store.ZooKeeperImpl
########################
# bdb data readonly
agent.localStore.readonly=false
agent.store.classname = org.apache.inlong.agent.plugin.store.RocksDBStoreImpl

######################
# fetch center
Expand Down Expand Up @@ -98,6 +98,5 @@ agent.prometheus.exporter.port=9080
############################
# whether to enable audit
audit.enable=true
# audit proxy address
# Use the audit proxy address if the audit proxy address is configured; otherwise get the proxy address from the manager
audit.proxys=127.0.0.1:10081
audit.proxys=127.0.0.1:10081
Loading