Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make State a first class object #119

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,9 @@ ENV/

# PyCharm
.idea/

# VSCode
.vscode/

# mypy
.mypy_cache/
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
'simplejson==3.11.1',
'python-dateutil>=2.6.0',
'backoff==1.8.0',
'ciso8601',
'ciso8601',
'typing-extensions'
],
extras_require={
'dev': [
Expand Down
12 changes: 1 addition & 11 deletions singer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,7 @@
)
from singer.schema import Schema

from singer.bookmarks import (
write_bookmark,
get_bookmark,
clear_bookmark,
reset_stream,
set_offset,
clear_offset,
get_offset,
set_currently_syncing,
get_currently_syncing,
)
from singer.bookmarks import State

if __name__ == "__main__":
import doctest
Expand Down
130 changes: 84 additions & 46 deletions singer/bookmarks.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,84 @@
def ensure_bookmark_path(state, path):
    """Make sure every key along ``path`` exists in ``state`` as a nested dict.

    Walks ``state`` one path component at a time. Any component whose
    value is missing or ``None`` is replaced with a fresh empty dict
    before descending into it. Existing non-``None`` values are kept.

    Returns the same ``state`` object that was passed in (mutated in place).
    """
    node = state
    for component in path:
        child = node.get(component)
        if child is None:
            # Missing and explicit-None entries are both (re)initialised.
            child = {}
            node[component] = child
        node = child
    return state

def write_bookmark(state, tap_stream_id, key, val):
    """Store ``val`` under ``key`` in the bookmark entry of ``tap_stream_id``.

    The ``bookmarks`` / ``tap_stream_id`` levels are created if needed.
    Returns the (mutated) state.
    """
    state = ensure_bookmark_path(state, ['bookmarks', tap_stream_id])
    stream_bookmarks = state['bookmarks'][tap_stream_id]
    stream_bookmarks[key] = val
    return state

def clear_bookmark(state, tap_stream_id, key):
    """Remove ``key`` from the bookmark entry of ``tap_stream_id``.

    The ``bookmarks`` / ``tap_stream_id`` levels are created if needed;
    removing a key that is not present is a no-op. Returns the (mutated) state.
    """
    state = ensure_bookmark_path(state, ['bookmarks', tap_stream_id])
    stream_bookmarks = state['bookmarks'][tap_stream_id]
    stream_bookmarks.pop(key, None)
    return state

def reset_stream(state, tap_stream_id):
    """Replace the bookmark entry of ``tap_stream_id`` with an empty dict.

    The ``bookmarks`` level is created if needed. Returns the (mutated) state.
    """
    state = ensure_bookmark_path(state, ['bookmarks', tap_stream_id])
    all_bookmarks = state['bookmarks']
    all_bookmarks[tap_stream_id] = {}
    return state

def get_bookmark(state, tap_stream_id, key, default=None):
    """Return the bookmark stored under ``key`` for ``tap_stream_id``.

    ``default`` is returned when the ``bookmarks`` level, the stream entry
    or the key itself is missing. A ``None`` at the ``bookmarks`` or stream
    level (e.g. a JSON ``null``) is treated as missing, matching how
    ``ensure_bookmark_path`` handles ``None`` when writing, instead of
    raising ``AttributeError``.
    """
    bookmarks = state.get('bookmarks') or {}
    stream_bookmarks = bookmarks.get(tap_stream_id) or {}
    return stream_bookmarks.get(key, default)

def set_offset(state, tap_stream_id, offset_key, offset_value):
    """Store ``offset_value`` under ``offset_key`` in the stream's ``offset`` dict.

    All intermediate levels (``bookmarks``, the stream entry, ``offset``)
    are created if needed. Returns the (mutated) state.
    """
    offset_path = ['bookmarks', tap_stream_id, "offset", offset_key]
    state = ensure_bookmark_path(state, offset_path)
    offsets = state['bookmarks'][tap_stream_id]["offset"]
    offsets[offset_key] = offset_value
    return state

def clear_offset(state, tap_stream_id):
    """Reset the ``offset`` dict of ``tap_stream_id`` to an empty dict.

    Intermediate levels are created if needed. Returns the (mutated) state.
    """
    state = ensure_bookmark_path(state, ['bookmarks', tap_stream_id, "offset"])
    stream_bookmarks = state['bookmarks'][tap_stream_id]
    stream_bookmarks["offset"] = {}
    return state

def get_offset(state, tap_stream_id, default=None):
    """Return the ``offset`` entry stored for ``tap_stream_id``.

    ``default`` is returned when the ``bookmarks`` level, the stream entry
    or the ``offset`` key is missing. A ``None`` at the ``bookmarks`` or
    stream level (e.g. a JSON ``null``) is treated as missing, matching how
    ``ensure_bookmark_path`` handles ``None`` when writing, instead of
    raising ``AttributeError``.
    """
    bookmarks = state.get('bookmarks') or {}
    stream_bookmarks = bookmarks.get(tap_stream_id) or {}
    return stream_bookmarks.get("offset", default)

def set_currently_syncing(state, tap_stream_id):
    """Record ``tap_stream_id`` under the ``currently_syncing`` key.

    Returns the same (mutated) state object.
    """
    marker_key = 'currently_syncing'
    state[marker_key] = tap_stream_id
    return state

def get_currently_syncing(state, default=None):
    """Return the value stored under ``currently_syncing``.

    ``default`` is returned when the key is absent from ``state``.
    """
    current_stream = state.get('currently_syncing', default)
    return current_stream
from typing import Any, Dict, Optional, Sequence, Union
from .logger import get_logger


# Module-level logger, obtained from the package's ``get_logger`` helper.
LOGGER = get_logger()


class State:
    """Sync state as an object: per-stream bookmarks plus the stream being synced.

    ``bookmarks`` is a dict keyed by ``tap_stream_id``; each value is a dict
    of bookmark keys for that stream, optionally containing an ``"offset"``
    sub-dict. ``currently_syncing`` holds a stream id or ``None``.
    """

    def __init__(
            self, bookmarks: Optional[Dict] = None, currently_syncing: Optional[str] = None  # pylint: disable=bad-continuation
    ) -> None:
        """Create a state from an optional bookmarks dict and stream id.

        A falsy ``bookmarks`` (``None`` or an empty dict) is replaced by a
        new empty dict; a non-empty dict is stored by reference.
        """
        self._bookmarks = bookmarks or {}
        self._currently_syncing = currently_syncing

    def __str__(self) -> str:
        """Return the string form of the instance's attribute dict."""
        return str(self.__dict__)

    def __eq__(self, other: Any) -> bool:
        """Compare two states by their attributes.

        Returns ``NotImplemented`` for non-``State`` operands so that
        comparisons such as ``state == None`` evaluate to ``False``
        instead of raising ``AttributeError``.
        """
        if not isinstance(other, State):
            return NotImplemented
        return self.__dict__ == other.__dict__

    @property
    def bookmarks(self) -> Dict:
        """The underlying bookmarks dict (not a copy)."""
        return self._bookmarks

    @classmethod
    def from_dict(cls, data: Dict) -> "State":
        """Build a state from a dict with optional ``bookmarks`` and
        ``currently_syncing`` keys.

        Instantiates ``cls`` so that subclasses get an instance of their
        own type.
        """
        return cls(
            bookmarks=data.get("bookmarks"),
            currently_syncing=data.get("currently_syncing"),
        )

    def to_dict(self) -> Dict:
        """Return the state as a plain dict.

        ``currently_syncing`` is included only when it is truthy.
        """
        state = {"bookmarks": self.bookmarks}  # type: Dict[str, Any]
        currently_syncing = self.get_currently_syncing()
        if currently_syncing:
            state["currently_syncing"] = currently_syncing
        return state

    def _ensure_bookmark_path(self, path: Sequence) -> None:
        """Create nested dicts in ``bookmarks`` for every component of ``path``.

        Components whose value is missing or ``None`` are replaced with a
        new empty dict; existing non-``None`` values are left alone.
        """
        submap = self.bookmarks
        for path_component in path:
            if submap.get(path_component) is None:
                submap[path_component] = {}

            submap = submap[path_component]

    def write_bookmark(self, tap_stream_id: str, key: str, val: Any) -> None:
        """Store ``val`` under ``key`` for ``tap_stream_id``."""
        self._ensure_bookmark_path((tap_stream_id,))
        self.bookmarks[tap_stream_id][key] = val

    def clear_bookmark(self, tap_stream_id: str, key: str) -> None:
        """Remove ``key`` from the stream's bookmarks; no-op if absent."""
        self._ensure_bookmark_path((tap_stream_id,))
        self.bookmarks[tap_stream_id].pop(key, None)

    def reset_stream(self, tap_stream_id: str) -> None:
        """Replace the stream's bookmark entry with an empty dict."""
        self._ensure_bookmark_path((tap_stream_id,))
        self.bookmarks[tap_stream_id] = {}

    def get_bookmark(self, tap_stream_id: str, key: str, default: Any = None) -> Any:
        """Return the bookmark under ``key`` for ``tap_stream_id``.

        ``default`` is returned when the stream entry or the key is
        missing. A ``None`` stream entry is treated as missing, in line
        with ``_ensure_bookmark_path``.
        """
        stream_bookmarks = self.bookmarks.get(tap_stream_id) or {}
        return stream_bookmarks.get(key, default)

    def set_offset(
            self, tap_stream_id: str, offset_key: str, offset_value: Any  # pylint: disable=bad-continuation
    ) -> None:
        """Store ``offset_value`` under ``offset_key`` in the stream's ``offset`` dict."""
        self._ensure_bookmark_path((tap_stream_id, "offset", offset_key))
        self.bookmarks[tap_stream_id]["offset"][offset_key] = offset_value

    def clear_offset(self, tap_stream_id: str) -> None:
        """Reset the stream's ``offset`` dict to an empty dict."""
        self._ensure_bookmark_path((tap_stream_id, "offset"))
        self.bookmarks[tap_stream_id]["offset"] = {}

    def get_offset(
            self, tap_stream_id: str, offset_key: str, default: Any = None  # pylint: disable=bad-continuation
    ) -> Any:
        """Return the offset stored under ``offset_key`` for ``tap_stream_id``.

        ``default`` is returned when the stream entry, its ``offset`` dict
        or the key is missing. ``None`` at the stream or ``offset`` level
        is treated as missing, in line with ``_ensure_bookmark_path``.
        """
        stream_bookmarks = self.bookmarks.get(tap_stream_id) or {}
        offsets = stream_bookmarks.get("offset") or {}
        return offsets.get(offset_key, default)

    def get_currently_syncing(self, default: Optional[str] = None) -> Optional[str]:
        """Return the stream currently syncing, or ``default`` when the
        stored value is falsy (``None`` or an empty string)."""
        return self._currently_syncing or default

    def set_currently_syncing(self, value: Union[str, None]) -> None:
        """Set (or, with ``None``, clear) the stream currently syncing."""
        self._currently_syncing = value
3 changes: 1 addition & 2 deletions singer/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import sys

from . import metadata as metadata_module
from .bookmarks import get_currently_syncing
from .logger import get_logger
from .schema import Schema

Expand Down Expand Up @@ -132,7 +131,7 @@ def get_stream(self, tap_stream_id):
return None

def _shuffle_streams(self, state):
currently_syncing = get_currently_syncing(state)
currently_syncing = state.get_currently_syncing()

if currently_syncing is None:
return self.streams
Expand Down
5 changes: 3 additions & 2 deletions singer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pytz
import backoff as backoff_module

from singer.bookmarks import State
from singer.catalog import Catalog

DATETIME_PARSE = "%Y-%m-%dT%H:%M:%SZ"
Expand Down Expand Up @@ -169,9 +170,9 @@ def parse_args(required_config_keys):
args.config = load_json(args.config)
if args.state:
setattr(args, 'state_path', args.state)
args.state = load_json(args.state)
args.state = State.from_dict(load_json(args.state))
else:
args.state = {}
args.state = State()
if args.properties:
setattr(args, 'properties_path', args.properties)
args.properties = load_json(args.properties)
Expand Down
Loading