Merged
25 commits
05e7fac
Add a link to AssignableMetric.java.
mnogu Mar 23, 2015
ef25356
Put emitted and transferred stats under correct columns
Mar 26, 2015
6d3ee24
Include Executors (Window Hint) if the component is of Bolt type
Mar 26, 2015
a1d7b3e
While sending tuple, check task->node+port with read lock
HeartSaVioR Apr 13, 2015
14477f4
Log "task is null" instead of let worker died
HeartSaVioR Apr 17, 2015
700c8ed
Also leave tuple type to log when task is null
HeartSaVioR Apr 22, 2015
6f62a4e
fix log presentation: (str nil) is empty string, (pr-str nil) is "nil"
HeartSaVioR Apr 25, 2015
85af195
Change TransferDrainer to re-group msg by destination when sending
HeartSaVioR May 13, 2015
52bd47b
Merge branch 'master' into STORM-737
HeartSaVioR May 13, 2015
85c5096
Apply addressed comments from review
HeartSaVioR May 20, 2015
f3d5aca
STORM-835 Netty Client hold batch object until io operation complete
May 25, 2015
dc8dea6
Merge branch 'STORM-790' of github.com:HeartSaVioR/storm
ptgoetz May 28, 2015
59895a7
add STORM-790 to changelog
ptgoetz May 28, 2015
dc3f28f
Merge branch 'storm-netty-client-improve' of https://github.com/Haile…
May 28, 2015
93544b2
Added STORM-835 to Changelog and README
May 28, 2015
d686079
Merge branch 'storm-728-ui-columns' of https://github.com/d2r/storm i…
May 28, 2015
34a0c09
Added STORM-728 to Changelog
May 28, 2015
d189d99
Merge branch 'storm-729-bolt-comp-window' of https://github.com/d2r/s…
May 28, 2015
431468b
Added STORM-729 to Changelog
May 28, 2015
a257f54
Merge branch 'add-link-to-AssignableMetric' of https://github.com/mno…
May 28, 2015
3f50b72
Added STORM-715 to Changelog.
May 28, 2015
4073dbe
Merge branch 'STORM-737' of github.com:HeartSaVioR/storm
ptgoetz May 29, 2015
13c33f3
add STORM-737 to changelog
ptgoetz May 29, 2015
05d1f8b
Merge branch 'STORM-821' of github.com:Parth-Brahmbhatt/incubator-sto…
ptgoetz May 29, 2015
0c2b3a4
add STORM-821 to changelog
ptgoetz May 29, 2015
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,4 +1,10 @@
## 0.11.0
* STORM-821: Adding connection provider interface to decouple jdbc connector from a single connection pooling implementation.
* STORM-737: Check task->node+port with read lock to prevent sending to closed connection
* STORM-715: Add a link to AssignableMetric.java in Metrics.md
* STORM-729: Include Executors (Window Hint) if the component is of Bolt type
* STORM-835: Netty Client hold batch object until io operation complete
* STORM-790: Log "task is null" instead of letting the worker die when task is null in transfer-fn
* STORM-596: remove config topology.receiver.buffer.size
* STORM-808: allow null to be parsed as null
* STORM-816: maven-gpg-plugin does not work with gpg 2.1
@@ -53,6 +59,7 @@
* STORM-749: Remove CSRF check from the REST API.

## 0.10.0
* STORM-728: Put emitted and transferred stats under correct columns
* STORM-752: [storm-redis] Clarify Redis*StateUpdater's expire is optional
* STORM-681: Auto insert license header with genthrift.sh
* STORM-707: Client (Netty): improve logging to help troubleshooting connection woes
1 change: 1 addition & 0 deletions README.markdown
@@ -198,6 +198,7 @@ under the License.
* Mark Davis ([@markdav](https://github.com/markdav))
* Zhuo Liu ([@zhuoliu](https://github.com/zhuoliu))
* jangie ([@jangie](https://github.com/jangie))
* Hailei Zhang ([@Hailei](https://github.com/Hailei))

## Acknowledgements

2 changes: 1 addition & 1 deletion docs/documentation/Metrics.md
@@ -12,7 +12,7 @@ Metrics have to implement just one method, `getValueAndReset` -- do any remainin

Storm gives you these metric types:

* [AssignableMetric]() -- set the metric to the explicit value you supply. Useful if it's an external value or in the case that you are already calculating the summary statistic yourself.
* [AssignableMetric](https://github.com/apache/storm/blob/master/storm-core/src/jvm/backtype/storm/metric/api/AssignableMetric.java) -- set the metric to the explicit value you supply. Useful if it's an external value or in the case that you are already calculating the summary statistic yourself.
* [CombinedMetric](https://github.com/apache/storm/blob/master/storm-core/src/jvm/backtype/storm/metric/api/CombinedMetric.java) -- generic interface for metrics that can be updated associatively.
* [CountMetric](https://github.com/apache/storm/blob/master/storm-core/src/jvm/backtype/storm/metric/api/CountMetric.java) -- a running total of the supplied values. Call `incr()` to increment by one, `incrBy(n)` to add/subtract the given number.
- [MultiCountMetric](https://github.com/apache/storm/blob/master/storm-core/src/jvm/backtype/storm/metric/api/MultiCountMetric.java) -- a hashmap of count metrics.
72 changes: 54 additions & 18 deletions external/storm-jdbc/README.md
@@ -1,10 +1,37 @@
# Storm JDBC
Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allow a storm topology
to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples
in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
in a storm topology.

## Inserting into a database.
The bolt and trident state included in this package for inserting data into a database table are tied to a single table.

### ConnectionProvider
An interface that should be implemented by different connection pooling mechanisms: `org.apache.storm.jdbc.common.ConnectionProvider`

```java
public interface ConnectionProvider extends Serializable {
/**
* method must be idempotent.
*/
void prepare();

/**
*
* @return a DB connection over which the queries can be executed.
*/
Connection getConnection();

/**
* called once when the system is shutting down, should be idempotent.
*/
void cleanup();
}
```

Out of the box we support `org.apache.storm.jdbc.common.HikariCPConnectionProvider` which is an implementation that uses HikariCP.
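If HikariCP does not fit your deployment, any pooling (or non-pooling) strategy can be plugged in by implementing the interface. The sketch below is illustrative only: `SimpleDriverManagerProvider` is a hypothetical name, not a class shipped by storm-jdbc, and the inlined interface mirrors the one added by this PR so the example is self-contained.

```java
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Mirrors org.apache.storm.jdbc.common.ConnectionProvider (inlined for a
// self-contained example).
interface ConnectionProvider extends Serializable {
    void prepare();            // must be idempotent
    Connection getConnection(); // a DB connection over which queries run
    void cleanup();            // called once at shutdown; must be idempotent
}

// Hypothetical non-pooling provider, useful for tests: opens a fresh
// connection per call via DriverManager.
class SimpleDriverManagerProvider implements ConnectionProvider {
    private final String jdbcUrl;
    private final String user;
    private final String password;

    SimpleDriverManagerProvider(String jdbcUrl, String user, String password) {
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.password = password;
    }

    @Override
    public void prepare() {
        // nothing to pre-allocate; calling this repeatedly is harmless
    }

    @Override
    public Connection getConnection() {
        try {
            return DriverManager.getConnection(jdbcUrl, user, password);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void cleanup() {
        // no pool to shut down
    }
}
```

Because the provider is `Serializable`, it can be set on a bolt at topology-build time and shipped to workers, which then call `prepare()` before first use.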

### JdbcMapper
The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:

```java
@@ -17,25 +44,34 @@ The `getColumns()` method defines how a storm tuple maps to a list of columns re
**The order of the returned list is important. The place holders in the supplied queries are resolved in the same order as returned list.**
For example if the user supplied insert query is `insert into user(user_id, user_name, create_date) values (?,?, now())` the 1st item
of the returned list of `getColumns` method will map to the 1st place holder and the 2nd to the 2nd and so on. We do not parse
the supplied queries to try and resolve place holder by column names.
the supplied queries to try and resolve place holders by column names. Not making any assumptions about the query syntax allows this connector
to be used with non-standard SQL frameworks like Phoenix, which only supports `upsert into`.
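The positional contract above can be sketched concretely. The `Column` class below is a simplified stand-in for `org.apache.storm.jdbc.common.Column`, and `OrderingDemo` is a hypothetical helper, not part of the connector:

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for org.apache.storm.jdbc.common.Column.
class Column {
    final String name;
    final Object value;

    Column(String name, Object value) {
        this.name = name;
        this.value = value;
    }
}

class OrderingDemo {
    // For: insert into user(user_id, user_name, create_date) values (?, ?, now())
    // getColumns() must return [user_id, user_name] in exactly that order,
    // because placeholders are bound by position, never by column name.
    static List<Column> getColumns(int userId, String userName) {
        return Arrays.asList(
                new Column("user_id", userId),      // binds to the 1st '?'
                new Column("user_name", userName)); // binds to the 2nd '?'
    }
}
```

Returning the two columns swapped would silently write `user_name` into `user_id`, so the mapper, not the query text, is what keeps the binding correct.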

### JdbcInsertBolt
To use the `JdbcInsertBolt`, you construct an instance of it and specify a configuration key in your storm config that holds the
hikari configuration map and a `JdbcMapper` implementation that coverts storm tuple to DB row. In addition, you must either supply
To use the `JdbcInsertBolt`, you construct an instance of it by specifying a `ConnectionProvider` implementation
and a `JdbcMapper` implementation that converts a storm tuple to a DB row. In addition, you must either supply
a table name using `withTableName` method or an insert query using `withInsertQuery`.
If you specify an insert query, you should ensure that your `JdbcMapper` implementation returns a list of columns in the same order as in your insert query.
You can optionally specify a query timeout seconds param that specifies the max seconds an insert query can take.
The default is the value of topology.message.timeout.secs, and a value of -1 indicates that no query timeout should be set.
You should set the query timeout value to be <= topology.message.timeout.secs.

```java
Config config = new Config();
config.put("jdbc.conf", hikariConfigMap);
JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf",simpleJdbcMapper)
Map hikariConfigMap = Maps.newHashMap();
hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
hikariConfigMap.put("dataSource.user","root");
hikariConfigMap.put("dataSource.password","password");
ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);

String tableName = "user_details";
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);

JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
.withTableName("user")
.withQueryTimeoutSecs(30);
// or, with an explicit insert query:
JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf",simpleJdbcMapper)
JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
.withInsertQuery("insert into user values (?,?)")
.withQueryTimeoutSecs(30);
```
@@ -45,7 +81,7 @@ JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf",simpleJdbcMa
tuple to a database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with the same names as the column names in
the database table that you intend to write to.

To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a `ConnectionProvider` instance.

The following code creates a `SimpleJdbcMapper` instance that:

@@ -60,8 +96,9 @@ hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDa
hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
hikariConfigMap.put("dataSource.user","root");
hikariConfigMap.put("dataSource.password","password");
ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
String tableName = "user_details";
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, map);
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
```
The mapper initialized in the example above assumes a storm tuple has a value for all the columns of the table you intend to insert data into, and its `getColumns()`
method will return the columns in the order in which the JDBC connection's `connection.getMetaData().getColumns()` method returns them.
@@ -89,12 +126,12 @@ JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
```
### JdbcTridentState
We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
state you need to initialize it with the table name, the JdbcMapper instance and name of storm config key that holds the
hikari configuration map. See the example below:
state you need to initialize it with the table name or an insert query, the `JdbcMapper` instance and a `ConnectionProvider` instance.
See the example below:

```java
JdbcState.Options options = new JdbcState.Options()
.withConfigKey("jdbc.conf")
.withConnectionProvider(connectionProvider)
.withMapper(jdbcMapper)
.withTableName("user_details")
.withQueryTimeoutSecs(30);
@@ -151,15 +188,14 @@ this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColum
```

### JdbcLookupBolt
To use the `JdbcLookupBolt`, construct an instance of it and specify a configuration key in your storm config that hold the
hikari configuration map. In addition you must specify the `JdbcLookupMapper` and the select query to execute.
To use the `JdbcLookupBolt`, construct an instance of it using a `ConnectionProvider` instance, a `JdbcLookupMapper` instance and the select query to execute.
You can optionally specify a query timeout seconds param that specifies the max seconds the select query can take.
The default is the value of topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs.

```java
String selectSql = "select user_name from user_details where user_id = ?";
SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf", selectSql, lookupMapper)
JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
.withQueryTimeoutSecs(30);
```

@@ -168,7 +204,7 @@ We also support a trident query state that can be used with trident topologies.

```java
JdbcState.Options options = new JdbcState.Options()
.withConfigKey("jdbc.conf")
.withConnectionProvider(connectionProvider)
.withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
.withSelectQuery("select user_name from user_details where user_id = ?")
.withQueryTimeoutSecs(30);
@@ -210,7 +246,7 @@ To make it work with Mysql, you can add the following to the pom.xml
```

You can generate a single jar with dependencies using mvn assembly plugin. To use the plugin add the following to your pom.xml and execute
mvn clean compile assembly:single.
`mvn clean compile assembly:single`

```
<plugin>
AbstractJdbcBolt.java
@@ -21,7 +21,7 @@
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
import org.apache.commons.lang.Validate;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,22 +36,27 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
protected transient JdbcClient jdbcClient;
protected String configKey;
protected Integer queryTimeoutSecs;
protected ConnectionProvider connectionProvider;

@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
this.collector = collector;

Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey);
Validate.notEmpty(conf, "Hikari configuration not found using key '" + this.configKey + "'");
connectionProvider.prepare();

if(queryTimeoutSecs == null) {
queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
}

this.jdbcClient = new JdbcClient(conf, queryTimeoutSecs);
this.jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs);
}

public AbstractJdbcBolt(String configKey) {
this.configKey = configKey;
public AbstractJdbcBolt(ConnectionProvider connectionProvider) {
this.connectionProvider = connectionProvider;
}

@Override
public void cleanup() {
connectionProvider.cleanup();
}
}
JdbcInsertBolt.java
@@ -22,6 +22,7 @@
import backtype.storm.tuple.Tuple;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,8 +43,8 @@ public class JdbcInsertBolt extends AbstractJdbcBolt {
private String insertQuery;
private JdbcMapper jdbcMapper;

public JdbcInsertBolt(String configKey, JdbcMapper jdbcMapper) {
super(configKey);
public JdbcInsertBolt(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) {
super(connectionProvider);
this.jdbcMapper = jdbcMapper;
}

JdbcLookupBolt.java
@@ -21,6 +21,7 @@
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,8 +38,8 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {

private JdbcLookupMapper jdbcLookupMapper;

public JdbcLookupBolt(String configKey, String selectQuery, JdbcLookupMapper jdbcLookupMapper) {
super(configKey);
public JdbcLookupBolt(ConnectionProvider connectionProvider, String selectQuery, JdbcLookupMapper jdbcLookupMapper) {
super(connectionProvider);
this.selectQuery = selectQuery;
this.jdbcLookupMapper = jdbcLookupMapper;
}
Column.java
@@ -84,19 +84,18 @@ public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Column)) return false;

Column column = (Column) o;
Column<?> column = (Column<?>) o;

if (sqlType != column.sqlType) return false;
if (!columnName.equals(column.columnName)) return false;
if (!val.equals(column.val)) return false;
return val != null ? val.equals(column.val) : column.val == null;

return true;
}

@Override
public int hashCode() {
int result = columnName.hashCode();
result = 31 * result + val.hashCode();
result = 31 * result + (val != null ? val.hashCode() : 0);
result = 31 * result + sqlType;
return result;
}
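The diff above makes `equals` and `hashCode` tolerate a null column value instead of throwing a `NullPointerException`. A standalone sketch of that null-safe pattern, using a simplified `Col` class (illustrative only, not the real `org.apache.storm.jdbc.common.Column`):

```java
// Hypothetical simplified Column demonstrating the null-safe
// equals/hashCode pattern applied in the diff above.
class Col {
    final String columnName;
    final Object val;
    final int sqlType;

    Col(String columnName, Object val, int sqlType) {
        this.columnName = columnName;
        this.val = val;
        this.sqlType = sqlType;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Col)) return false;
        Col c = (Col) o;
        if (sqlType != c.sqlType) return false;
        if (!columnName.equals(c.columnName)) return false;
        // null-safe: two null values compare equal; null vs non-null do not
        return val != null ? val.equals(c.val) : c.val == null;
    }

    @Override
    public int hashCode() {
        int result = columnName.hashCode();
        result = 31 * result + (val != null ? val.hashCode() : 0); // no NPE on null
        result = 31 * result + sqlType;
        return result;
    }
}
```

This matters for the connector because a tuple field mapped to a nullable database column legitimately carries a null value.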
ConnectionProvider.java
@@ -0,0 +1,26 @@
package org.apache.storm.jdbc.common;

import java.io.Serializable;
import java.sql.Connection;
import java.util.Map;

/**
* Provides a database connection.
*/
public interface ConnectionProvider extends Serializable {
/**
* method must be idempotent.
*/
void prepare();

/**
*
* @return a DB connection over which the queries can be executed.
*/
Connection getConnection();

/**
* called once when the system is shutting down, should be idempotent.
*/
void cleanup();
}
HikariCPConnectionProvider.java
@@ -0,0 +1,46 @@
package org.apache.storm.jdbc.common;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;

public class HikariCPConnectionProvider implements ConnectionProvider {

private Map<String, Object> configMap;
private transient HikariDataSource dataSource;

public HikariCPConnectionProvider(Map<String, Object> hikariCPConfigMap) {
this.configMap = hikariCPConfigMap;
}

@Override
public synchronized void prepare() {
if(dataSource == null) {
Properties properties = new Properties();
properties.putAll(configMap);
HikariConfig config = new HikariConfig(properties);
this.dataSource = new HikariDataSource(config);
this.dataSource.setAutoCommit(false);
}
}

@Override
public Connection getConnection() {
try {
return this.dataSource.getConnection();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

@Override
public void cleanup() {
if(dataSource != null) {
dataSource.shutdown();
}
}
}