This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit 5d917ca

1.1.7: review all dates/times in records and adjust them if they're not valid before insert/update (#48)
Samira-El authored Dec 9, 2019
1 parent 93ddc07 commit 5d917ca
Showing 3 changed files with 93 additions and 7 deletions.
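In short: before a record is validated and loaded, every date/date-time/time field is parsed, and any value Snowflake cannot store is clamped to its maximum. A minimal sketch of that behavior, using the constants introduced below (the `updated_at` field name is hypothetical):

    from dateutil import parser
    from dateutil.parser import ParserError  # added in python-dateutil 2.8.1

    MAX_TIMESTAMP = '9999-12-31 23:59:59.999999'  # max timestamp/datetime supported by Snowflake
    MAX_TIME = '23:59:59.999999'                  # max time supported by Snowflake

    record = {'updated_at': '10000-01-22 12:04:22'}  # year out of range
    try:
        parser.parse(record['updated_at'])
    except ParserError:
        record['updated_at'] = MAX_TIMESTAMP

    print(record)  # {'updated_at': '9999-12-31 23:59:59.999999'}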
2 changes: 1 addition & 1 deletion setup.py
@@ -6,7 +6,7 @@
     long_description = f.read()

 setup(name="pipelinewise-target-snowflake",
-      version="1.1.6",
+      version="1.1.7",
       description="Singer.io target for loading data to Snowflake - PipelineWise compatible",
       long_description=long_description,
       long_description_content_type='text/markdown',

47 changes: 42 additions & 5 deletions target_snowflake/__init__.py
@@ -6,11 +6,14 @@
 import os
 import sys
 import copy
-import singer

 from datetime import datetime
 from decimal import Decimal
 from tempfile import NamedTemporaryFile, mkstemp

+import singer
+from typing import Dict
+from dateutil import parser
+from dateutil.parser import ParserError
 from joblib import Parallel, delayed, parallel_backend
 from jsonschema import Draft4Validator, FormatChecker

@@ -22,6 +25,12 @@
 DEFAULT_PARALLELISM = 0  # 0 The number of threads used to flush tables
 DEFAULT_MAX_PARALLELISM = 16  # Don't use more than this number of threads by default when flushing streams in parallel

+# max timestamp/datetime supported in SF, used to reset all invalid dates that are beyond this value
+MAX_TIMESTAMP = '9999-12-31 23:59:59.999999'
+
+# max time supported in SF, used to reset all invalid times that are beyond this value
+MAX_TIME = '23:59:59.999999'
+

 def float_to_decimal(value):
     """Walk the given data structure and turn all instances of float into double."""
@@ -98,6 +107,33 @@ def load_information_schema_cache(config):
     return information_schema_cache


+def adjust_timestamps_in_record(record: Dict, schema: Dict) -> None:
+    """
+    Goes through every field that is of type date/datetime/time and if its value is out of range,
+    resets it to MAX value accordingly
+    Args:
+        record: record containing properties and values
+        schema: json schema that has types of each property
+    """
+    # creating this internal function to avoid duplicating code and too many nested blocks.
+    def reset_new_value(record: Dict, key: str, format: str):
+        try:
+            parser.parse(record[key])
+        except ParserError:
+            record[key] = MAX_TIMESTAMP if format != 'time' else MAX_TIME
+
+    for key, value in record.items():
+        if value is not None and key in schema['properties']:
+            if 'anyOf' in schema['properties'][key]:
+                for type_dict in schema['properties'][key]['anyOf']:
+                    if 'string' in type_dict['type'] and type_dict.get('format', None) in {'date-time', 'time', 'date'}:
+                        reset_new_value(record, key, type_dict['format'])
+                        break
+            else:
+                if 'string' in schema['properties'][key]['type'] and \
+                        schema['properties'][key].get('format', None) in {'date-time', 'time', 'date'}:
+                    reset_new_value(record, key, schema['properties'][key]['format'])
+
+
 # pylint: disable=too-many-locals,too-many-branches,too-many-statements
 def persist_lines(config, lines, information_schema_cache=None) -> None:
     state = None
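One subtlety in the helper above: for `anyOf` columns it scans the alternatives in order and stops at the first string type carrying a date/time format (hence the `break`). A small standalone sketch of that branch selection, with a hypothetical property definition shaped like the ones in the unit test below:

    # Hypothetical anyOf property; only the first dated alternative is used.
    prop = {
        'anyOf': [
            {'type': ['null', 'string'], 'format': 'time'},
            {'type': ['null', 'string']}
        ]
    }

    for type_dict in prop['anyOf']:
        if 'string' in type_dict['type'] and type_dict.get('format') in {'date-time', 'time', 'date'}:
            print('adjusting with format:', type_dict['format'])  # -> time
            break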
@@ -135,6 +171,8 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
             # Get schema for this record's stream
             stream = o['stream']

+            adjust_timestamps_in_record(o['record'], schemas[stream])
+
             # Validate record
             try:
                 validators[stream].validate(float_to_decimal(o['record']))
@@ -190,9 +228,8 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:

             stream = o['stream']

-            schemas[stream] = o
-            schema = float_to_decimal(o['schema'])
-            validators[stream] = Draft4Validator(schema, format_checker=FormatChecker())
+            schemas[stream] = float_to_decimal(o['schema'])
+            validators[stream] = Draft4Validator(schemas[stream], format_checker=FormatChecker())

             # flush records from previous stream SCHEMA
             # if same stream has been encountered again, it means the schema might have been altered
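The hunk above is a companion fix rather than a cleanup: the old code cached the entire SCHEMA message in `schemas[stream]`, but the new `adjust_timestamps_in_record` call reads `schemas[stream]['properties']`, which only exists on the nested `o['schema']` object. A hypothetical SCHEMA message showing the difference (stream and property names invented for illustration):

    o = {
        'type': 'SCHEMA',
        'stream': 'users',
        'schema': {'properties': {'updated_at': {'type': ['null', 'string'],
                                                 'format': 'date-time'}}},
        'key_properties': ['id'],
    }
    schemas = {}
    schemas[o['stream']] = o['schema']  # after the fix (the real code also applies float_to_decimal)
    # schemas[o['stream']] = o          # before: 'properties' lookups would hit the wrong level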
51 changes: 50 additions & 1 deletion tests/unit/test_target_snowflake.py
@@ -1,6 +1,5 @@
 import unittest
 import os
-import logging

 from unittest.mock import patch

@@ -31,3 +30,53 @@ def test_persist_lines_with_40_records_and_batch_size_of_20_expect_flushing_once
         target_snowflake.persist_lines(self.config, lines)

         flush_streams_mock.assert_called_once()
+
+    def test_adjust_timestamps_in_record(self):
+        record = {
+            'key1': '1',
+            'key2': '2030-01-22',
+            'key3': '10000-01-22 12:04:22',
+            'key4': '25:01:01',
+            'key5': 'I\'m good',
+            'key6': None
+        }
+
+        schema = {
+            'properties': {
+                'key1': {
+                    'type': ['null', 'string', 'integer'],
+                },
+                'key2': {
+                    'anyOf': [
+                        {'type': ['null', 'string'], 'format': 'date'},
+                        {'type': ['null', 'string']}
+                    ]
+                },
+                'key3': {
+                    'type': ['null', 'string'], 'format': 'date-time',
+                },
+                'key4': {
+                    'anyOf': [
+                        {'type': ['null', 'string'], 'format': 'time'},
+                        {'type': ['null', 'string']}
+                    ]
+                },
+                'key5': {
+                    'type': ['null', 'string'],
+                },
+                'key6': {
+                    'type': ['null', 'string'], 'format': 'time',
+                },
+            }
+        }
+
+        target_snowflake.adjust_timestamps_in_record(record, schema)
+
+        self.assertDictEqual({
+            'key1': '1',
+            'key2': '2030-01-22',
+            'key3': '9999-12-31 23:59:59.999999',
+            'key4': '23:59:59.999999',
+            'key5': 'I\'m good',
+            'key6': None
+        }, record)
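The new test needs no Snowflake connection, so it should run with the stock unittest loader; a plausible invocation from the repository root (package layout permitting, test class name not assumed):

    import unittest

    # Load every test in the module shown above and run it verbosely.
    suite = unittest.defaultTestLoader.loadTestsFromName('tests.unit.test_target_snowflake')
    unittest.TextTestRunner(verbosity=2).run(suite)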
