Skip to content

integration of a kafka spout with apache storm for the purpose of setting up a simplified lambda stack

Notifications You must be signed in to change notification settings

michallorens/storm-kafka

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

12 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

storm-kafka

Integracja z systemem kolejkowym Apache Kafka

Storm skonfigurowany jest tak, aby nasłuchiwał przychodzących krotek `TridentTuple` z zadanego wątku systemu kolejkowego - `topicName`. Krotki te tworzą nastepnie strumień przekazywany do kolejnego elementu w topologii. ```java ZkHosts zkHosts = new ZkHosts("localhost"); TridentKafkaConfig spoutConfig = new TridentKafkaConfig(zkHosts, topicName); spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConfig); ... Stream inputStream = topology.newStream(kafkaSpoutName, spout); ```

Integracja z Cassandra

Konfiguracja połączenia z Cassandra: ```java final Config configuration = new Config(); configuration.put(MapConfiguredCqlClientFactory.TRIDENT_CASSANDRA_CQL_HOSTS, "localhost"); ``` Przekazanie przychodzących krotek do Cassandry: ```java UpdateMapper mapper = new UpdateMapper(keySpaceName, tableName, idColumnName, valColumnName); inputStream.partitionPersist(new CassandraCqlStateFactory(ConsistencyLevel.ONE), new Fields(idColumnName, valColumnName), new CassandraCqlStateUpdater(mapper)); inputStream.each(new Fields(idColumnName, valColumnName), new Debug()); ``` Tworzymy klasę `UpdateMapper` odpowiedzialną za mapowanie krotek na rekordy w Cassandrze i rejestrujemy ja jako `CassandraCqlStateUpdater` na strumieniu z systemu kolejkowego. `UpdateMapper` służy do wykonywania określonego przez nas zapytania CQL: ```java public Statement map(TridentTuple tuple) { SimpleStatement statement = new SimpleStatement(cqlStatement, (Object[]) fields); statement.setKeyspace(keySpace); return statement; } ```

Konfiguracja operacji na Cassandrze

Określenie odpowiednich przestrzeni, tabeli i kolumn odbywa się poprzez argumenty wywołania: ``` -cqlhost VAL : Cassandra host address (default: localhost) -kafkaspout VAL : Kafka spout name (default: kafka-spout) -keySpace VAL : Cassandra keyspace name (default: demo) -remote : Run as RMI server (default: false) -sql VAL : SQL statement/query (default: insert into demo.stormcf (word, count) values (?, ?)) -table VAL : Storm topology name (default: stormcf) -topic VAL : Kafka topic name (default: tuple) -topology VAL : Storm topology name (default: test-topology) -workers N : Number of worker threads (default: 3) -zkhost VAL : ZooKeeper host address (default: localhost) ```

Uruchamianie aplikacji

Uruchomienie systemu kolejkowego

``` bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server0.properties ``` Tworzenie nowego wątku: ``` bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic sentence-spout --partitions 3 --replication-factor 1 ``` Producent i konsument systemu kolejkowego w razie konieczności testów: ``` bin/kafka-console-producer.sh --broker-list localhost:9092 --sync --topic sentence-spout bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sentence-spout --from-beginning ```

Uruchomienie Storma na klastrze

``` bin/storm nimbus bin/storm supervisor bin/storm ui ``` Uruchomienie topologii: ``` bin/storm jar ../storm-kafka-server-1.0-SNAPSHOT-jar-with-dependencies.jar pl.edu.agh.iosr.lambda.kafkastorm.KafkaStormTopology -remote ``` Wszystkie pliki konfiguracyjne konieczne do uruchomienia Storma, Kafki i Zookeepera załączone są w katalogu `config/`.

About

integration of a kafka spout with apache storm for the purpose of setting up a simplified lambda stack

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages