Skip to content

Commit

Permalink
[INLONG-9736][Agent] Make time zone as a common parameter (#9737)
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored Feb 28, 2024
1 parent 76fc71e commit 45ca134
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public String getTimeOffset() {
}

public String getTimeZone() {
return get(TaskConstants.TASK_FILE_TIME_ZONE);
return get(TaskConstants.TASK_TIME_ZONE);
}

public TaskStateEnum getState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class CommonConstants {

public static final String PROXY_SENDER_MAX_TIMEOUT = "proxy.sender.maxTimeout";
// max timeout in seconds.
public static final int DEFAULT_PROXY_SENDER_MAX_TIMEOUT = 20;
public static final int DEFAULT_PROXY_SENDER_MAX_TIMEOUT = 60;

public static final String PROXY_SENDER_MAX_RETRY = "proxy.sender.maxRetry";
public static final int DEFAULT_PROXY_SENDER_MAX_RETRY = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public class FetcherConstants {

public static final String AGENT_FETCHER_INTERVAL = "agent.fetcher.interval";
public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 10;
public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 60;

public static final String AGENT_HEARTBEAT_INTERVAL = "agent.heartbeat.interval";
public static final int DEFAULT_AGENT_HEARTBEAT_INTERVAL = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_DIR_FILTER_PATTERN = "task.fileTask.dir.pattern"; // deprecated
public static final String FILE_DIR_FILTER_PATTERNS = "task.fileTask.dir.patterns";
public static final String TASK_FILE_TIME_OFFSET = "task.fileTask.timeOffset";
public static final String TASK_FILE_TIME_ZONE = "task.fileTask.timeZone";
public static final String TASK_TIME_ZONE = "task.timeZone";
public static final String TASK_FILE_MAX_WAIT = "task.fileTask.file.max.wait";
public static final String TASK_CYCLE_UNIT = "task.cycleUnit";
public static final String FILE_TASK_CYCLE_UNIT = "task.fileTask.cycleUnit";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ public static class FileTaskConfig {
// '1d' means one day after, '-1d' means one day before
// Null means from current timestamp
private String timeOffset;
// Asia/Shanghai
private String timeZone;
// For example: a=b&c=b&e=f
private String additionalAttr;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,6 @@ private static FileTask getFileJob(DataConfig dataConfig) {
if (taskConfig.getTimeOffset() != null) {
fileTask.setTimeOffset(taskConfig.getTimeOffset());
}
if (taskConfig.getTimeZone() != null) {
fileTask.setTimeZone(taskConfig.getTimeZone());
}

if (taskConfig.getAdditionalAttr() != null) {
fileTask.setAddictiveString(taskConfig.getAdditionalAttr());
Expand Down Expand Up @@ -413,6 +410,7 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
task.setState(dataConfig.getState());
task.setPredefinedFields(dataConfig.getPredefinedFields());
task.setCycleUnit(CycleUnitType.REAL_TIME);
task.setTimeZone(dataConfig.getTimeZone());

// set sink type
if (dataConfig.getDataReportType() == NORMAL_SEND_TO_DATAPROXY.ordinal()) {
Expand Down Expand Up @@ -523,6 +521,7 @@ public static class Task {
private String predefinedFields;
private Integer state;
private String cycleUnit;
private String timeZone;

private FileTask fileTask;
private BinlogJob binlogJob;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long
dataConfig.setDataReportType(1);
dataConfig.setTaskType(3);
dataConfig.setTaskId(taskId);
dataConfig.setTimeZone(timeZone);
dataConfig.setState(state.ordinal());
FileTaskConfig fileTaskConfig = new FileTaskConfig();
fileTaskConfig.setPattern(pattern);
fileTaskConfig.setTimeOffset("0h");
fileTaskConfig.setTimeZone(timeZone);
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit("h");
fileTaskConfig.setRetry(retry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_UNIQ_ID;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_UNIQ_ID;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_FETCHER_INTERVAL;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_RETURN_PARAM_DATA;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_TASK_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_FETCHER_INTERVAL;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH;
import static org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
Expand Down Expand Up @@ -172,45 +175,19 @@ public TaskRequest getFetchRequest(List<CommandEntity> unackedCommands) {
private Runnable taskConfigFetchThread() {
return () -> {
Thread.currentThread().setName("ManagerFetcher");
int normalTaskId = 100;
int testState = 0;
int retryTaskId = 800;
long count = 1;
while (isRunnable()) {
try {
/*
* int configSleepTime = conf.getInt(AGENT_FETCHER_INTERVAL, DEFAULT_AGENT_FETCHER_INTERVAL);
* TimeUnit.SECONDS.sleep(AgentUtils.getRandomBySeed(configSleepTime));
*/
// fetch task config from manager
TaskResult taskresult;
String testDir = conf.get("test.dir", "");
LOGGER.info("test123 test.dir {}", testDir);
if (testDir == "") {
taskresult = getStaticConfig();
} else {
if (count % 10 == 0) {
normalTaskId++;
retryTaskId++;
}
if (testState == 1) {
testState = 2;
} else {
testState = 1;
}
taskresult = getTestConfig(testDir, normalTaskId, retryTaskId, testState);
int configSleepTime = conf.getInt(AGENT_FETCHER_INTERVAL, DEFAULT_AGENT_FETCHER_INTERVAL);
TaskResult taskResult = getStaticConfig();
if (taskResult != null) {
List<TaskProfile> taskProfiles = new ArrayList<>();
taskResult.getDataConfigs().forEach((config) -> {
TaskProfile profile = TaskProfile.convertToTaskProfile(config);
taskProfiles.add(profile);
});
agentManager.getTaskManager().submitTaskProfiles(taskProfiles);
}
if (taskresult == null) {
continue;
}
List<TaskProfile> taskProfiles = new ArrayList<>();
taskresult.getDataConfigs().forEach((config) -> {
TaskProfile profile = TaskProfile.convertToTaskProfile(config);
taskProfiles.add(profile);
});
agentManager.getTaskManager().submitTaskProfiles(taskProfiles);
count++;
AgentUtils.silenceSleepInSeconds(60);
TimeUnit.SECONDS.sleep(AgentUtils.getRandomBySeed(configSleepTime));
} catch (Throwable ex) {
LOGGER.warn("exception caught", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public boolean isProfileValid(TaskProfile profile) {
LOGGER.error("task profile cycle unit must be consistent");
return false;
}
if (!profile.hasKey(TaskConstants.TASK_FILE_TIME_ZONE)) {
if (!profile.hasKey(TaskConstants.TASK_TIME_ZONE)) {
LOGGER.error("task profile needs time zone");
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long
dataConfig.setDataReportType(1);
dataConfig.setTaskType(3);
dataConfig.setTaskId(taskId);
dataConfig.setTimeZone("GMT-8:00");
dataConfig.setState(state.ordinal());
FileTaskConfig fileTaskConfig = new FileTaskConfig();
fileTaskConfig.setPattern(pattern);
fileTaskConfig.setTimeOffset("0d");
// GMT-8:00 same with Asia/Shanghai
fileTaskConfig.setTimeZone("GMT-8:00");
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit(cycleUnit);
fileTaskConfig.setRetry(retry);
Expand Down

0 comments on commit 45ca134

Please sign in to comment.