Skip to content

Commit

Permalink
Bump Stream Enrich to 0.16.1 (closes #161)
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed May 21, 2018
1 parent ee5dce5 commit a228b39
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 75 deletions.
26 changes: 14 additions & 12 deletions provisioning/resources/configs/enrichments/ip_lookups.json
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow/ip_lookups/jsonschema/1-0-0",
"data": {
"name": "ip_lookups",
"vendor": "com.snowplowanalytics.snowplow",
"enabled": true,
"parameters": {
"geo": {
"database": "GeoLiteCity.dat",
"uri": "http://s3-eu-west-1.amazonaws.com/snowplow-hosted-assets/third-party/maxmind"
}
}
}
"schema": "iglu:com.snowplowanalytics.snowplow/ip_lookups/jsonschema/2-0-0",

"data": {

"name": "ip_lookups",
"vendor": "com.snowplowanalytics.snowplow",
"enabled": true,
"parameters": {
"geo": {
"database": "GeoLite2-City.mmdb",
"uri": "http://snowplow-hosted-assets.s3.amazonaws.com/third-party/maxmind"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.enrichments/pii_enrichment_config/jsonschema/1-0-0",
"data": {
"vendor": "com.snowplowanalytics.snowplow.enrichments",
"name": "pii_enrichment_config",
"enabled": true,
"parameters": {
"pii": [
{
"pojo": {
"field": "user_id"
}
},
{
"pojo": {
"field": "user_fingerprint"
}
},
{
"json": {
"field": "unstruct_event",
"schemaCriterion": "iglu:com.mailchimp/subscribe/jsonschema/1-0-*",
"jsonPath": "$.data.['email', 'ip_opt']"
}
}
],
"strategy": {
"pseudonymize": {
"hashFunction": "SHA-256"
}
}
}
}
}
145 changes: 85 additions & 60 deletions provisioning/resources/configs/snowplow-stream-enrich.hocon
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2013-2017 Snowplow Analytics Ltd. All rights reserved.
# Copyright (c) 2013-2018 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
Expand All @@ -15,33 +15,13 @@
# configuration options for Stream Enrich.

enrich {
# Sources currently supported are:
# 'kinesis' for reading Thrift-serialized records from a Kinesis stream
# 'kafka' for reading Thrift-serialized records from a Kafka topic
# 'stdin' for reading Base64-encoded Thrift-serialized records from stdin
# 'nsq' for reading Base64-encoded Thrift-serialized records from NSQ
source = nsq

# Sinks currently supported are:
# 'kinesis' for writing enriched events to one Kinesis stream and invalid events to another.
# 'kafka' for writing enriched events to one Kafka topic and invalid events to another.
# 'stdouterr' for writing enriched events to stdout and invalid events to stderr.
# Using "sbt assembly" and "java -jar" is recommended to disable sbt logging.
# 'nsq' for writing enriched events to one NSQ topic and invalid events to another.
sink = nsq

# AWS credentials
# If both are set to 'default', use the default AWS credentials provider chain.
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = ""
secretKey = ""
}

streams {
# Stream/topic where the raw events to be enriched are located
in.raw = RawEvents

in {
# Stream/topic where the raw events to be enriched are located
raw = RawEvents
}

out {
# Stream/topic where the events that were successfully enriched will end up
Expand All @@ -53,59 +33,94 @@ enrich {
# Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
# user_ipaddress, domain_sessionid, user_fingerprint.
# Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
# possible parittion keys correspond to.
# possible partition keys correspond to.
# Otherwise, the partition key will be a random UUID.
# Note: NSQ does not make use of the partition key.
partitionKey = ""
}

kinesis {
# Configuration shown is for NSQ; to use another source/sink, uncomment the appropriate
# configuration and comment out the NSQ one.
# To use stdin, comment out or remove everything in the "enrich.streams.sourceSink" section
# except "enabled", which should be set to "stdin".
sourceSink {
# Sources / sinks currently supported are:
# 'kinesis' for reading Thrift-serialized records and writing enriched and bad events to a
# Kinesis stream
# 'googlepubsub' for reading / writing to a Google PubSub topic
# 'kafka' for reading / writing to a Kafka topic
# 'nsq' for reading / writing to an NSQ topic
# 'stdin' for reading from stdin and writing to stdout and stderr
enabled = nsq

# Region where the streams are located
region = ""
# region = {{region}}

# AWS credentials
# If both are set to 'default', use the default AWS credentials provider chain.
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use env variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = ""
secretKey = ""
}

# Maximum number of records to get from Kinesis per call to GetRecords
maxRecords = 10000
# maxRecords = 10000

# LATEST: most recent data.
# TRIM_HORIZON: oldest available data.
# Note: This only effects the first run of this application
# on a stream.
initialPosition = TRIM_HORIZON

# Minimum and maximum backoff periods, in milliseconds
backoffPolicy {
minBackoff = 50
maxBackoff = 500
}
}
# "AT_TIMESTAMP": Start from the record at or after the specified timestamp
# Note: This only affects the first run of this application on a stream.
# initialPosition = TRIM_HORIZON

# Kafka configuration
kafka {
brokers = ""
# Needs to be specified when initialPosition is "AT_TIMESTAMP".
# The timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
# Ex: "2017-05-17T10:00:00Z"
# Note: The time needs to be specified in UTC.
# initialTimestamp = "{{initialTimestamp}}"

# Number of retries to perform before giving up on sending a record
retries = 0
}

# config for nsq
nsq {
# Channel name for raw event source
# Minimum and maximum backoff periods, in milliseconds
# backoffPolicy {
# minBackoff = {{enrichStreamsOutMinBackoff}}
# maxBackoff = {{enrichStreamsOutMaxBackoff}}
# }

# Or Google PubSub
#googleProjectId = my-project-id
## Size of the subscriber thread pool
#threadPoolSize = 4
## Minimum, maximum and total backoff periods, in milliseconds
## and multiplier between two backoffs
#backoffPolicy {
# minBackoff = {{enrichStreamsOutMinBackoff}}
# maxBackoff = {{enrichStreamsOutMaxBackoff}}
# totalBackoff = {{enrichStreamsOutTotalBackoff}} # must be >= 10000
# multiplier = {{enrichStreamsOutTotalBackoff}}
#}

# Or Kafka
#brokers = "{{kafkaBrokers}}"
## Number of retries to perform before giving up on sending a record
#retries = 0

# Or NSQ
## Channel name for nsq source
## If more than one application is reading from the same NSQ topic at the same time,
## all of them must have the same channel name
rawChannel = StreamEnrichChannel

# Host name for nsqd
## Host name for nsqd
host = "127.0.0.1"

# TCP port for nsqd
## TCP port for nsqd, 4150 by default
port = 4150

# Host name for lookupd
## Host name for lookupd
lookupHost = "127.0.0.1"

# Port for nsqlookupd
## HTTP port for nsqlookupd, 4161 by default
lookupPort = 4161
}

# After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
# NOTE: Buffering is not supported by NSQ.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches recordLimit or
# - the combined size of the stored records reaches byteLimit or
Expand All @@ -119,7 +134,17 @@ enrich {

# Used for a DynamoDB table to maintain stream state.
# Used as the Kafka consumer group ID.
# You can set it automatically using: "SnowplowEnrich-$\\{enrich.streams.in.raw\\}"
# Used as the Google PubSub subscription name.
appName = ""
}

# Optional section for tracking endpoints
#monitoring {
# snowplow {
# collectorUri = "{{collectorUri}}"
# collectorPort = 80
# appId = {{enrichAppName}}
# method = GET
# }
#}
}
2 changes: 1 addition & 1 deletion provisioning/resources/init/snowplow_stream_enrich
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
### END INIT INFO

dir="/home/ubuntu/snowplow/bin/"
cmd="java -jar snowplow-stream-enrich-0.12.0.jar --config /home/ubuntu/snowplow/configs/snowplow-stream-enrich.hocon --resolver file:/home/ubuntu/snowplow/configs/iglu-resolver.json --enrichments file:/home/ubuntu/snowplow/configs/enrichments"
cmd="java -jar snowplow-stream-enrich-nsq-0.16.1.jar --config /home/ubuntu/snowplow/configs/snowplow-stream-enrich.hocon --resolver file:/home/ubuntu/snowplow/configs/iglu-resolver.json --enrichments file:/home/ubuntu/snowplow/configs/enrichments"
user="ubuntu"

name="snowplow_stream_enrich"
Expand Down
2 changes: 1 addition & 1 deletion provisioning/resources/ui/js/components/Overview.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class Overview extends React.Component<{}, {}> {
<h3>The software stack installed: </h3>
<ul>
<li>Snowplow Stream Collector 0.11.0</li>
<li>Snowplow Stream Enrich 0.12.0</li>
<li>Snowplow Stream Enrich 0.16.1</li>
<li>Snowplow Elasticsearch Sink 0.10.1</li>
<li>Snowplow Iglu Server 0.3.0</li>
<li>NSQ 1.0.0</li>
Expand Down
2 changes: 1 addition & 1 deletion provisioning/roles/sp_mini_4_setup_apps/tasks/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
- name: Set variables
set_fact:
stream_collector_package: 'snowplow_scala_stream_collector_0.11.0.zip'
stream_enrich_package: 'snowplow_stream_enrich_0.12.0.zip'
stream_enrich_package: 'snowplow_stream_enrich_nsq_0.16.1.zip'
es_loader_package: 'snowplow_elasticsearch_loader_http_0.10.1.zip'
iglu_server_package: 'iglu_server_0.3.0.zip'
kibana_v: '4.0.1'
Expand Down

0 comments on commit a228b39

Please sign in to comment.