diff --git a/external/storm-jdbc/pom.xml b/external/storm-jdbc/pom.xml index 4e7c884b5f4..0e2ce1c9f31 100644 --- a/external/storm-jdbc/pom.xml +++ b/external/storm-jdbc/pom.xml @@ -1,129 +1,150 @@ - - - - 4.0.0 - - - storm - org.apache.storm - 0.11.0-SNAPSHOT - ../../pom.xml - - - storm-jdbc - - - - Parth-Brahmbhatt - Parth Brahmbhatt - brahmbhatt.parth@gmail.com - - - - - 2.2.5 - - - - - org.apache.storm - storm-core - ${project.version} - provided - - - org.apache.commons - commons-lang3 - 3.3 - - - com.google.guava - guava - 17.0 - - - com.zaxxer - HikariCP-java6 - ${hikari.version} - compile - - - junit - junit - 4.11 - test - - - org.hsqldb - hsqldb - 2.3.1 - test - - - commons-lang - commons-lang - - - - - - org.apache.maven.plugins - maven-jar-plugin - 2.5 - - - - test-jar - - - - - - org.codehaus.mojo - sql-maven-plugin - 1.5 - - - org.hsqldb - hsqldb - 2.3.2 - - - - - create-db - process-test-resources - - execute - - - org.hsqldb.jdbcDriver - jdbc:hsqldb:mem:test;shutdown=false - SA - - true - - src/test/sql/test.sql - - - - - - - - + + + + 4.0.0 + + + com.zkteco.timecube + storm-jdbc + 0.10.0-beta1 + + + Parth-Brahmbhatt + Parth Brahmbhatt + brahmbhatt.parth@gmail.com + + + + + 2.2.5 + + + + + central + http://192.168.0.230:8080/artifactory/repo + + + third-party + third-party-releases + http://192.168.0.230:8080/artifactory/third-party + + + zkteco-internal-repository + zkteco-internal-repository-releases + http://192.168.0.230:8080/artifactory/zkteco-internal-repository + + true + + + true + always + warn + + + + + + central + http://192.168.0.230:8080/artifactory/repo + + + zkteco-internal-repository + zkteco-internal-repository-releases + http://192.168.0.230:8080/artifactory/zkteco-internal-repository + + + + + zkteco-internal-repository + zkteco-internal-repository-releases + http://192.168.0.230:8080/artifactory/zkteco-internal-repository + + + zkteco-internal-repository + zkteco-internal-repository-snapshots + http://192.168.0.230:8080/artifactory/zkteco-internal-repository + + + + + + org.apache.storm + storm-core + 0.9.3 + provided + + + org.apache.commons + commons-lang3 + 3.3 + + + com.google.guava + guava + 17.0 + + + com.zaxxer + HikariCP-java6 + 2.2.5 + compile + + + junit + junit + 4.11 + test + + + org.hsqldb + hsqldb + 2.3.1 + test + + + commons-lang + commons-lang + 2.5 + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.5 + + + + test-jar + + + + + + + + diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java index 15a23452969..0fbd49a4050 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java @@ -1,62 +1,62 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jdbc.bolt; - -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.base.BaseRichBolt; -import org.apache.storm.jdbc.common.ConnectionProvider; -import org.apache.storm.jdbc.common.JdbcClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public abstract class AbstractJdbcBolt extends BaseRichBolt { - private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcBolt.class); - - protected OutputCollector collector; - - protected transient JdbcClient jdbcClient; - protected String configKey; - protected Integer queryTimeoutSecs; - protected ConnectionProvider connectionProvider; - - @Override - public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { - this.collector = collector; - - connectionProvider.prepare(); - - if(queryTimeoutSecs == null) { - queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString()); - } - - this.jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs); - } - - public AbstractJdbcBolt(ConnectionProvider connectionProvider) { - this.connectionProvider = connectionProvider; - } - - @Override - public void cleanup() { - connectionProvider.cleanup(); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.bolt; + +import backtype.storm.Config; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.base.BaseRichBolt; +import org.apache.storm.jdbc.common.ConnectionProvider; +import org.apache.storm.jdbc.common.JdbcClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public abstract class AbstractJdbcBolt extends BaseRichBolt { + private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcBolt.class); + + protected OutputCollector collector; + + protected transient JdbcClient jdbcClient; + protected String configKey; + protected Integer queryTimeoutSecs; + protected ConnectionProvider connectionProvider; + + @Override + public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { + this.collector = collector; + + connectionProvider.prepare(); + + if(queryTimeoutSecs == null) { + queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString()); + } + + this.jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs); + } + + public AbstractJdbcBolt(ConnectionProvider connectionProvider) { + this.connectionProvider = connectionProvider; + } + + @Override + public void cleanup() { + connectionProvider.cleanup(); + } +} diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java index f11d14cde76..48a9f5bbbf5 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java @@ -1,63 +1,64 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jdbc.common; - -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; - -import java.sql.Connection; -import java.sql.SQLException; -import java.util.Map; -import java.util.Properties; - -public class HikariCPConnectionProvider implements ConnectionProvider { - - private Map configMap; - private transient HikariDataSource dataSource; - - public HikariCPConnectionProvider(Map hikariCPConfigMap) { - this.configMap = hikariCPConfigMap; - } - - @Override - public synchronized void prepare() { - if(dataSource == null) { - Properties properties = new Properties(); - properties.putAll(configMap); - HikariConfig config = new HikariConfig(properties); - this.dataSource = new HikariDataSource(config); - this.dataSource.setAutoCommit(false); - } - } - - @Override - public Connection getConnection() { - try { - return this.dataSource.getConnection(); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - @Override - public void cleanup() { - if(dataSource != null) { - dataSource.shutdown(); - } - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.common; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Map; +import java.util.Properties; + +public class HikariCPConnectionProvider implements ConnectionProvider { + + private Map configMap; + private transient HikariDataSource dataSource; + + public HikariCPConnectionProvider(Map hikariCPConfigMap) { + this.configMap = hikariCPConfigMap; + } + + @Override + public synchronized void prepare() { + if(dataSource == null) { + Properties properties = new Properties(); + properties.putAll(configMap); + HikariConfig config = new HikariConfig(properties); + config.setConnectionTestQuery("select 1"); + this.dataSource = new HikariDataSource(config); + this.dataSource.setAutoCommit(false); + } + } + + @Override + public Connection getConnection() { + try { + return this.dataSource.getConnection(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public void cleanup() { + if(dataSource != null) { + dataSource.shutdown(); + } + } +} diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java index 228babea1aa..771af431614 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java @@ -1,235 +1,442 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jdbc.common; - -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.collect.Collections2; -import com.google.common.collect.Lists; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.*; -import java.sql.Date; -import java.util.*; - -public class JdbcClient { - private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class); - - private ConnectionProvider connectionProvider; - private int queryTimeoutSecs; - - public JdbcClient(ConnectionProvider connectionProvider, int queryTimeoutSecs) { - this.connectionProvider = connectionProvider; - this.queryTimeoutSecs = queryTimeoutSecs; - } - - public void insert(String tableName, List> columnLists) { - String query = constructInsertQuery(tableName, columnLists); - executeInsertQuery(query, columnLists); - } - - public void executeInsertQuery(String query, List> columnLists) { - Connection connection = null; - try { - connection = connectionProvider.getConnection(); - boolean autoCommit = connection.getAutoCommit(); - if(autoCommit) { - connection.setAutoCommit(false); - } - - LOG.debug("Executing query {}", query); - - PreparedStatement preparedStatement = connection.prepareStatement(query); - if(queryTimeoutSecs > 0) { - preparedStatement.setQueryTimeout(queryTimeoutSecs); - } - - for(List columnList : columnLists) { - setPreparedStatementParams(preparedStatement, columnList); - preparedStatement.addBatch(); - } - - int[] results = preparedStatement.executeBatch(); - if(Arrays.asList(results).contains(Statement.EXECUTE_FAILED)) { - connection.rollback(); - throw new RuntimeException("failed at least one sql statement in the batch, operation rolled back."); - } else { - try { - connection.commit(); - } catch (SQLException e) { - throw new RuntimeException("Failed to commit insert query " + query, e); - } - } - } catch (SQLException e) { - throw new RuntimeException("Failed to execute insert query " + query, e); - } finally { - closeConnection(connection); - } - } - - private String constructInsertQuery(String tableName, List> columnLists) { - StringBuilder sb = new StringBuilder(); - sb.append("Insert into ").append(tableName).append(" ("); - Collection columnNames = Collections2.transform(columnLists.get(0), new Function() { - @Override - public String apply(Column input) { - return input.getColumnName(); - } - }); - String columns = Joiner.on(",").join(columnNames); - sb.append(columns).append(") values ( "); - - String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size())); - sb.append(placeHolders).append(")"); - - return sb.toString(); - } - - public List> select(String sqlQuery, List queryParams) { - Connection connection = null; - try { - connection = connectionProvider.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery); - if(queryTimeoutSecs > 0) { - preparedStatement.setQueryTimeout(queryTimeoutSecs); - } - setPreparedStatementParams(preparedStatement, queryParams); - ResultSet resultSet = preparedStatement.executeQuery(); - List> rows = Lists.newArrayList(); - while(resultSet.next()){ - ResultSetMetaData metaData = resultSet.getMetaData(); - int columnCount = metaData.getColumnCount(); - List row = Lists.newArrayList(); - for(int i=1 ; i <= columnCount; i++) { - String columnLabel = metaData.getColumnLabel(i); - int columnType = metaData.getColumnType(i); - Class columnJavaType = Util.getJavaType(columnType); - if (columnJavaType.equals(String.class)) { - row.add(new Column(columnLabel, resultSet.getString(columnLabel), columnType)); - } else if (columnJavaType.equals(Integer.class)) { - row.add(new Column(columnLabel, resultSet.getInt(columnLabel), columnType)); - } else if (columnJavaType.equals(Double.class)) { - row.add(new Column(columnLabel, resultSet.getDouble(columnLabel), columnType)); - } else if (columnJavaType.equals(Float.class)) { - row.add(new Column(columnLabel, resultSet.getFloat(columnLabel), columnType)); - } else if (columnJavaType.equals(Short.class)) { - row.add(new Column(columnLabel, resultSet.getShort(columnLabel), columnType)); - } else if (columnJavaType.equals(Boolean.class)) { - row.add(new Column(columnLabel, resultSet.getBoolean(columnLabel), columnType)); - } else if (columnJavaType.equals(byte[].class)) { - row.add(new Column(columnLabel, resultSet.getBytes(columnLabel), columnType)); - } else if (columnJavaType.equals(Long.class)) { - row.add(new Column(columnLabel, resultSet.getLong(columnLabel), columnType)); - } else if (columnJavaType.equals(Date.class)) { - row.add(new Column(columnLabel, resultSet.getDate(columnLabel), columnType)); - } else if (columnJavaType.equals(Time.class)) { - row.add(new Column