Skip to content

Commit

Permalink
Merge 1f28c29 into 9a48aca
Browse files Browse the repository at this point in the history
  • Loading branch information
xinxingi authored Apr 18, 2024
2 parents 9a48aca + 1f28c29 commit db90c4e
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ public static <T> List<T> toList(String json, Class<T> clazz) {
* @return true if valid
*/
public static boolean checkJsonValid(String json) {
return checkJsonValid(json, true);
}

public static boolean checkJsonValid(String json, Boolean logFlag) {
if (Strings.isNullOrEmpty(json)) {
return false;
}
Expand All @@ -205,7 +208,8 @@ public static boolean checkJsonValid(String json) {
objectMapper.readTree(json);
return true;
} catch (IOException e) {
log.error("check json object valid exception!", e);
if (logFlag)
log.error("check json object valid exception!", e);
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,7 @@ private Constants() {
public static final String STARTUP_SCRIPT_SPARK = "spark";
public static final String STARTUP_SCRIPT_FLINK = "flink";
public static final String STARTUP_SCRIPT_SEATUNNEL = "seatunnel";
public static final String JSON_SUFFIX = "json";
public static final String CONF_SUFFIX = "conf";

}
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,13 @@ private String buildCustomConfigContent() {
}

private String buildConfigFilePath() {
return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId());
return String.format("%s/seatunnel_%s.%s", taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId(), formatDetector());
}

private String formatDetector() {
return JSONUtils.checkJsonValid(seatunnelParameters.getRawScript(), false) ? Constants.JSON_SUFFIX
: Constants.CONF_SUFFIX;
}

private void createConfigFileIfNotExists(String script, String scriptFile) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.seatunnel;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

public class SeatunnelTaskTest {
private static final String EXECUTE_PATH = "/home";
private static final String TASK_APPID = "9527";

@Test
public void formatDetector() throws Exception{
SeatunnelParameters seatunnelParameters = new SeatunnelParameters();
seatunnelParameters.setRawScript(RAW_SCRIPT);

TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setExecutePath(EXECUTE_PATH);
taskExecutionContext.setTaskAppId(TASK_APPID);
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters));

SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext);
seatunnelTask.setSeatunnelParameters(seatunnelParameters);
Assertions.assertEquals("/home/seatunnel_9527.conf", seatunnelTask.buildCustomConfigCommand());

seatunnelParameters.setRawScript(RAW_SCRIPT_2);
seatunnelTask.setSeatunnelParameters(seatunnelParameters);
Assertions.assertEquals("/home/seatunnel_9527.json", seatunnelTask.buildCustomConfigCommand());
}
private static final String RAW_SCRIPT = "env {\n" +
" execution.parallelism = 2\n" +
" job.mode = \"BATCH\"\n" +
" checkpoint.interval = 10000\n" +
"}\n" +
"\n" +
"source {\n" +
" FakeSource {\n" +
" parallelism = 2\n" +
" result_table_name = \"fake\"\n" +
" row.num = 16\n" +
" schema = {\n" +
" fields {\n" +
" name = \"string\"\n" +
" age = \"int\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}\n" +
"\n" +
"sink {\n" +
" Console {\n" +
" }\n" +
"}";
private static final String RAW_SCRIPT_2 = "{\n" +
" \"env\": {\n" +
" \"execution.parallelism\": 2,\n" +
" \"job.mode\": \"BATCH\",\n" +
" \"checkpoint.interval\": 10000\n" +
" },\n" +
" \"source\": {\n" +
" \"FakeSource\": {\n" +
" \"parallelism\": 2,\n" +
" \"result_table_name\": \"fake\",\n" +
" \"row.num\": 16,\n" +
" \"schema\": {\n" +
" \"fields\": {\n" +
" \"name\": \"string\",\n" +
" \"age\": \"int\"\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
" \"sink\": {\n" +
" \"Console\": {}\n" +
" }\n" +
"}";
}

0 comments on commit db90c4e

Please sign in to comment.