Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

working python source #533

Merged
merged 34 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
5f8f787
use log messages for check and add todos
jrhizor Oct 9, 2020
5a1c16e
Merge branch 'master' into jrhizor/py-schema-converter
jrhizor Oct 9, 2020
13eea10
working state
jrhizor Oct 9, 2020
92b3bd7
handle stderr in discoer and use airbyte catalog as output
jrhizor Oct 13, 2020
d732de6
Merge branch 'master' into jrhizor/py-schema-converter
jrhizor Oct 13, 2020
f701fa4
misc restructure
jrhizor Oct 13, 2020
7cfb17c
inject logger again
jrhizor Oct 13, 2020
07e7e4d
use config container (still at the base level)
jrhizor Oct 13, 2020
d394766
add tests and fix logging for check command
jrhizor Oct 13, 2020
cde03b9
move michel's branch into this one
jrhizor Oct 13, 2020
f926838
fmt
jrhizor Oct 13, 2020
4d5c0c9
fix invalid credential test
jrhizor Oct 13, 2020
84c7db9
restructure catalog helpers
jrhizor Oct 13, 2020
09a442c
terrible hack to get stripe to not fail completely
jrhizor Oct 13, 2020
4ad1db2
fix discovery test now that a single message contains the catalog
jrhizor Oct 13, 2020
dda65c3
all working tests except sync
jrhizor Oct 13, 2020
bbe34ad
partially working mapping (in wrong location)
jrhizor Oct 13, 2020
b4a6e92
working but untested reads
jrhizor Oct 13, 2020
5fe3a1e
split logic out
jrhizor Oct 14, 2020
da2d155
fmt
jrhizor Oct 14, 2020
8fc3b1a
fix deadlock bug
jrhizor Oct 14, 2020
9a23ff3
convert catalog in test to airbyte catalog format
jrhizor Oct 14, 2020
f485b29
fix stripe tests
jrhizor Oct 14, 2020
8bf9d50
fmt
jrhizor Oct 14, 2020
76a97e5
Merge branch 'master' into jrhizor/py-schema-converter
jrhizor Oct 14, 2020
9342e08
remoce unused log command
jrhizor Oct 14, 2020
75ddaa4
output credentials
jrhizor Oct 14, 2020
4cfaf8e
clean up optional state command
jrhizor Oct 14, 2020
4aa95e0
use integration launcher
jrhizor Oct 14, 2020
0751013
DRY up read eval
jrhizor Oct 14, 2020
df15670
logging fixes and remove break
jrhizor Oct 14, 2020
d380330
rename
jrhizor Oct 14, 2020
bef954b
fmt
jrhizor Oct 14, 2020
8f673ee
fix inverted if statement
jrhizor Oct 14, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import Generator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are there two airbyte_protocol directories? It's not clear to me what the difference between the two is?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like standard structure for python packages:

pkg
pkg/setup.py
pkg/pkg
pkg/pkg/__init__.py
pkg/pkg/*

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM, thanks for bearing with my un-pythonic ways :P

import yaml
import json
import pkgutil
import warnings
import python_jsonschema_objects as pjs
from dataclasses import dataclass


def _load_classes(yaml_path: str):
Expand Down Expand Up @@ -36,38 +38,30 @@ def __init__(self, successful, field_to_error):
self.field_to_error = field_to_error


class AirbyteSchema(object):
def __init__(self, schema):
self.schema = schema


class AirbyteConfig(object):
def __init__(self, config_string):
self.config_string = config_string


class Integration(object):
def __init__(self):
pass

def spec(self) -> AirbyteSpec:
raise Exception("Not Implemented")

# default version reads the config_path to a string
# this will often be overwritten to add fields for easy consumption or to modify the string for delegating to singer
def read_config(self, config_path) -> AirbyteConfig:
def read_config(self, config_path):
with open(config_path, 'r') as file:
contents = file.read()
return AirbyteConfig(contents)
return json.loads(contents)

def render_config(self, config_object, rendered_config_path):
with open(rendered_config_path, 'w') as fh:
fh.write(config_object.config_string)
# can be overridden to change an input file config
def transform_config(self, raw_config):
return raw_config

def check(self, config_object, rendered_config_path) -> AirbyteCheckResponse:
def write_config(self, config_object, path):
jrhizor marked this conversation as resolved.
Show resolved Hide resolved
with open(path, 'w') as fh:
fh.write(json.dumps(config_object))

def check(self, logger, config_container) -> AirbyteCheckResponse:
raise Exception("Not Implemented")

def discover(self, config_object, rendered_config_path) -> AirbyteSchema:
def discover(self, logger, config_container) -> AirbyteCatalog:
raise Exception("Not Implemented")


Expand All @@ -76,10 +70,34 @@ def __init__(self):
pass

# Iterator<AirbyteMessage>
def read(self, config_object, rendered_config_path, state=None) -> Generator[AirbyteMessage, None, None]:
def read(self, logger, config_container, catalog_path, state=None) -> Generator[AirbyteMessage, None, None]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will come in a future PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is expected to be implemented by specific sources. There is no default implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, makes sense

raise Exception("Not Implemented")


class Destination(Integration):
def __init__(self):
pass


valid_log_types = ["FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"]
jrhizor marked this conversation as resolved.
Show resolved Hide resolved


def log_line(line, default_level):
split_line = line.split()
first_word = next(iter(split_line), None)
if first_word in valid_log_types:
log_level = first_word
rendered_line = " ".join(split_line[1:])
else:
log_level = default_level
rendered_line = line
log_record = AirbyteLogMessage(level=log_level, message=rendered_line)
log_message = AirbyteMessage(type="LOG", log=log_record)
print(log_message.serialize())

@dataclass
class ConfigContainer:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment to explain what these fields are.

raw_config: object
rendered_config: object
raw_config_path: str
rendered_config_path: str
41 changes: 28 additions & 13 deletions airbyte-integrations/base-python/base.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
import argparse
import logging
import sys
import tempfile
import os.path
import importlib

from airbyte_protocol import ConfigContainer
from airbyte_protocol import Source
from airbyte_protocol import AirbyteLogMessage
from airbyte_protocol import AirbyteMessage
from airbyte_protocol import log_line

impl_module = os.environ['AIRBYTE_IMPL_MODULE']
impl_class = os.environ['AIRBYTE_IMPL_PATH']

module = importlib.import_module(impl_module)
impl = getattr(module, impl_class)


def log(level, text):
log_message = AirbyteLogMessage(level=level, message=text)
message = AirbyteMessage(type="LOG", log=log_message)
print(message.serialize)


class AirbyteEntrypoint(object):
def __init__(self, source):
self.source = source
Expand Down Expand Up @@ -43,8 +53,8 @@ def start(self):
# read
read_parser = subparsers.add_parser("read", help="reads the source and outputs messages to STDOUT",
parents=[parent_parser])
# todo: re-add state handling
# read_parser.add_argument('--state', type=str, required=False, help='path to the json-encoded state file')

read_parser.add_argument('--state', type=str, required=False, help='path to the json-encoded state file')
required_read_parser = read_parser.add_argument_group('required named arguments')
required_read_parser.add_argument('--config', type=str, required=True,
help='path to the json configuration file')
Expand All @@ -66,25 +76,30 @@ def start(self):
sys.exit(0)

rendered_config_path = os.path.join(temp_dir, 'config.json')
config_object = source.read_config(parsed_args.config)
source.render_config(config_object, rendered_config_path)
raw_config = source.read_config(parsed_args.config)
rendered_config = source.transform_config(raw_config)
source.write_config(rendered_config, rendered_config_path)

config_container = ConfigContainer(
raw_config=raw_config,
rendered_config=rendered_config,
raw_config_path=parsed_args.config,
rendered_config_path=rendered_config_path)

# todo: output message for check
if cmd == "check":
check_result = source.check(logging, rendered_config_path)
check_result = source.check(log_line, config_container)
if check_result.successful:
print("Check succeeded")
log_line("Check succeeded", "INFO")
sys.exit(0)
else:
print("Check failed")
log_line("Check failed", "ERROR")
sys.exit(1)
elif cmd == "discover":
schema = source.discover(logging, rendered_config_path)
print(schema.schema)
catalog = source.discover(log_line, config_container)
print(catalog.serialize())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is print the standard way of logging in python or is there a "slf4j" equivalent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we want to dictate the precise format of what we're outputting, we don't want an slf4j/logger equivalent.

It's debatable if we should use sys.stdout.write() or print, but print just seemed easier (we do want newlines after everthing, and we it's easy to use the string conversions in some cases).

sys.exit(0)
elif cmd == "read":
# todo: pass in state
generator = source.read(logging, rendered_config_path)
generator = source.read(log_line, config_container, parsed_args.catalog, parsed_args.state)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have have a similar "container" for catalog and state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do this in a separate PR

for message in generator:
print(message.serialize())
sys.exit(0)
Expand Down
7 changes: 5 additions & 2 deletions airbyte-integrations/base/base.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ function main() {
eval "$AIRBYTE_DISCOVER_CMD" --config "$CONFIG_FILE"
;;
read)
# todo: state should be optional: --state "$STATE_FILE"
eval "$AIRBYTE_READ_CMD" --config "$CONFIG_FILE" --catalog "$CATALOG_FILE"
if [[ -z "$STATE_FILE" ]]; then
jrhizor marked this conversation as resolved.
Show resolved Hide resolved
eval "$AIRBYTE_READ_CMD" --config "$CONFIG_FILE" --catalog "$CATALOG_FILE"
else
eval "$AIRBYTE_READ_CMD" --config "$CONFIG_FILE" --catalog "$CATALOG_FILE" --state "$STATE_FILE"
fi
;;
write)
eval "$AIRBYTE_WRITE_CMD" --config "$CONFIG_FILE" --catalog "$CATALOG_FILE"
Expand Down
104 changes: 72 additions & 32 deletions airbyte-integrations/singer/base-singer/base_singer/singer_helpers.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,15 @@
import json
import subprocess
from airbyte_protocol import AirbyteSpec
from airbyte_protocol import AirbyteSchema
from airbyte_protocol import AirbyteCatalog
from airbyte_protocol import AirbyteMessage
from airbyte_protocol import AirbyteLogMessage
from airbyte_protocol import AirbyteRecordMessage
from airbyte_protocol import AirbyteStateMessage
from airbyte_protocol import AirbyteStream
from typing import Generator
from datetime import datetime


# helper to delegate input and output to a piped command
# todo: add error handling (make sure the overall tap fails if there's a failure in here)

valid_log_types = ["FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"]


def log_line(line, default_level):
split_line = line.split()
first_word = next(iter(split_line), None)
if first_word in valid_log_types:
log_level = first_word
rendered_line = " ".join(split_line[1:])
else:
log_level = default_level
rendered_line = line
log_record = AirbyteLogMessage(level=log_level, message=rendered_line)
log_message = AirbyteMessage(type="LOG", log=log_record)
print(log_message.serialize())
from dataclasses import dataclass


def to_json(string):
Expand All @@ -37,27 +19,58 @@ def to_json(string):
return False


def is_field_metadata(metadata):
if len(metadata.get("breadcrumb")) != 2:
return False
else:
return metadata.get("breadcrumb")[0] != "property"


@dataclass
class Catalogs:
singer_catalog: object
airbyte_catalog: AirbyteCatalog


class SingerHelper:
@staticmethod
def spec_from_file(spec_path) -> AirbyteSpec:
with open(spec_path) as file:
spec_text = file.read()
return AirbyteSpec(spec_text)

# todo: support stderr in the discover process
@staticmethod
def discover(shell_command, transform=(lambda x: AirbyteSchema(x))) -> AirbyteSchema:
def get_catalogs(logger, shell_command, singer_transform=(lambda catalog: catalog), airbyte_transform=(lambda catalog: catalog)) -> Catalogs:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is singer_transform and airbyte_transform used for?

completed_process = subprocess.run(shell_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True)
return transform(completed_process.stdout)

for line in completed_process.stderr.splitlines():
logger(line, "ERROR")

airbyte_streams = []
singer_catalog = singer_transform(json.loads(completed_process.stdout))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we only try to load the line if the line starts with { so we don't have the risk of reading some debug logs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think most singer taps write all logs to stderr for the discover command and the entire stdout is expected to be readable as the catalog. I think it's probably fine to expect that behavior for now and adapt if we run into a case that violates that expectation. I imagine for that case we should be doing custom work at that source level to handle it.


for stream in singer_catalog.get("streams"):
name = stream.get("stream")
schema = stream.get("schema").get("properties")

# todo: figure out how to serialize an object with an items key in python_jsonschema_objects
if name == "subscriptions":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean we are blotting out parts of the catalog?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated #530 to show what it needs to do to fix this. I've only seen this on Stripe so far but this is definitely something we need to fix before release. We can do it outside of this PR.

del schema["items"]

airbyte_streams += [AirbyteStream(name=name, schema=schema)]

airbyte_catalog = airbyte_transform(AirbyteCatalog(streams=airbyte_streams))

return Catalogs(singer_catalog=singer_catalog, airbyte_catalog=airbyte_catalog)

@staticmethod
def read(shell_command, is_message=(lambda x: True), transform=(lambda x: x)) -> Generator[AirbyteMessage, None, None]:
def read(logger, shell_command, is_message=(lambda x: True), transform=(lambda x: x)) -> Generator[AirbyteMessage, None, None]:
with subprocess.Popen(shell_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1,
universal_newlines=True) as p:
for tuple in zip(p.stdout, p.stderr):
out_line = tuple[0]
err_line = tuple[1]
for line_tuple in zip(p.stdout, p.stderr):
out_line = line_tuple[0]
err_line = line_tuple[1]

if out_line:
out_json = to_json(out_line)
Expand All @@ -71,8 +84,6 @@ def read(shell_command, is_message=(lambda x: True), transform=(lambda x: x)) ->
out_message = AirbyteMessage(type="STATE", state=out_record)
yield transform(out_message)
else:
# todo: remove type from record
# todo: handle stream designation
# todo: check that messages match the discovered schema
stream_name = transformed_json["stream"]
out_record = AirbyteRecordMessage(
Expand All @@ -82,7 +93,36 @@ def read(shell_command, is_message=(lambda x: True), transform=(lambda x: x)) ->
out_message = AirbyteMessage(type="RECORD", record=out_record)
yield transform(out_message)
elif out_line:
log_line(out_line, "INFO")
logger(out_line, "INFO")

if err_line:
log_line(err_line, "ERROR")
logger(err_line, "ERROR")

@staticmethod
def combine_catalogs(masked_airbyte_catalog, discovered_singer_catalog) -> str:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the output here expected to be an object or a Singer or Airbytecatalog? if so can the type signature reflect that? Also, this is more of making selections on the discovered catalog rather than "combining" the two catalogs right? Can we rename the method to reflect that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, can you add a unit test for this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did the rename here. I'll do the documentation piece and type handling separately.

combined_catalog_path = "/mount/rendered_catalog.json"
masked_singer_streams = []

stream_to_airbyte_schema = {}
for stream in masked_airbyte_catalog["streams"]:
stream_to_airbyte_schema[stream.get("name")] = stream

for singer_stream in discovered_singer_catalog.get("streams"):
if singer_stream.get("stream") in stream_to_airbyte_schema:
new_metadatas = []
metadatas = singer_stream.get("metadata")
for metadata in metadatas:
new_metadata = metadata
new_metadata["metadata"]["selected"] = True
if not is_field_metadata(new_metadata):
new_metadata["metadata"]["forced-replication-method"] = "FULL_TABLE"
new_metadatas += [new_metadata]
singer_stream["metadata"] = new_metadatas

masked_singer_streams += [singer_stream]

combined_catalog = {"streams": masked_singer_streams}
with open(combined_catalog_path, 'w') as fh:
fh.write(json.dumps(combined_catalog))

return combined_catalog_path
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from airbyte_protocol import Source
from airbyte_protocol import AirbyteSpec
from airbyte_protocol import AirbyteCheckResponse
from airbyte_protocol import AirbyteSchema
from airbyte_protocol import AirbyteCatalog
from airbyte_protocol import AirbyteMessage
import urllib.request
from typing import Generator
from base_singer import SingerHelper

from base_singer import Catalogs

class SourceExchangeRatesApiSinger(Source):
def __init__(self):
Expand All @@ -15,13 +15,17 @@ def __init__(self):
def spec(self) -> AirbyteSpec:
return SingerHelper.spec_from_file("/airbyte/exchangeratesapi-files/spec.json")

def check(self, config_object, rendered_config_path) -> AirbyteCheckResponse:
def check(self, logger, config_container) -> AirbyteCheckResponse:
code = urllib.request.urlopen("https://api.exchangeratesapi.io/").getcode()
logger(f"Ping response code: {code}", "INFO")
return AirbyteCheckResponse(code == 200, {})

def discover(self, config_object, rendered_config_path) -> AirbyteSchema:
return SingerHelper.discover("tap-exchangeratesapi | grep '\"type\": \"SCHEMA\"' | head -1 | jq -c '{\"streams\":[{\"stream\": .stream, \"schema\": .schema}]}'")
def discover(self, logger, config_container) -> AirbyteCatalog:
catalogs = SingerHelper.get_catalogs(logger, "tap-exchangeratesapi | grep '\"type\": \"SCHEMA\"' | head -1 | jq -c '{\"streams\":[{\"stream\": .stream, \"schema\": .schema}]}'")
return catalogs.airbyte_catalog

# todo: handle state
def read(self, config_object, rendered_config_path, state=None) -> Generator[AirbyteMessage, None, None]:
return SingerHelper.read(f"tap-exchangeratesapi --config {rendered_config_path}")
def read(self, logger, config_container, catalog_path, state=None) -> Generator[AirbyteMessage, None, None]:
if state:
return SingerHelper.read(logger, f"tap-exchangeratesapi --config {config_container.rendered_config_path} --state {state}")
jrhizor marked this conversation as resolved.
Show resolved Hide resolved
else:
return SingerHelper.read(logger, f"tap-exchangeratesapi --config {config_container.rendered_config_path}")
Loading