Divolte Spark

This project contains some helpers for working with data collected by the Divolte Collector using Apache Spark.

QuickStart

Python

To use the helpers from PySpark, you need to build a JAR containing the helper classes and their dependencies. As a prerequisite, you need to have SBT installed.

% git clone https://github.com/divolte/divolte-spark.git
% cd divolte-spark
% sbt assembly
% export DIVOLTE_SPARK_JAR="$PWD"/target/scala-*/divolte-spark-assembly-*.jar

This builds the assembly JAR under the target/ directory; the exported DIVOLTE_SPARK_JAR variable points to it.

You can then start the PySpark REPL:

% pyspark --jars "$DIVOLTE_SPARK_JAR" --driver-class-path "$DIVOLTE_SPARK_JAR"

To access Divolte events that use the default schema, you can use:

# Assuming 'sc' is the Spark context.
events = sc.newAPIHadoopFile(
    "hdfs:///path/to/avro/files/*.avro",
    'org.apache.avro.mapreduce.AvroKeyInputFormat',
    'org.apache.avro.mapred.AvroKey',
    'org.apache.hadoop.io.NullWritable',
    keyConverter='io.divolte.spark.pyspark.avro.AvroWrapperToJavaConverter'
).map(lambda kv: kv[0])  # Keep only the Avro key; the value is a NullWritable.
# 'events' is now an RDD containing the events in the matching Avro files.

When using spark-submit to submit jobs, the JAR needs to be passed using both the --jars and --driver-class-path options.
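
For example (here your_spark_job.py is only a placeholder for your own application script):

% spark-submit --jars "$DIVOLTE_SPARK_JAR" --driver-class-path "$DIVOLTE_SPARK_JAR" your_spark_job.py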

Scala

If you are building a Spark application, you can add Divolte Spark to your dependencies with the following line in your SBT build:

libraryDependencies += "io.divolte" %% "divolte-spark" % "0.1"

To load Divolte events as a Spark RDD:

import io.divolte.spark.avro._
import org.apache.avro.generic.IndexedRecord
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext()
// Any Hadoop-readable path or glob will do here.
val path = "hdfs:///path/to/avro/files/*.avro"
val events = sc.newAvroFile[IndexedRecord](path)

At this point you can either elect to process the events as full Record instances, or just extract specific fields for further processing:

val records = events.toRecords
// or
val eventFields = events.fields("sessionId", "location", "timestamp")
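
The extracted fields can then be processed like any other Spark RDD. The snippet below is only a sketch, and assumes each element of eventFields is a sequence holding the requested field values in the order they were requested:

// Hypothetical aggregation, not part of the library: count events per location.
// Assumes each element of 'eventFields' is Seq(sessionId, location, timestamp).
val eventsPerLocation = eventFields
  .map { case Seq(_, location, _) => location -> 1L }
  .reduceByKey(_ + _)

// Print a small sample of the counts.
eventsPerLocation.take(10).foreach(println)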

Spark Streaming

We also provide some helpers for using Spark Streaming to process events published by Divolte via Kafka. This requires some additional dependencies:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % sparkV excludeAll(
  ExclusionRule(organization = "org.apache.spark", name = "spark-streaming_2.10"),
  ExclusionRule(organization = "javax.jms")
)

libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1" excludeAll(
  ExclusionRule(organization = "com.sun.jdmk"),
  ExclusionRule(organization = "com.sun.jmx"),
  ExclusionRule(organization = "javax.jms"),
  ExclusionRule(organization = "log4j")
)

To process Divolte events from Kafka:

// Additional imports required by this snippet.
import org.apache.avro.generic.GenericRecord
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
// The Divolte Spark import that brings 'divolteStream' into scope is also needed.

// Kafka configuration.
val consumerConfig = Map(
  "group.id"                -> "some-id-for-the-consumer-group",
  "zookeeper.connect"       -> "zookeeper-connect-string",
  "auto.commit.interval.ms" -> "5000",
  "auto.offset.reset"       -> "largest"
)
val topicSettings = Map("divolte" -> Runtime.getRuntime.availableProcessors())

val sc = new SparkContext()
val ssc = new StreamingContext(sc, Seconds(15))

// Establish the source event stream.
val stream = ssc.divolteStream[GenericRecord](consumerConfig, topicSettings, StorageLevel.MEMORY_ONLY)

As above, you then need to choose whether to process complete records or just extract some specific fields:

val eventStream = stream.toRecords
// or
val locationStream = stream.fields("location")

The DStream contains key-value pairs where the key is the party ID associated with the event, and the value is the event itself (or extracted fields).
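
From here the stream can be consumed like any other pair DStream. The snippet below is only a sketch that continues the example above, assuming locationStream pairs each party ID with the extracted field values and that ssc is the StreamingContext created earlier:

// Hypothetical continuation: count events per party ID in each 15-second batch.
// On older Spark releases the pair-DStream operations additionally require:
//   import org.apache.spark.streaming.StreamingContext._
val eventCounts = locationStream
  .map { case (partyId, _) => partyId -> 1L }
  .reduceByKey(_ + _)
eventCounts.print()

// Start the streaming computation and block until it terminates.
ssc.start()
ssc.awaitTermination()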

Examples

Further examples demonstrating how to use Divolte Spark can be found in the Divolte Examples project, under the spark/ and pyspark/ directories.

License

This project and its artifacts are licensed under the terms of the Apache License, Version 2.0.