Skip to content

Commit

Permalink
Python sources refactoring (airbytehq#592)
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-tricot authored Oct 16, 2020
1 parent bd69601 commit bc56f02
Show file tree
Hide file tree
Showing 65 changed files with 606 additions and 279 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ build
!tools/build
.DS_Store
data

# Python
*.egg-info
__pycache__
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.7.9
5 changes: 1 addition & 4 deletions airbyte-integrations/base-python/.dockerignore
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
*
!Dockerfile
!base.py
!airbyte_protocol
build
1 change: 1 addition & 0 deletions airbyte-integrations/base-python/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
airbyte_protocol/models/yaml
21 changes: 7 additions & 14 deletions airbyte-integrations/base-python/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
FROM python:3.7-slim
COPY --from=airbyte/integration-base:dev /airbyte /airbyte

WORKDIR /airbyte
ENV VIRTUAL_ENV=/airbyte/env
RUN python -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

WORKDIR /airbyte/airbyte_protocol
COPY airbyte_protocol .
WORKDIR /airbyte/base_python_code
COPY airbyte_protocol ./airbyte_protocol
COPY setup.py ./
RUN pip install .

WORKDIR /airbyte/base-python
COPY base.py .

ENV AIRBYTE_SPEC_CMD "python3 /airbyte/base-python/base.py spec"
ENV AIRBYTE_CHECK_CMD "python3 /airbyte/base-python/base.py check"
ENV AIRBYTE_DISCOVER_CMD "python3 /airbyte/base-python/base.py discover"
ENV AIRBYTE_READ_CMD "python3 /airbyte/base-python/base.py read"
ENV AIRBYTE_SPEC_CMD "base-python spec"
ENV AIRBYTE_CHECK_CMD "base-python check"
ENV AIRBYTE_DISCOVER_CMD "base-python discover"
ENV AIRBYTE_READ_CMD "base-python read"

ENTRYPOINT ["/airbyte/base.sh"]

Expand Down
12 changes: 12 additions & 0 deletions airbyte-integrations/base-python/airbyte_protocol/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from .integration import *
from .logger import AirbyteLogger
from .models import AirbyteCatalog
from .models import AirbyteLogMessage
from .models import AirbyteMessage
from .models import AirbyteRecordMessage
from .models import AirbyteStateMessage
from .models import AirbyteStream

# Must be the last one because the way we load the connector module creates a circular
# dependency and models might not have been loaded yet
from .entrypoint import AirbyteEntrypoint

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
import argparse
import importlib
import os.path
import sys
import tempfile
import os.path
import importlib

from airbyte_protocol import ConfigContainer
from airbyte_protocol import Source
from airbyte_protocol import AirbyteLogger
from airbyte_protocol import AirbyteLogMessage
from airbyte_protocol import AirbyteMessage

impl_module = os.environ['AIRBYTE_IMPL_MODULE']
impl_class = os.environ['AIRBYTE_IMPL_PATH']
from .integration import ConfigContainer, Source
from .logger import AirbyteLogger

impl_module = os.environ.get('AIRBYTE_IMPL_MODULE', Source.__module__)
impl_class = os.environ.get('AIRBYTE_IMPL_PATH', Source.__name__)
module = importlib.import_module(impl_module)
impl = getattr(module, impl_class)

logger = AirbyteLogger()


class AirbyteEntrypoint(object):
def __init__(self, source):
self.source = source

def start(self):
def start(self, args):
# set up parent parsers
parent_parser = argparse.ArgumentParser(add_help=False)
main_parser = argparse.ArgumentParser()
Expand Down Expand Up @@ -57,23 +54,25 @@ def start(self):
help='path to the catalog used to determine which data to read')

# parse the args
parsed_args = main_parser.parse_args()
parsed_args = main_parser.parse_args(args)

# execute
cmd = parsed_args.command
if not cmd:
raise Exception("No command passed")

# todo: add try catch for exceptions with different exit codes

with tempfile.TemporaryDirectory() as temp_dir:
if cmd == "spec":
# todo: output this as a JSON formatted message
print(source.spec().spec_string)
print(self.source.spec(logger).spec_string)
sys.exit(0)

rendered_config_path = os.path.join(temp_dir, 'config.json')
raw_config = source.read_config(parsed_args.config)
rendered_config = source.transform_config(raw_config)
source.write_config(rendered_config, rendered_config_path)
raw_config = self.source.read_config(parsed_args.config)
rendered_config = self.source.transform_config(raw_config)
self.source.write_config(rendered_config, rendered_config_path)

config_container = ConfigContainer(
raw_config=raw_config,
Expand All @@ -82,30 +81,35 @@ def start(self):
rendered_config_path=rendered_config_path)

if cmd == "check":
check_result = source.check(logger, config_container)
check_result = self.source.check(logger, config_container)
if check_result.successful:
logger.info("Check succeeded")
sys.exit(0)
else:
logger.error("Check failed")
sys.exit(1)
elif cmd == "discover":
catalog = source.discover(logger, config_container)
catalog = self.source.discover(logger, config_container)
print(catalog.serialize())
sys.exit(0)
elif cmd == "read":
generator = source.read(logger, config_container, parsed_args.catalog, parsed_args.state)
generator = self.source.read(logger, config_container, parsed_args.catalog, parsed_args.state)
for message in generator:
print(message.serialize())
sys.exit(0)
else:
raise Exception("Unexpected command " + cmd)


# set up and run entrypoint
source = impl()
def launch(source, args):
AirbyteEntrypoint(source).start(args)


def main():
# set up and run entrypoint
source = impl()

if not isinstance(source, Source):
raise Exception("Source implementation provided does not implement Source class!")
if not isinstance(source, Source):
raise Exception("Source implementation provided does not implement Source class!")

AirbyteEntrypoint(source).start()
launch(source, sys.argv[1:])
75 changes: 75 additions & 0 deletions airbyte-integrations/base-python/airbyte_protocol/integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import json
import pkgutil
from dataclasses import dataclass
from typing import Generator

from .models import AirbyteCatalog, AirbyteMessage


class AirbyteSpec(object):
@staticmethod
def from_file(file):
with open(file) as file:
spec_text = file.read()
return AirbyteSpec(spec_text)

def __init__(self, spec_string):
self.spec_string = spec_string


class AirbyteCheckResponse(object):
def __init__(self, successful, field_to_error):
self.successful = successful
self.field_to_error = field_to_error


@dataclass
class ConfigContainer:
raw_config: object
rendered_config: object
raw_config_path: str
rendered_config_path: str


class Integration(object):
def __init__(self):
pass

def spec(self, logger) -> AirbyteSpec:
raw_spec = pkgutil.get_data(self.__class__.__module__.split('.')[0], 'spec.json')
# we need to output a spec on a single line
flattened_json = json.dumps(json.loads(raw_spec))
return AirbyteSpec(flattened_json)

def read_config(self, config_path):
with open(config_path, 'r') as file:
contents = file.read()
return json.loads(contents)

# can be overridden to change an input file config
def transform_config(self, raw_config):
return raw_config

def write_config(self, config_object, path):
with open(path, 'w') as fh:
fh.write(json.dumps(config_object))

def check(self, logger, config_container) -> AirbyteCheckResponse:
raise Exception("Not Implemented")

def discover(self, logger, config_container) -> AirbyteCatalog:
raise Exception("Not Implemented")


class Source(Integration):
def __init__(self):
super().__init__()

# Iterator<AirbyteMessage>
def read(self, logger, config_container, catalog_path, state=None) -> Generator[AirbyteMessage, None, None]:
raise Exception("Not Implemented")


class Destination(Integration):
def __init__(self):
super().__init__()
Loading

0 comments on commit bc56f02

Please sign in to comment.