
Commit 228ffd8

Add key properties to SCHEMA message; Migrate fields to metadata (#25)
* Add key properties to SCHEMA message; Migrate applicable fields to metadata

* Move database name to metadata and refactor code

* Fix test
Gbolahan Okerayi authored and rflprr committed Jun 13, 2018
1 parent 6918196 commit 228ffd8
Showing 5 changed files with 162 additions and 80 deletions.
48 changes: 29 additions & 19 deletions README.rst
@@ -75,11 +75,7 @@ Example:
         "tap_stream_id": "sample-dbname.public.sample-name",
         "stream": "sample-stream",
         "database_name": "sample-dbname",
-        "table_name": "public.sample-name",
-        "is_view": false,
-        "key_properties": [
-            "id"
-        ],
+        "table_name": "public.sample-name"
         "schema": {
             "properties": {
                 "id": {
@@ -113,14 +109,21 @@ Example:
         {
             "metadata": {
                 "selected-by-default": false,
-                "selected": true
+                "selected": true,
+                "is-view": false,
+                "table-key-properties": ["id"],
+                "schema-name": "sample-stream",
+                "valid-replication-keys": [
+                    "updated_at"
+                ]
             },
             "breadcrumb": [],
         },
         {
             "metadata": {
                 "selected-by-default": true,
-                "sql-datatype": "int2"
+                "sql-datatype": "int2",
+                "inclusion": "automatic"
             },
             "breadcrumb": [
                 "properties",
@@ -130,7 +133,8 @@
         {
             "metadata": {
                 "selected-by-default": true,
-                "sql-datatype": "varchar"
+                "sql-datatype": "varchar",
+                "inclusion": "available"
             },
             "breadcrumb": [
                 "properties",
@@ -140,7 +144,8 @@
         {
             "metadata": {
                 "selected-by-default": true,
-                "sql-datatype": "datetime"
+                "sql-datatype": "datetime",
+                "inclusion": "available",
             },
             "breadcrumb": [
                 "properties",
@@ -171,7 +176,8 @@ Example:
         {
             "breadcrumb": [],
             "metadata": {
-                "selected-by-default": false
+                "selected-by-default": false,
+                ...
             }
         }
     ]
@@ -187,7 +193,8 @@ Example:
             "breadcrumb": [],
             "metadata": {
                 "selected": true,
-                "selected-by-default": false
+                "selected-by-default": false,
+                ...
             }
         }
     ]
@@ -217,21 +224,24 @@ Incremental replication works in conjunction with a state file to only extract n
 time the tap is invoked i.e continue from the last synced data.
 
 To use incremental replication, we need to add the ``replication_method`` and ``replication_key``
-to the top level under each stream in the ``catalog.json`` file.
+to the streams (tables) metadata in the ``catalog.json`` file.
 
 Example:
 
 .. code-block:: json
 
     {
         "streams": [
             {
-                "replication_method": "INCREMENTAL",
-                "replication_key": "updated_at",
+                "metadata": [
+                    {
+                        "breadcrumb": [],
+                        "metadata": {
+                            "selected": true,
+                            "selected-by-default": false,
+                            "replication-method": "INCREMENTAL",
+                            "replication-key": "updated_at",
                             ...
+                        }
+                    }
+                ]
             }
         ]
     }
 
 We can then invoke the tap again in sync mode. This time the output will have ``STATE`` messages
 that contains a ``replication_key_value`` and ``bookmark`` for data that were extracted.
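For reference (not part of this diff), a ``STATE`` message from such an incremental run would look roughly like the sketch below. The stream id and values are illustrative; the bookmark keys (``replication_key``, ``replication_key_value``, ``version``) are the ones ``build_state`` and ``sync_table`` write:

.. code-block:: json

    {
        "type": "STATE",
        "value": {
            "bookmarks": {
                "sample-dbname.public.sample-name": {
                    "replication_key": "updated_at",
                    "replication_key_value": "2018-06-01T12:00:00Z",
                    "version": 1528848000000
                }
            }
        }
    }
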
86 changes: 55 additions & 31 deletions tap_redshift/__init__.py
@@ -36,7 +36,7 @@
 
 from tap_redshift import resolve
 
-__version__ = '1.0.0b7'
+__version__ = '1.0.0b8'
 
 LOGGER = singer.get_logger()
 
@@ -126,24 +126,22 @@ def discover_catalog(conn, db_schema):
         schema = Schema(type='object',
                         properties={
                             c['name']: schema_for_column(c) for c in cols})
-        metadata = create_column_metadata(cols)
+        key_properties = [
+            column for column in table_pks.get(table_name, [])
+            if schema.properties[column].inclusion != 'unsupported']
+        is_view = table_types.get(table_name) == 'VIEW'
+        db_name = conn.get_dsn_parameters()['dbname']
+        metadata = create_column_metadata(
+            db_name, cols, is_view, table_name, key_properties)
         tap_stream_id = '{}.{}'.format(
-            conn.get_dsn_parameters()['dbname'], qualified_table_name)
+            db_name, qualified_table_name)
         entry = CatalogEntry(
-            database=conn.get_dsn_parameters()['dbname'],
             tap_stream_id=tap_stream_id,
             stream=table_name,
             schema=schema,
             table=qualified_table_name,
             metadata=metadata)
-        key_properties = [
-            column for column in table_pks.get(table_name, [])
-            if schema.properties[column].inclusion != 'unsupported']
-
-        if key_properties:
-            entry.key_properties = key_properties
-
-        entry.is_view = table_types.get(table_name) == 'VIEW'
         entries.append(entry)
 
     return Catalog(entries)
@@ -200,9 +198,20 @@ def schema_for_column(c):
     return result
 
 
-def create_column_metadata(cols):
+def create_column_metadata(
+        db_name, cols, is_view,
+        table_name, key_properties=[]):
     mdata = metadata.new()
     mdata = metadata.write(mdata, (), 'selected-by-default', False)
+    if not is_view:
+        mdata = metadata.write(
+            mdata, (), 'table-key-properties', key_properties)
+    else:
+        mdata = metadata.write(
+            mdata, (), 'view-key-properties', key_properties)
+    mdata = metadata.write(mdata, (), 'is-view', is_view)
+    mdata = metadata.write(mdata, (), 'schema-name', table_name)
+    mdata = metadata.write(mdata, (), 'database-name', db_name)
     valid_rep_keys = []
 
     for c in cols:
@@ -219,6 +228,10 @@ def create_column_metadata(cols):
                                ('properties', c['name']),
                                'sql-datatype',
                                c['type'].lower())
+        mdata = metadata.write(mdata,
+                               ('properties', c['name']),
+                               'inclusion',
+                               schema.inclusion)
     if valid_rep_keys:
         mdata = metadata.write(mdata, (), 'valid-replication-keys',
                                valid_rep_keys)
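
As a reading aid (not part of the commit), the sketch below shows the singer-python ``metadata`` helpers used above: ``write`` fills a map keyed by breadcrumb tuples, and ``to_list`` flattens that map into the ``{breadcrumb, metadata}`` entries that appear in ``catalog.json``. The column name and key are made up:

.. code-block:: python

    from singer import metadata

    mdata = metadata.new()  # compiled form: a dict keyed by breadcrumb tuples
    mdata = metadata.write(mdata, (), 'selected-by-default', False)
    mdata = metadata.write(mdata, (), 'table-key-properties', ['id'])  # hypothetical PK
    mdata = metadata.write(mdata, ('properties', 'id'), 'sql-datatype', 'int2')
    mdata = metadata.write(mdata, ('properties', 'id'), 'inclusion', 'automatic')

    # to_list() yields the entries serialized into the catalog
    for entry in metadata.to_list(mdata):
        print(entry)
    # {'breadcrumb': (), 'metadata': {'selected-by-default': False, 'table-key-properties': ['id']}}
    # {'breadcrumb': ('properties', 'id'), 'metadata': {'sql-datatype': 'int2', 'inclusion': 'automatic'}}
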
@@ -297,9 +310,8 @@ def sync_table(connection, catalog_entry, state):
     formatted_start_date = str(datetime.datetime.strptime(
         start_date, '%Y-%m-%dT%H:%M:%SZ'))
 
-    replication_key = singer.get_bookmark(state,
-                                          tap_stream_id,
-                                          'replication_key')
+    replication_key = metadata.to_map(catalog_entry.metadata).get(
+        (), {}).get('replication-key')
     replication_key_value = None
     bookmark_is_empty = state.get('bookmarks', {}).get(
         tap_stream_id) is None
@@ -389,6 +401,13 @@ def generate_messages(conn, db_schema, catalog, state):
     for catalog_entry in catalog.streams:
         state = singer.set_currently_syncing(state,
                                              catalog_entry.tap_stream_id)
+        catalog_md = metadata.to_map(catalog_entry.metadata)
+
+        if catalog_md.get((), {}).get('is-view'):
+            key_properties = catalog_md.get((), {}).get('view-key-properties')
+        else:
+            key_properties = catalog_md.get((), {}).get('table-key-properties')
+        bookmark_properties = catalog_md.get((), {}).get('replication-key')
 
         # Emit a state message to indicate that we've started this stream
         yield singer.StateMessage(value=copy.deepcopy(state))
@@ -397,7 +416,8 @@
         yield singer.SchemaMessage(
             stream=catalog_entry.stream,
             schema=catalog_entry.schema.to_dict(),
-            key_properties=catalog_entry.key_properties)
+            key_properties=key_properties,
+            bookmark_properties=bookmark_properties)
 
         # Emit a RECORD message for each record in the result set
         with metrics.job_timer('sync_table') as timer:
@@ -439,33 +459,37 @@ def build_state(raw_state, catalog):
 
     for catalog_entry in catalog.streams:
         tap_stream_id = catalog_entry.tap_stream_id
-        if catalog_entry.replication_key:
-            state = singer.write_bookmark(state,
-                                          tap_stream_id,
-                                          'replication_key',
-                                          catalog_entry.replication_key)
+        catalog_metadata = metadata.to_map(catalog_entry.metadata)
+        replication_method = catalog_metadata.get(
+            (), {}).get('replication-method')
+        raw_stream_version = singer.get_bookmark(
+            raw_state, tap_stream_id, 'version')
+
+        if replication_method == 'INCREMENTAL':
+            replication_key = catalog_metadata.get(
+                (), {}).get('replication-key')
+
+            state = singer.write_bookmark(
+                state, tap_stream_id, 'replication_key', replication_key)
 
             # Only keep the existing replication_key_value if the
             # replication_key hasn't changed.
             raw_replication_key = singer.get_bookmark(raw_state,
                                                       tap_stream_id,
                                                       'replication_key')
-            if raw_replication_key == catalog_entry.replication_key:
-                rep_key_val = singer.get_bookmark(raw_state,
-                                                  tap_stream_id,
-                                                  'replication_key_value')
-                raw_replication_key_value = rep_key_val
+            if raw_replication_key == replication_key:
+                raw_replication_key_value = singer.get_bookmark(
+                    raw_state, tap_stream_id, 'replication_key_value')
                 state = singer.write_bookmark(state,
                                               tap_stream_id,
                                               'replication_key_value',
                                               raw_replication_key_value)
 
-        # Persist any existing version, even if it's None
-        if raw_state.get('bookmarks', {}).get(tap_stream_id):
-            raw_stream_version = singer.get_bookmark(raw_state,
-                                                     tap_stream_id,
-                                                     'version')
+            if raw_stream_version is not None:
                 state = singer.write_bookmark(
                     state, tap_stream_id, 'version', raw_stream_version)
 
+        elif replication_method == 'FULL_TABLE' and raw_stream_version is None:
             state = singer.write_bookmark(state,
                                           tap_stream_id,
                                           'version',
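With ``key_properties`` and ``bookmark_properties`` now passed to ``singer.SchemaMessage``, the ``SCHEMA`` message emitted for the README's example stream should look roughly like the sketch below (schema body elided, values illustrative; note that the code above passes the replication key through as a single string rather than a list):

.. code-block:: json

    {
        "type": "SCHEMA",
        "stream": "sample-stream",
        "schema": {...},
        "key_properties": ["id"],
        "bookmark_properties": "updated_at"
    }
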
5 changes: 1 addition & 4 deletions tap_redshift/resolve.py
@@ -118,12 +118,9 @@ def resolve_catalog(discovered, catalog, state):
         result.streams.append(CatalogEntry(
             tap_stream_id=catalog_entry.tap_stream_id,
             stream=catalog_entry.stream,
-            database=catalog_entry.database,
             table=catalog_entry.table,
-            is_view=catalog_entry.is_view,
             schema=schema,
-            replication_key=catalog_entry.replication_key,
-            key_properties=catalog_entry.key_properties
+            metadata=catalog_entry.metadata
         ))
 
     return result
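
Downstream, these settings are read back with ``metadata.to_map``, exactly as ``sync_table``, ``generate_messages`` and ``build_state`` do above. A minimal sketch, with illustrative stream metadata:

.. code-block:: python

    from singer import metadata

    # Raw catalog metadata, as it appears in catalog.json.
    raw = [{
        "breadcrumb": [],
        "metadata": {
            "replication-method": "INCREMENTAL",
            "replication-key": "updated_at"
        }
    }]

    md = metadata.to_map(raw)  # breadcrumb lists become tuple keys
    assert md.get((), {}).get('replication-method') == 'INCREMENTAL'
    assert md.get((), {}).get('replication-key') == 'updated_at'
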
(2 more changed files not shown)
