Skip to content

Commit

Permalink
cherry-pick Use AdHoc datasource client in sqlTask apache#14631
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuangchong committed Jul 28, 2023
1 parent 64b9200 commit 50a6973
Show file tree
Hide file tree
Showing 77 changed files with 1,432 additions and 1,030 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ jobs:
path: /tmp
- name: Load Docker Images
run: |
docker load -i /tmp/standalone-image.tar
docker load -i /tmp/standalone-image.tar
- name: Run Test
run: |
./mvnw -B -f dolphinscheduler-e2e/pom.xml -am \
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/en/contribute/backend/spi/datasource.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ If you are using MySQL or ORACLE data source, you need to place the correspondin

org.apache.dolphinscheduler.spi.datasource.DataSourceChannel
org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory
org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient
org.apache.dolphinscheduler.spi.datasource.client.DataSourceClient

1. In the first step, the data source plug-in can implement the above interfaces and inherit the general client. For details, refer to the implementation of data source plug-ins such as sqlserver and mysql. The addition methods of all RDBMS plug-ins are the same.

Expand Down
2 changes: 1 addition & 1 deletion docs/docs/zh/contribute/backend/spi/datasource.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

org.apache.dolphinscheduler.spi.datasource.DataSourceChannel
org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory
org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient
org.apache.dolphinscheduler.spi.datasource.client.DataSourceClient

1. 第一步数据源插件实现以上接口和继承通用client即可,具体可以参考sqlserver、mysql等数据源插件实现,所有RDBMS插件的添加方式都是一样的。
2. 在数据源插件pom.xml添加驱动配置
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ public Result<Object> verifyDataSourceName(String name) {
@Override
public Result<Object> checkConnection(DbType type, ConnectionParam connectionParam) {
Result<Object> result = new Result<>();
try (Connection connection = DataSourceClientProvider.getInstance().getConnection(type, connectionParam)) {
try (Connection connection = DataSourceClientProvider.getAdHocConnection(type, connectionParam)) {
if (connection == null) {
putMsg(result, Status.CONNECTION_TEST_FAILURE);
return result;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

import org.slf4j.Logger;
Expand All @@ -45,6 +46,14 @@ public static ExecutorService newDaemonFixedThreadExecutor(String threadName, in
return Executors.newFixedThreadPool(threadsNum, threadFactory);
}

public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(String threadName) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadName)
.setDaemon(true)
.build();
return Executors.newSingleThreadScheduledExecutor(threadFactory);
}

/**
* Sleep in given mills, this is not accuracy.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ ALTER TABLE t_ds_fav RENAME t_ds_fav_task;
END IF;
END;
d//

delimiter ;
CALL ut_dolphin_T_t_ds_fav;
DROP PROCEDURE ut_dolphin_T_t_ds_fav;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));


-- add unique key to t_ds_process_definition_log
drop PROCEDURE if EXISTS add_t_ds_process_definition_log_uk_uniq_idx_code_version;
delimiter d//
Expand All @@ -30,9 +29,7 @@ BEGIN
ALTER TABLE t_ds_process_definition_log ADD UNIQUE KEY uniq_idx_code_version(`code`,`version`);
END IF;
END;

d//

delimiter ;
CALL add_t_ds_process_definition_log_uk_uniq_idx_code_version;
DROP PROCEDURE add_t_ds_process_definition_log_uk_uniq_idx_code_version;
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.datasource.api.client;

import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;

import java.sql.Connection;
import java.sql.SQLException;

public abstract class BaseAdHocDataSourceClient implements AdHocDataSourceClient {

private final BaseConnectionParam baseConnectionParam;
private final DbType dbType;

protected BaseAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
this.baseConnectionParam = baseConnectionParam;
this.dbType = dbType;
}

@Override
public Connection getConnection() throws SQLException {
try {
return DataSourceProcessorProvider.getDataSourceProcessor(dbType).getConnection(baseConnectionParam);
} catch (Exception e) {
throw new SQLException("Create adhoc connection error", e);
}
}

@Override
public void close() {
// do nothing
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.datasource.api.client;

import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.dolphinscheduler.common.constants.DataSourceConstants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.PooledDataSourceClient;
import org.apache.dolphinscheduler.spi.enums.DbType;

import org.apache.commons.collections4.MapUtils;

import java.sql.Connection;
import java.sql.SQLException;

import lombok.extern.slf4j.Slf4j;

import com.zaxxer.hikari.HikariDataSource;

@Slf4j
public abstract class BasePooledDataSourceClient implements PooledDataSourceClient {

protected final BaseConnectionParam baseConnectionParam;
protected HikariDataSource dataSource;

public BasePooledDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {

this.baseConnectionParam = checkNotNull(baseConnectionParam, "baseConnectionParam is null");
this.dataSource = createDataSourcePool(baseConnectionParam, checkNotNull(dbType, "dbType is null"));
}

// todo: support multiple version databases
@Override
public HikariDataSource createDataSourcePool(BaseConnectionParam baseConnectionParam, DbType dbType) {

HikariDataSource dataSource = new HikariDataSource();

dataSource.setDriverClassName(baseConnectionParam.getDriverClassName());
dataSource.setJdbcUrl(DataSourceUtils.getJdbcUrl(dbType, baseConnectionParam));
dataSource.setUsername(baseConnectionParam.getUser());
dataSource.setPassword(PasswordUtils.decodePassword(baseConnectionParam.getPassword()));

dataSource.setMinimumIdle(PropertyUtils.getInt(DataSourceConstants.SPRING_DATASOURCE_MIN_IDLE, 5));
dataSource.setMaximumPoolSize(PropertyUtils.getInt(DataSourceConstants.SPRING_DATASOURCE_MAX_ACTIVE, 50));
dataSource.setConnectionTestQuery(baseConnectionParam.getValidationQuery());

if (MapUtils.isNotEmpty(baseConnectionParam.getOther())) {
baseConnectionParam.getOther().forEach(dataSource::addDataSourceProperty);
}

log.info("Creating HikariDataSource for {} success.", dbType.name());
return dataSource;
}

@Override
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}

@Override
public void close() {
log.info("do close dataSource {}.", baseConnectionParam.getDatabase());
try (HikariDataSource closedDatasource = dataSource) {
// only close the resource
}
}

}

This file was deleted.

Loading

0 comments on commit 50a6973

Please sign in to comment.