From 8cda41d1ff913c20b0a5eb30467b8f5e03a1cbf7 Mon Sep 17 00:00:00 2001 From: Coen Date: Fri, 16 Sep 2022 16:28:47 +0800 Subject: [PATCH] [Feature][Connector2] Add DingTalk Source #2684 --- docs/en/connector-v2/source/dingtalk.md | 71 ++++++++++++ plugin-mapping.properties | 1 + .../connector-dingtalk/pom.xml | 13 +++ .../seatunnel/common/DingTalkConstant.java | 33 ++++++ .../seatunnel/common/DingTalkParameter.java | 60 ++++++++++ .../seatunnel/common/DingTalkUtil.java | 51 +++++++++ .../seatunnel/source/DingTalkSource.java | 105 ++++++++++++++++++ .../source/DingTalkSourceReader.java | 88 +++++++++++++++ .../flink/v2/SeaTunnelDingTalkApiExample.java | 16 +++ .../examples/dingtalk_source_to_console.conf | 48 ++++++++ 10 files changed, 486 insertions(+) create mode 100644 docs/en/connector-v2/source/dingtalk.md create mode 100644 seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/DingTalkConstant.java create mode 100644 seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/DingTalkParameter.java create mode 100644 seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/DingTalkUtil.java create mode 100644 seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/DingTalkSource.java create mode 100644 seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/DingTalkSourceReader.java create mode 100644 seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/dingtalk_source_to_console.conf diff --git a/docs/en/connector-v2/source/dingtalk.md b/docs/en/connector-v2/source/dingtalk.md new file mode 100644 index 00000000000..d9ecac58da2 --- /dev/null +++ b/docs/en/connector-v2/source/dingtalk.md @@ -0,0 +1,71 @@ +# DingTalk + +> DinkTalk source connector +## Description + +A source plugin which use DingTalk + +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [schema projection](../../concept/connector-v2-features.md) +- [ ] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|-----------| ---------- | -------- | ------------- | +| api_client | string | yes | - | +| access_token | string | yes | - | +| app_key | string | yes | - | +| app_secret | string | yes | - | + + +### url [string] + +[DingTalk API](https://open.dingtalk.com/document/orgapp-server/api-overview) address like : https://oapi.dingtalk.com/topapi/v2/department/listsub(string) + +### access_token [string] + +DingTalk access_token [DingTalk Doc](https://open.dingtalk.com/document/orgapp-server/obtain-the-access_token-of-an-internal-app) +The valid period of access token is 7200 seconds , if access token expired , can use app_key and app_secret get new message (string) +access_token is the source voucher , can direct use of access_token or use app_key and app_secret get it + +### app_key [string] + +DingTalk app key (string) + +### app_secret [string] + +DingTalk app secret (string) + +## Example + +```hocon +source { + DingTalk { + api_client="https://oapi.dingtalk.com/topapi/v2/department/listsub" + access_token="8c61c395035c37c7812b9b1b1dbecb20" + } +} +or +source { + DingTalk { + api_client="https://oapi.dingtalk.com/topapi/v2/department/listsub" + app_key="dingpsi2nsmw2v" + app_secret="qv5pLes-M9JvHXjaBqkPFdAyk9WKEjDjEZOpLZKHi" + } +} + +result : +row=1 : {"auto_add_user":false,"create_dept_group":false,"dept_id":101254282,"ext":"{\"faceCount\":\"1\"}","name":"技术部","parent_id":1} +row=2 : {"auto_add_user":true,"create_dept_group":true,"dept_id":101279294,"name":"财务部","parent_id":1} +row=3 : {"auto_add_user":false,"create_dept_group":false,"dept_id":101316242,"ext":"{\"faceCount\":\"2\"}","name":"渠道部","parent_id":1} +row=4 : {"auto_add_user":false,"create_dept_group":false,"dept_id":101340237,"name":"运营部","parent_id":1} +row=5 : {"auto_add_user":false,"create_dept_group":false,"dept_id":101467231,"ext":"{\"faceCount\":\"1\"}","name":"客服部","parent_id":1} +row=6 : {"auto_add_user":true,"create_dept_group":true,"dept_id":101532253,"name":"人力部","parent_id":1} +row=7 : {"auto_add_user":true,"create_dept_group":true,"dept_id":101532254,"name":"直销部","parent_id":1} +``` \ No newline at end of file diff --git a/plugin-mapping.properties b/plugin-mapping.properties index cde36d782a4..85a6c58ab4f 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -116,6 +116,7 @@ seatunnel.sink.OssFile = connector-file-oss seatunnel.source.Pulsar = connector-pulsar seatunnel.source.Hudi = connector-hudi seatunnel.sink.DingTalk = connector-dingtalk +seatunnel.source.DingTalk = connector-dingtalk seatunnel.sink.elasticsearch = connector-elasticsearch seatunnel.source.IoTDB = connector-iotdb seatunnel.sink.IoTDB = connector-iotdb diff --git a/seatunnel-connectors-v2/connector-dingtalk/pom.xml b/seatunnel-connectors-v2/connector-dingtalk/pom.xml index f6efee3f26f..556e39774a6 100644 --- a/seatunnel-connectors-v2/connector-dingtalk/pom.xml +++ b/seatunnel-connectors-v2/connector-dingtalk/pom.xml @@ -48,6 +48,19 @@ + + + com.aliyun + dingtalk + 1.4.26 + + + + org.apache.seatunnel + connector-http-base + 2.1.3-SNAPSHOT + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/DingTalkConstant.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/DingTalkConstant.java new file mode 100644 index 00000000000..da85420c261 --- /dev/null +++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/DingTalkConstant.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common; + +/** + * @description: DingTalk contant + **/ + +public class DingTalkConstant { + public static final String APP_KEY = "app_key"; + public static final String APP_SECRET = "app_secret"; + public static final String ACCESS_TOKEN = "access_token"; + public static final String API_CLIENT = "api_client"; + public static final String DEFAULT_FORMAT = "json"; + public static final String SCHEMA = "schema"; + public static final String STATUS_OK = "ok"; + public static final String BODY_RESULT = "result"; +} diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/DingTalkParameter.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/DingTalkParameter.java new file mode 100644 index 00000000000..fb7cf2964f0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/DingTalkParameter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common; + +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.Data; + +import java.io.Serializable; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * @description: Ding Talk Parameter + **/ + +@Data +public class DingTalkParameter implements Serializable { + + private String appKey; + private String appSecret; + private String accessToken; + private String apiClient; + private Map params; + + public void buildParameter(Config pluginConfig, Boolean hasToken) { + if (!hasToken) { + // DingTalk app key + this.setAppKey(pluginConfig.getString(DingTalkConstant.APP_KEY)); + // DingTalk app secret + this.setAppSecret(pluginConfig.getString(DingTalkConstant.APP_SECRET)); + } else { + // DingTalk app token + this.setAccessToken(pluginConfig.getString(DingTalkConstant.ACCESS_TOKEN)); + } + // DingTalk api client + this.setApiClient(pluginConfig.getString(DingTalkConstant.API_CLIENT)); + // set params + if (pluginConfig.hasPath(HttpConfig.PARAMS)) { + this.setParams(pluginConfig.getConfig(HttpConfig.PARAMS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2))); + } + } +} diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/DingTalkUtil.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/DingTalkUtil.java new file mode 100644 index 00000000000..73a89fc3ec4 --- /dev/null +++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/DingTalkUtil.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common; + +import com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenResponse; + +/** + * @description: Ding Talk util + **/ + +public class DingTalkUtil { + + /** + * @return Client + */ + public static com.aliyun.dingtalkoauth2_1_0.Client createClient() throws Exception { + com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config(); + config.protocol = "https"; + config.regionId = "central"; + return new com.aliyun.dingtalkoauth2_1_0.Client(config); + } + + public static String getAppToken(String appKey, String appSecret) { + String appToken = null; + try { + com.aliyun.dingtalkoauth2_1_0.Client client = DingTalkUtil.createClient(); + com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenRequest getAccessTokenRequest = new com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenRequest() + .setAppKey(appKey).setAppSecret(appSecret); + GetAccessTokenResponse res = client.getAccessToken(getAccessTokenRequest); + appToken = res.getBody().getAccessToken(); + } catch (Exception e) { + e.printStackTrace(); + } + return appToken; + } +} diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/DingTalkSource.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/DingTalkSource.java new file mode 100644 index 00000000000..3e650a630dc --- /dev/null +++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/DingTalkSource.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.source; + +import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.JobMode; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.DingTalkConstant; +import org.apache.seatunnel.connectors.seatunnel.common.DingTalkParameter; +import org.apache.seatunnel.connectors.seatunnel.common.DingTalkUtil; +import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +@AutoService(SeaTunnelSource.class) +public class DingTalkSource extends AbstractSingleSplitSource { + + protected final DingTalkParameter dtParameter = new DingTalkParameter(); + protected SeaTunnelRowType rowType; + protected JobContext jobContext; + protected DeserializationSchema deserializationSchema; + + @Override + public String getPluginName() { + return "DingTalk"; + } + + @Override + public Boundedness getBoundedness() { + return JobMode.BATCH.equals(jobContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult hasClient = CheckConfigUtil.checkAllExists(pluginConfig, DingTalkConstant.API_CLIENT); + if (!hasClient.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, hasClient.getMsg()); + } + CheckResult hasToken = CheckConfigUtil.checkAllExists(pluginConfig, DingTalkConstant.ACCESS_TOKEN); + if (!hasToken.isSuccess()) { + CheckResult hasKey = CheckConfigUtil.checkAllExists(pluginConfig, DingTalkConstant.APP_KEY, DingTalkConstant.APP_SECRET); + if (!hasKey.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, hasKey.getMsg()); + } + String appToken = DingTalkUtil.getAppToken(pluginConfig.getString(DingTalkConstant.APP_KEY), pluginConfig.getString(DingTalkConstant.APP_SECRET)); + if (null == appToken) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Get App Token Error!"); + } + this.dtParameter.setAccessToken(appToken); + } + this.dtParameter.buildParameter(pluginConfig, hasToken.isSuccess()); + + if (pluginConfig.hasPath(DingTalkConstant.SCHEMA)) { + Config schema = pluginConfig.getConfig(DingTalkConstant.SCHEMA); + this.rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType(); + } else { + this.rowType = SeaTunnelSchema.buildSimpleTextSchema(); + } + } + + @Override + public void setJobContext(JobContext jobContext) { + this.jobContext = jobContext; + } + + @Override + public SeaTunnelDataType getProducedType() { + return this.rowType; + } + + @Override + public AbstractSingleSplitReader createReader(SingleSplitReaderContext readerContext) throws Exception { + return new DingTalkSourceReader(this.dtParameter, readerContext, this.deserializationSchema); + } + +} diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/DingTalkSourceReader.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/DingTalkSourceReader.java new file mode 100644 index 00000000000..58f40df1a90 --- /dev/null +++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/DingTalkSourceReader.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.source; + +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.common.DingTalkConstant; +import org.apache.seatunnel.connectors.seatunnel.common.DingTalkParameter; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; + +import com.dingtalk.api.DefaultDingTalkClient; +import com.dingtalk.api.DingTalkClient; +import com.dingtalk.api.request.OapiV2DepartmentListsubRequest; +import com.dingtalk.api.response.OapiV2DepartmentListsubResponse; +import com.fasterxml.jackson.databind.JsonNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class DingTalkSourceReader extends AbstractSingleSplitReader { + + private static final Logger LOGGER = LoggerFactory.getLogger(DingTalkSourceReader.class); + protected final SingleSplitReaderContext context; + protected final DingTalkParameter dtParameter; + protected DingTalkClient dtClient; + protected OapiV2DepartmentListsubRequest dtRequest; + protected final DeserializationSchema deserializationSchema; + + public DingTalkSourceReader(DingTalkParameter dtParameter, SingleSplitReaderContext context, DeserializationSchema deserializationSchema) { + this.context = context; + this.dtParameter = dtParameter; + this.deserializationSchema = deserializationSchema; + } + + @Override + public void open() { + dtClient = new DefaultDingTalkClient(dtParameter.getApiClient()); + dtRequest = new OapiV2DepartmentListsubRequest(); + LOGGER.info("Ding Talk Access Token is :" + dtParameter.getAccessToken()); + } + + @Override + public void close() throws IOException { + } + + @Override + public void pollNext(Collector output) { + try { + OapiV2DepartmentListsubResponse response = dtClient.execute(dtRequest, dtParameter.getAccessToken()); + if (DingTalkConstant.STATUS_OK.equals(response.getErrmsg())) { + String tmpContent = response.getBody(); + JsonNode bodyJson = JsonUtils.stringToJsonNode(tmpContent); + JsonNode resJson = bodyJson.get(DingTalkConstant.BODY_RESULT); + if (resJson.isArray()) { + for (JsonNode tmpJson : resJson) { + output.collect(new SeaTunnelRow(new Object[]{tmpJson.toString()})); + } + } + } + LOGGER.error("Ding Talk client execute exception, response status code:[{}], content:[{}]", response.getErrorCode(), response.getBody()); + if (Boundedness.BOUNDED.equals(context.getBoundedness())) { + context.signalNoMoreElement(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java index d72d70b2a79..55a356168db 100644 --- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java @@ -31,6 +31,10 @@ public class SeaTunnelDingTalkApiExample { public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException { + testSourceDingTalk(); + } + + public static void testSinkDingTalk() throws FileNotFoundException, URISyntaxException, CommandException { String configFile = getTestConfigFile("/examples/fake_to_dingtalk.conf"); FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs(); flinkCommandArgs.setConfigFile(configFile); @@ -41,6 +45,17 @@ public static void main(String[] args) throws FileNotFoundException, URISyntaxEx Seatunnel.run(flinkCommand); } + public static void testSourceDingTalk() throws FileNotFoundException, URISyntaxException, CommandException { + String configFile = getTestConfigFile("/examples/dingtalk_source_to_console.conf"); + FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs(); + flinkCommandArgs.setConfigFile(configFile); + flinkCommandArgs.setCheckConfig(false); + flinkCommandArgs.setVariables(null); + Command flinkCommand = + new FlinkCommandBuilder().buildCommand(flinkCommandArgs); + Seatunnel.run(flinkCommand); + } + public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException { URL resource = SeaTunnelDingTalkApiExample.class.getResource(configFile); if (resource == null) { @@ -49,3 +64,4 @@ public static String getTestConfigFile(String configFile) throws FileNotFoundExc return Paths.get(resource.toURI()).toString(); } } + diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/dingtalk_source_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/dingtalk_source_to_console.conf new file mode 100644 index 00000000000..4b4338bb16d --- /dev/null +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/dingtalk_source_to_console.conf @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism=1 + #job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + DingTalk { + api_client="https://oapi.dingtalk.com/topapi/v2/department/listsub" + access_token="07c0cb8af8cc3c7f9927fbd3f53b886e" +} +# If you would like to get more information about how to configure seatunnel and see full list of source plugins, +# please go to https://seatunnel.apache.org/docs/category/source-v2 +} +transform { + # json { + # sql = "select name,age from fake" + # } + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/category/transform +} +sink { + Console {} + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +}