Skip to content

Commit

Permalink
First commit: pushing code for Apache Storm and IBM InfoSphere Streams.
Browse files Browse the repository at this point in the history
  • Loading branch information
Zubair Nabi committed Jul 28, 2014
1 parent c099626 commit b449e60
Show file tree
Hide file tree
Showing 107 changed files with 6,439 additions and 0 deletions.
Binary file added StormEmailBenchmark/LICENSE.md
Binary file not shown.
44 changes: 44 additions & 0 deletions StormEmailBenchmark/conf/storm.email.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#Parallelism of the various bolts and spouts
deserializebolt=1
modifybolt=1
filterbolt=1
metricsbolt=1
serializebolt=1
writeoutputbolt=1
readinputspout=1
globalmetricsbolt=1
#The flush interval of the metrics and global metrics bolts
flushinterval=4
#Topology processes
numprocesses=1
#Sleep in ms for each tuple emitted by the spout
spoutsleep=0
#Debug mode, emits more log statements
debug=false
#Logs folder path
logspath=
#Path to input file
filepath=
#Name of the input file
filename=
#Extension of the input file
fileext=
#Path to output file
outputpath=/dev/null
#Log each time a new record is read/written?
logemailreadwrite=false
#Log each time a value is filtered by the filter bolt?
logfilter=false
#Storm version, options: 0.8.2 or 0.9.0.1
stormversion=0.8.2
#Give each spout its own input file?
specificfile=true
#Total number of emails to process; required by global metrics bolt (see README)
totalemails=108960
#25% dataset
#totalemails=129356
#25% dataset, trivial*
#totalemails=429760
#100% dataset
#totalemails=517424
#100% dataset, trivial*
143 changes: 143 additions & 0 deletions StormEmailBenchmark/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
<!--
*******************************************************************************
* Copyright (C)2014, International Business Machines Corporation and *
* others. All Rights Reserved. *
*******************************************************************************
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.ibm.storm.email.benchmark</groupId>
<artifactId>storm-email-benchmark</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<name>storm-email-benchmark</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<storm.version>0.8.2</storm.version>
<junit.version>3.8.1</junit.version>
<avro.version>1.7.4</avro.version>
<commons.version>3.2.1</commons.version>
<log4j.version>1.2.16</log4j.version>
<logback.version>1.0.6</logback.version>
<mail.version>1.4.7</mail.version>
<lang.commons.version>3.3.2</lang.commons.version>
</properties>
<repositories>
<repository>
<id>github-releases</id>
<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
</repository>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
<repository>
<id>repository.jboss.org-public</id>
<name>JBoss repository</name>
<url>https://repository.jboss.org/nexus/content/groups/public</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>${storm.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>${commons.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>${mail.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${lang.commons.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main</sourceDirectory>
<testSourceDirectory>src/test</testSourceDirectory>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
22 changes: 22 additions & 0 deletions StormEmailBenchmark/src/avro/email.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// *******************************************************************************
// * Copyright (C)2014, International Business Machines Corporation and *
// * others. All Rights Reserved. *
// *******************************************************************************
//
{ "namespace": "com.ibm.streamsx.storm.email.benchmark.avro",
"type": "record",
"name": "Email",
"fields": [
{ "name": "ID", "type": "string" },
{ "name": "From", "type": "string" },
{ "name": "Date", "type": "string" },
{ "name": "Subject", "type": ["string", "null"] },
{ "name": "ToList", "type": "string" },
{ "name": "CcList", "type": ["string", "null"] },
{ "name": "BccList", "type": ["string", "null"] },
{ "name": "Body", "type": ["string", "null"] },
{ "name": "CharCount", "type": ["int", "null"] },
{ "name": "Wordcount", "type": ["int", "null"] },
{ "name": "ParaCount", "type": ["int", "null"] }
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// *******************************************************************************
// * Copyright (C)2014, International Business Machines Corporation and *
// * others. All Rights Reserved. *
// *******************************************************************************
//
package com.ibm.streamsx.storm.email.benchmark;

import com.ibm.streamsx.storm.email.benchmark.bolts.AvroDeserializeBolt;
import com.ibm.streamsx.storm.email.benchmark.bolts.AvroSerializeBolt;
import com.ibm.streamsx.storm.email.benchmark.bolts.BytesGlobalMetricsBolt;
import com.ibm.streamsx.storm.email.benchmark.bolts.UnitFilterBolt;
import com.ibm.streamsx.storm.email.benchmark.bolts.UnitMetricsBolt;
import com.ibm.streamsx.storm.email.benchmark.bolts.UnitModifyBolt;
import com.ibm.streamsx.storm.email.benchmark.bolts.WriteOutputCompressBolt;
import com.ibm.streamsx.storm.email.benchmark.spouts.ReadEmailsDecompressSpout;
import com.ibm.streamsx.storm.email.benchmark.utils.ConfigProps;
import com.ibm.streamsx.storm.email.benchmark.utils.Constants;

import backtype.storm.topology.TopologyBuilder;


public class BareboneTopology {

public static void main(String[] args) throws Exception {

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(Constants.SPOUT_INPUT, new ReadEmailsDecompressSpout(),
ConfigProps.getIntProperty(Constants.SPOUT_INPUT));

builder.setBolt(Constants.BOLT_DESERIALIZE, new AvroDeserializeBolt(),
ConfigProps.getIntProperty(Constants.BOLT_DESERIALIZE))
.shuffleGrouping(Constants.SPOUT_INPUT, Constants.STREAM_PATH);

builder.setBolt(Constants.BOLT_FILTER, new UnitFilterBolt(),
ConfigProps.getIntProperty(Constants.BOLT_FILTER))
.shuffleGrouping(Constants.BOLT_DESERIALIZE);

builder.setBolt(Constants.BOLT_MODIFY, new UnitModifyBolt(),
ConfigProps.getIntProperty(Constants.BOLT_MODIFY))
.shuffleGrouping(Constants.BOLT_FILTER);

builder.setBolt(Constants.BOLT_METRICS, new UnitMetricsBolt(),
ConfigProps.getIntProperty(Constants.BOLT_METRICS))
.shuffleGrouping(Constants.BOLT_MODIFY);

builder.setBolt(Constants.BOLT_METRICS_GLOBAL, new BytesGlobalMetricsBolt(
ConfigProps.getIntProperty(Constants.BOLT_METRICS),
System.currentTimeMillis()),
ConfigProps.getIntProperty(Constants.BOLT_METRICS_GLOBAL))
.globalGrouping(Constants.BOLT_METRICS, Constants.STREAM_FINALOFFPATH);

builder.setBolt(Constants.BOLT_SERIALIZE, new AvroSerializeBolt(),
ConfigProps.getIntProperty(Constants.BOLT_SERIALIZE))
.shuffleGrouping(Constants.BOLT_METRICS, Constants.STREAM_PATH);

builder.setBolt(Constants.BOLT_OUTPUT, new WriteOutputCompressBolt(),
ConfigProps.getIntProperty(Constants.BOLT_OUTPUT))
.shuffleGrouping(Constants.BOLT_SERIALIZE);

ConfigProps.configureTopologyLocal(builder, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// *******************************************************************************
// * Copyright (C)2014, International Business Machines Corporation and *
// * others. All Rights Reserved. *
// *******************************************************************************
//
package com.ibm.streamsx.storm.email.benchmark;

import com.ibm.streamsx.storm.email.benchmark.bolts.AvroDeserializeBolt;
import com.ibm.streamsx.storm.email.benchmark.bolts.AvroSerializeBolt;
import com.ibm.streamsx.storm.email.benchmark.bolts.GlobalMetricsBolt;
import com.ibm.streamsx.storm.email.benchmark.bolts.ModifyBolt;
import com.ibm.streamsx.storm.email.benchmark.bolts.NewFilterBolt;
import com.ibm.streamsx.storm.email.benchmark.bolts.NewMetricsBolt;
import com.ibm.streamsx.storm.email.benchmark.bolts.WriteOutputCompressBolt;
import com.ibm.streamsx.storm.email.benchmark.spouts.ReadEmailsDecompressSpout;
import com.ibm.streamsx.storm.email.benchmark.utils.ConfigProps;
import com.ibm.streamsx.storm.email.benchmark.utils.Constants;

import backtype.storm.topology.TopologyBuilder;


public class EnronTopology {


public static void main(String[] args) throws Exception {

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(Constants.SPOUT_INPUT, new ReadEmailsDecompressSpout(),
ConfigProps.getIntProperty(Constants.SPOUT_INPUT));

builder.setBolt(Constants.BOLT_DESERIALIZE, new AvroDeserializeBolt(),
ConfigProps.getIntProperty(Constants.BOLT_DESERIALIZE))
.shuffleGrouping(Constants.SPOUT_INPUT, Constants.STREAM_PATH);

builder.setBolt(Constants.BOLT_FILTER, new NewFilterBolt(),
ConfigProps.getIntProperty(Constants.BOLT_FILTER))
.shuffleGrouping(Constants.BOLT_DESERIALIZE);

builder.setBolt(Constants.BOLT_MODIFY, new ModifyBolt(),
ConfigProps.getIntProperty(Constants.BOLT_MODIFY))
.shuffleGrouping(Constants.BOLT_FILTER);

builder.setBolt(Constants.BOLT_METRICS, new NewMetricsBolt(),
ConfigProps.getIntProperty(Constants.BOLT_METRICS))
.shuffleGrouping(Constants.BOLT_MODIFY);

builder.setBolt(Constants.BOLT_METRICS_GLOBAL, new GlobalMetricsBolt(
ConfigProps.getIntProperty(Constants.BOLT_METRICS),
System.currentTimeMillis()),
ConfigProps.getIntProperty(Constants.BOLT_METRICS_GLOBAL))
.globalGrouping(Constants.BOLT_METRICS, Constants.STREAM_OFFPATH)
.globalGrouping(Constants.BOLT_METRICS, Constants.STREAM_FINALOFFPATH);

builder.setBolt(Constants.BOLT_SERIALIZE, new AvroSerializeBolt(),
ConfigProps.getIntProperty(Constants.BOLT_SERIALIZE))
.shuffleGrouping(Constants.BOLT_METRICS, Constants.STREAM_PATH);

builder.setBolt(Constants.BOLT_OUTPUT, new WriteOutputCompressBolt(),
ConfigProps.getIntProperty(Constants.BOLT_OUTPUT))
.shuffleGrouping(Constants.BOLT_SERIALIZE);

ConfigProps.configureTopologyLocal(builder, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// *******************************************************************************
// * Copyright (C)2014, International Business Machines Corporation and *
// * others. All Rights Reserved. *
// *******************************************************************************
//
package com.ibm.streamsx.storm.email.benchmark;

import com.ibm.streamsx.storm.email.benchmark.bolts.BytesGlobalMetricsBolt;
import com.ibm.streamsx.storm.email.benchmark.bolts.BytesSinkBolt;
import com.ibm.streamsx.storm.email.benchmark.bolts.BytesUnitMetricsBolt;
import com.ibm.streamsx.storm.email.benchmark.spouts.ReadEmailsUncompressedSpout;
import com.ibm.streamsx.storm.email.benchmark.utils.ConfigProps;
import com.ibm.streamsx.storm.email.benchmark.utils.Constants;

import backtype.storm.topology.TopologyBuilder;


public class RestrictedTopology {

public static void main(String[] args) throws Exception {

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(Constants.SPOUT_INPUT, new ReadEmailsUncompressedSpout(),
ConfigProps.getIntProperty(Constants.SPOUT_INPUT));

builder.setBolt(Constants.BOLT_METRICS, new BytesUnitMetricsBolt(),
ConfigProps.getIntProperty(Constants.BOLT_METRICS))
.shuffleGrouping(Constants.SPOUT_INPUT, Constants.STREAM_PATH);

builder.setBolt(Constants.BOLT_METRICS_GLOBAL, new BytesGlobalMetricsBolt(
ConfigProps.getIntProperty(Constants.BOLT_METRICS),
System.currentTimeMillis()),
ConfigProps.getIntProperty(Constants.BOLT_METRICS_GLOBAL))
.globalGrouping(Constants.BOLT_METRICS, Constants.STREAM_FINALOFFPATH);

builder.setBolt(Constants.BOLT_OUTPUT, new BytesSinkBolt(),
ConfigProps.getIntProperty(Constants.BOLT_OUTPUT))
.shuffleGrouping(Constants.BOLT_METRICS, Constants.STREAM_PATH);

ConfigProps.configureTopologyLocal(builder, args);
}
}
Loading

0 comments on commit b449e60

Please sign in to comment.