NOTE: This project is no longer being updated publicly.
The ingester loads processed event data into a redshift database and migrates the schemas of that database. At the highest level, it receives pointers to tsv files, loads them in batches, and migrates a table when it discovers a new version of it.
The metadatastorer (code) is a separate binary that has the simple task of reading messages off an SQS queue and then writing them as rows in a postgres metadata database. The incoming messages look like:
{
"KeyName":"spade-compacter-prod/20160729/oauth_authorize/v0/processor-data-ami-94f837f4/ip-10-192-9-216.us-west-2.compute.internal.1469832764.log.gz",
"TableName":"oauth_authorize",
"TableVersion":0
}
and get stored into the tsv table, whose schema is here.
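As an illustration of the message shape above, here is a minimal Go sketch that decodes one message body. The type name `TSVMessage` and the function `parseTSVMessage` are hypothetical and not taken from the project; only the three JSON field names come from the example message. The validation rule (rejecting an empty `KeyName` or `TableName`) is also an assumption.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// TSVMessage mirrors the JSON fields of the example message.
// The Go type name is hypothetical.
type TSVMessage struct {
	KeyName      string
	TableName    string
	TableVersion int
}

// parseTSVMessage decodes one queue message body and rejects messages
// that lack a key or table name (an assumed validation rule).
func parseTSVMessage(body []byte) (TSVMessage, error) {
	var m TSVMessage
	if err := json.Unmarshal(body, &m); err != nil {
		return TSVMessage{}, fmt.Errorf("decoding message: %w", err)
	}
	if m.KeyName == "" || m.TableName == "" {
		return TSVMessage{}, errors.New("message is missing KeyName or TableName")
	}
	return m, nil
}

func main() {
	// Placeholder key; real keys look like the example message above.
	body := []byte(`{"KeyName":"example-bucket/20160729/oauth_authorize/v0/file.log.gz","TableName":"oauth_authorize","TableVersion":0}`)
	m, err := parseTSVMessage(body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("table=%s version=%d key=%s\n", m.TableName, m.TableVersion, m.KeyName)
}
```

Each decoded message would then become one row in the tsv table; the database write is left out of the sketch.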
The rsloadmanager (code) is the main binary that performs two major functions:
- Batch tsvs, and tell redshift to load them with manifests.
- Migrate schemas to new versions.
The loaders are a pool of goroutines that manage loading the tsvs into the redshift database.
Each goroutine does the following:
- It searches the tsv table for events that have tsvs older than --loadAgeSeconds seconds, or --loadCountTrigger many rows (both configurable), and pulls the oldest one to load whose version matches the current table version.
- It then creates a row in the manifest table and sets the manifest_uuid on the rows in tsv corresponding to that table-version.
- It creates a manifest in s3 of all those s3 keys (from the tsv rows).
- Then it submits a COPY query to redshift, pointing at that manifest. If the load succeeds, the files and manifest are deleted from tsv and manifest.
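The trigger check and the manifest step above can be sketched in Go as follows. This is not the project's code: `pendingTSV`, `shouldLoad`, and `buildManifest` are hypothetical names, the comparison operators (`>=`) are an assumption about how the two thresholds are applied, and the sketch assumes each `KeyName` has the form `<bucket>/<key>` so that prefixing `s3://` yields the object URL. The manifest layout (`entries`, `url`, `mandatory`) is the one redshift's COPY command accepts.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pendingTSV is a hypothetical in-memory view of one row in the tsv table.
type pendingTSV struct {
	KeyName    string
	AgeSeconds int // seconds since the row was written
}

// shouldLoad reports whether a batch is due: enough files are waiting,
// or at least one file has reached the age threshold.
func shouldLoad(files []pendingTSV, loadAgeSeconds, loadCountTrigger int) bool {
	if len(files) > 0 && len(files) >= loadCountTrigger {
		return true
	}
	for _, f := range files {
		if f.AgeSeconds >= loadAgeSeconds {
			return true
		}
	}
	return false
}

// manifestEntry and manifest follow the redshift COPY manifest layout.
type manifestEntry struct {
	URL       string `json:"url"`
	Mandatory bool   `json:"mandatory"`
}

type manifest struct {
	Entries []manifestEntry `json:"entries"`
}

// buildManifest renders the manifest JSON for one batch of files.
func buildManifest(files []pendingTSV) ([]byte, error) {
	m := manifest{Entries: make([]manifestEntry, 0, len(files))}
	for _, f := range files {
		m.Entries = append(m.Entries, manifestEntry{URL: "s3://" + f.KeyName, Mandatory: true})
	}
	return json.Marshal(m)
}

func main() {
	files := []pendingTSV{
		{KeyName: "example-bucket/20160729/oauth_authorize/v0/a.log.gz", AgeSeconds: 600},
		{KeyName: "example-bucket/20160729/oauth_authorize/v0/b.log.gz", AgeSeconds: 120},
	}
	if shouldLoad(files, 300, 100) {
		body, err := buildManifest(files)
		if err != nil {
			panic(err)
		}
		fmt.Println(string(body))
	}
}
```

Marking every entry as mandatory makes the COPY fail if a listed file is missing, which seems the safer default for a loader that deletes its bookkeeping rows after a successful load.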
The migrator (code) is a separate goroutine that discovers schemas that need to be migrated, and then migrates them.
On startup, a shared (across all the goroutines) map of table name to version number is pulled from the redshift table infra.table_version.
The migrator does the following:
- It periodically polls the tsv table for (event_name, version) pairs, and compares them to its table version cache. If it discovers a version that is higher than the one in its cache (or the table isn't in the cache), that table needs to be migrated.
- Then it hits blueprint's /migration endpoint to discover the operations it needs to apply to reach the next version. Example of the endpoint:
GET http://<blueprint>/migration/minute-watched?to_version=1
response body:
[
{"Action":"add","Name":"time","ActionMetadata":{"column_options":" sortkey","column_type":"f@timestamp@unix","inbound":"time"}},
{"Action":"add","Name":"browser","ActionMetadata":{"column_options":"(180)","column_type":"varchar","inbound":"browser"}},
{"Action":"add","Name":"channel","ActionMetadata":{"column_options":"(25)","column_type":"varchar","inbound":"channel"}},
...
]
- It then runs the CREATE TABLE or ALTER query and updates infra.table_version in a transaction, and updates its local cache. It then moves on to the next migration.
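The first two steps of the migrator can be sketched in Go as a version comparison plus a decoder for the /migration response. The names `operation`, `needsMigration`, and `parseOperations` are hypothetical, and treating `ActionMetadata` as a string-to-string map is an assumption based only on the example response above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// operation mirrors one element of the /migration response body.
// ActionMetadata is assumed to hold only string values.
type operation struct {
	Action         string
	Name           string
	ActionMetadata map[string]string
}

// needsMigration reports whether a (table, version) pair seen in the tsv
// table is ahead of the cache, or refers to a table the cache lacks.
func needsMigration(cache map[string]int, table string, seenVersion int) bool {
	current, ok := cache[table]
	return !ok || seenVersion > current
}

// parseOperations decodes the JSON array returned by the endpoint.
func parseOperations(body []byte) ([]operation, error) {
	var ops []operation
	if err := json.Unmarshal(body, &ops); err != nil {
		return nil, fmt.Errorf("decoding migration response: %w", err)
	}
	return ops, nil
}

func main() {
	cache := map[string]int{"minute-watched": 0}
	if needsMigration(cache, "minute-watched", 1) {
		body := []byte(`[{"Action":"add","Name":"browser","ActionMetadata":{"column_options":"(180)","column_type":"varchar","inbound":"browser"}}]`)
		ops, err := parseOperations(body)
		if err != nil {
			panic(err)
		}
		for _, op := range ops {
			fmt.Printf("%s column %s as %s%s\n", op.Action, op.Name,
				op.ActionMetadata["column_type"], op.ActionMetadata["column_options"])
		}
	}
}
```

Turning the decoded operations into a CREATE TABLE or ALTER statement, and running it in the same transaction as the infra.table_version update, is outside the scope of this sketch.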
The migrator also handles calls to the /control/increment_version/:id endpoint (see below). It handles the necessary updates to infra.table_version and the in-memory version cache so that only one goroutine is ever modifying them.
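One common Go pattern for this kind of single-writer guarantee is to let one goroutine own the map and have every other goroutine send it work over a channel. The sketch below illustrates that pattern only; it is not claimed to be how the migrator is implemented, and `versionOwner`, `Increment`, and `Get` are hypothetical names. The database update that would accompany each change is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// versionOwner serializes all access to the version map through one goroutine.
type versionOwner struct {
	ops chan func(versions map[string]int)
}

// newVersionOwner copies the initial versions and starts the owning goroutine.
func newVersionOwner(initial map[string]int) *versionOwner {
	o := &versionOwner{ops: make(chan func(map[string]int))}
	versions := make(map[string]int, len(initial))
	for k, v := range initial {
		versions[k] = v
	}
	go func() {
		// Only this goroutine ever reads or writes the map.
		for op := range o.ops {
			op(versions)
		}
	}()
	return o
}

// Increment bumps a table's version and returns the new value.
func (o *versionOwner) Increment(table string) int {
	reply := make(chan int)
	o.ops <- func(v map[string]int) {
		v[table]++
		reply <- v[table]
	}
	return <-reply
}

// Get returns a table's version and whether the table is known.
func (o *versionOwner) Get(table string) (int, bool) {
	type result struct {
		version int
		ok      bool
	}
	reply := make(chan result)
	o.ops <- func(v map[string]int) {
		ver, ok := v[table]
		reply <- result{ver, ok}
	}
	r := <-reply
	return r.version, r.ok
}

func main() {
	owner := newVersionOwner(map[string]int{"minute-watched": 1})
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			owner.Increment("minute-watched")
		}()
	}
	wg.Wait()
	v, _ := owner.Get("minute-watched")
	fmt.Println("minute-watched is at version", v)
}
```

Because callers never touch the map directly, no mutex is needed, and requests from an HTTP handler and from the polling loop are applied strictly one at a time.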
The control module provides an API, called from blueprint, to control aspects of the ingester.
On error, each of these endpoints returns a 4xx or 5xx and a JSON object: {"Error": }
POST endpoints:
- /control/force_load: Execute a force load. On success, response is empty with 204 (no content) status code. Body of request must be JSON with:
  - Table: name of the table to load
  - Requester: name of the person or system requesting the load
- /control/increment_version/:id: Increment a table's version without waiting for a TSV to come in and the migration to be executed. On success, response is empty with 204 (no content) status code.
GET endpoints:
- /control/table_exists/:id: Return whether a table exists in the infra.table_version table. Can return false positives for tables that have been dropped. Response format: {"Exists": bool}
Blueprint's UI forwards to the force load endpoint in response to a button press, and uses increment version to drop tables which don't have any events being sent.
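A client of the control API might look roughly like the following Go sketch. Everything here is illustrative: the host `ingester.example` and the function names are hypothetical, setting a `Content-Type` header is an assumption (the text only says the body must be JSON), and the sketch assumes the value of the `Error` field is a string.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// forceLoadBody carries the two fields the force_load endpoint expects.
type forceLoadBody struct {
	Table     string
	Requester string
}

// newForceLoadRequest builds (but does not send) a force_load request.
func newForceLoadRequest(baseURL, table, requester string) (*http.Request, error) {
	payload, err := json.Marshal(forceLoadBody{Table: table, Requester: requester})
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPost, baseURL+"/control/force_load", bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json") // assumed, see lead-in
	return req, nil
}

// checkResponse converts a 4xx or 5xx response into a Go error, using the
// Error field of the JSON body when it is present.
func checkResponse(status int, body []byte) error {
	if status < 400 {
		return nil
	}
	var e struct{ Error string }
	if err := json.Unmarshal(body, &e); err != nil || e.Error == "" {
		return fmt.Errorf("control API returned status %d", status)
	}
	return fmt.Errorf("control API returned status %d: %s", status, e.Error)
}

// parseTableExists decodes the body of a table_exists response.
func parseTableExists(body []byte) (bool, error) {
	var r struct{ Exists bool }
	if err := json.Unmarshal(body, &r); err != nil {
		return false, err
	}
	return r.Exists, nil
}

func main() {
	req, err := newForceLoadRequest("http://ingester.example", "minute-watched", "alice")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.String())

	exists, err := parseTableExists([]byte(`{"Exists": true}`))
	if err != nil {
		panic(err)
	}
	fmt.Println("exists:", exists)
	fmt.Println(checkResponse(500, []byte(`{"Error":"load failed"}`)))
}
```

To send the request, pass it to `http.DefaultClient.Do` and feed the response status and body to `checkResponse`.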