Skip to content

Commit

Permalink
[Feature][Connector2] Add DingTalk Source apache#2684
Browse files Browse the repository at this point in the history
  • Loading branch information
MRYOG committed Sep 16, 2022
1 parent 90ce385 commit 8cda41d
Show file tree
Hide file tree
Showing 10 changed files with 486 additions and 0 deletions.
71 changes: 71 additions & 0 deletions docs/en/connector-v2/source/dingtalk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# DingTalk

> DinkTalk source connector
## Description

A source plugin which use DingTalk

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [schema projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-----------| ---------- | -------- | ------------- |
| api_client | string | yes | - |
| access_token | string | yes | - |
| app_key | string | yes | - |
| app_secret | string | yes | - |


### url [string]

[DingTalk API](https://open.dingtalk.com/document/orgapp-server/api-overview) address like : https://oapi.dingtalk.com/topapi/v2/department/listsub(string)

### access_token [string]

DingTalk access_token [DingTalk Doc](https://open.dingtalk.com/document/orgapp-server/obtain-the-access_token-of-an-internal-app)
The valid period of access token is 7200 seconds , if access token expired , can use app_key and app_secret get new message (string)
access_token is the source voucher , can direct use of access_token or use app_key and app_secret get it

### app_key [string]

DingTalk app key (string)

### app_secret [string]

DingTalk app secret (string)

## Example

```hocon
source {
DingTalk {
api_client="https://oapi.dingtalk.com/topapi/v2/department/listsub"
access_token="8c61c395035c37c7812b9b1b1dbecb20"
}
}
or
source {
DingTalk {
api_client="https://oapi.dingtalk.com/topapi/v2/department/listsub"
app_key="dingpsi2nsmw2v"
app_secret="qv5pLes-M9JvHXjaBqkPFdAyk9WKEjDjEZOpLZKHi"
}
}
result :
row=1 : {"auto_add_user":false,"create_dept_group":false,"dept_id":101254282,"ext":"{\"faceCount\":\"1\"}","name":"技术部","parent_id":1}
row=2 : {"auto_add_user":true,"create_dept_group":true,"dept_id":101279294,"name":"财务部","parent_id":1}
row=3 : {"auto_add_user":false,"create_dept_group":false,"dept_id":101316242,"ext":"{\"faceCount\":\"2\"}","name":"渠道部","parent_id":1}
row=4 : {"auto_add_user":false,"create_dept_group":false,"dept_id":101340237,"name":"运营部","parent_id":1}
row=5 : {"auto_add_user":false,"create_dept_group":false,"dept_id":101467231,"ext":"{\"faceCount\":\"1\"}","name":"客服部","parent_id":1}
row=6 : {"auto_add_user":true,"create_dept_group":true,"dept_id":101532253,"name":"人力部","parent_id":1}
row=7 : {"auto_add_user":true,"create_dept_group":true,"dept_id":101532254,"name":"直销部","parent_id":1}
```
1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ seatunnel.sink.OssFile = connector-file-oss
seatunnel.source.Pulsar = connector-pulsar
seatunnel.source.Hudi = connector-hudi
seatunnel.sink.DingTalk = connector-dingtalk
seatunnel.source.DingTalk = connector-dingtalk
seatunnel.sink.elasticsearch = connector-elasticsearch
seatunnel.source.IoTDB = connector-iotdb
seatunnel.sink.IoTDB = connector-iotdb
Expand Down
13 changes: 13 additions & 0 deletions seatunnel-connectors-v2/connector-dingtalk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.aliyun</groupId>
<artifactId>dingtalk</artifactId>
<version>1.4.26</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-http-base</artifactId>
<version>2.1.3-SNAPSHOT</version>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.common;

/**
* @description: DingTalk contant
**/

public class DingTalkConstant {
public static final String APP_KEY = "app_key";
public static final String APP_SECRET = "app_secret";
public static final String ACCESS_TOKEN = "access_token";
public static final String API_CLIENT = "api_client";
public static final String DEFAULT_FORMAT = "json";
public static final String SCHEMA = "schema";
public static final String STATUS_OK = "ok";
public static final String BODY_RESULT = "result";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.common;

import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.Data;

import java.io.Serializable;
import java.util.Map;
import java.util.stream.Collectors;

/**
* @description: Ding Talk Parameter
**/

@Data
public class DingTalkParameter implements Serializable {

private String appKey;
private String appSecret;
private String accessToken;
private String apiClient;
private Map<String, String> params;

public void buildParameter(Config pluginConfig, Boolean hasToken) {
if (!hasToken) {
// DingTalk app key
this.setAppKey(pluginConfig.getString(DingTalkConstant.APP_KEY));
// DingTalk app secret
this.setAppSecret(pluginConfig.getString(DingTalkConstant.APP_SECRET));
} else {
// DingTalk app token
this.setAccessToken(pluginConfig.getString(DingTalkConstant.ACCESS_TOKEN));
}
// DingTalk api client
this.setApiClient(pluginConfig.getString(DingTalkConstant.API_CLIENT));
// set params
if (pluginConfig.hasPath(HttpConfig.PARAMS)) {
this.setParams(pluginConfig.getConfig(HttpConfig.PARAMS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.common;

import com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenResponse;

/**
* @description: Ding Talk util
**/

public class DingTalkUtil {

/**
* @return Client
*/
public static com.aliyun.dingtalkoauth2_1_0.Client createClient() throws Exception {
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config();
config.protocol = "https";
config.regionId = "central";
return new com.aliyun.dingtalkoauth2_1_0.Client(config);
}

public static String getAppToken(String appKey, String appSecret) {
String appToken = null;
try {
com.aliyun.dingtalkoauth2_1_0.Client client = DingTalkUtil.createClient();
com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenRequest getAccessTokenRequest = new com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenRequest()
.setAppKey(appKey).setAppSecret(appSecret);
GetAccessTokenResponse res = client.getAccessToken(getAccessTokenRequest);
appToken = res.getBody().getAccessToken();
} catch (Exception e) {
e.printStackTrace();
}
return appToken;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.source;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.DingTalkConstant;
import org.apache.seatunnel.connectors.seatunnel.common.DingTalkParameter;
import org.apache.seatunnel.connectors.seatunnel.common.DingTalkUtil;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import com.google.auto.service.AutoService;

@AutoService(SeaTunnelSource.class)
public class DingTalkSource extends AbstractSingleSplitSource<SeaTunnelRow> {

protected final DingTalkParameter dtParameter = new DingTalkParameter();
protected SeaTunnelRowType rowType;
protected JobContext jobContext;
protected DeserializationSchema<SeaTunnelRow> deserializationSchema;

@Override
public String getPluginName() {
return "DingTalk";
}

@Override
public Boundedness getBoundedness() {
return JobMode.BATCH.equals(jobContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult hasClient = CheckConfigUtil.checkAllExists(pluginConfig, DingTalkConstant.API_CLIENT);
if (!hasClient.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, hasClient.getMsg());
}
CheckResult hasToken = CheckConfigUtil.checkAllExists(pluginConfig, DingTalkConstant.ACCESS_TOKEN);
if (!hasToken.isSuccess()) {
CheckResult hasKey = CheckConfigUtil.checkAllExists(pluginConfig, DingTalkConstant.APP_KEY, DingTalkConstant.APP_SECRET);
if (!hasKey.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, hasKey.getMsg());
}
String appToken = DingTalkUtil.getAppToken(pluginConfig.getString(DingTalkConstant.APP_KEY), pluginConfig.getString(DingTalkConstant.APP_SECRET));
if (null == appToken) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Get App Token Error!");
}
this.dtParameter.setAccessToken(appToken);
}
this.dtParameter.buildParameter(pluginConfig, hasToken.isSuccess());

if (pluginConfig.hasPath(DingTalkConstant.SCHEMA)) {
Config schema = pluginConfig.getConfig(DingTalkConstant.SCHEMA);
this.rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
} else {
this.rowType = SeaTunnelSchema.buildSimpleTextSchema();
}
}

@Override
public void setJobContext(JobContext jobContext) {
this.jobContext = jobContext;
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return this.rowType;
}

@Override
public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
return new DingTalkSourceReader(this.dtParameter, readerContext, this.deserializationSchema);
}

}
Loading

0 comments on commit 8cda41d

Please sign in to comment.