working python source #533

Merged
merged 34 commits into from
Oct 14, 2020

jrhizor
Contributor

@jrhizor jrhizor commented Oct 9, 2020

  • adds state handling
  • adds schema handling, including marking singer catalogs as selected based on the incoming airbyte catalog
  • logging
  • interface changes

recommended reading order:

  • base / helper files
  • source_stripe_py
  • rest

@jrhizor
Contributor Author

jrhizor commented Oct 9, 2020

Working state example:

config.json:

{
  "base": "AUD",
  "start_date": "2020-10-02"
}

state.json:

{"start_date": "2020-10-09"}
→ docker run -v $(pwd):/mount --rm airbyte/source-exchangeratesapi-singer:dev read --config /mount/config.json --catalog /mount/catalog.json
{"type": "LOG", "log": {"level": "INFO", "message": "Replicating exchange rate data from 2020-10-02 using base AUD"}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "data": {"CAD": 0.9530295627, "HKD": 5.5527730271, "ISK": 99.071585634, "PHP": 34.7324700709, "DKK": 4.5451991204, "HUF": 219.2035182018, "CZK": 16.5037869533, "GBP": 0.5538297093, "RON": 2.9755069631, "SEK": 6.3675787931, "IDR": 10655.5888101637, "INR": 52.5516125092, "BRL": 4.0336550208, "RUB": 56.2438920108, "HRK": 4.6188614708, "JPY": 75.3725873442, "THB": 22.5940630344, "CHF": 0.6590520401, "EUR": 0.610798925, "MYR": 2.9837527486, "BGN": 1.1946005375, "TRY": 5.5452602003, "CNY": 4.8653799169, "NOK": 6.6601514781, "NZD": 1.07952602, "ZAR": 11.8496213047, "USD": 0.716467139, "MXN": 15.6720009773, "SGD": 0.9765453213, "AUD": 1.0, "ILS": 2.4580991937, "KRW": 833.5450769607, "PLN": 2.7451746885, "date": "2020-10-02T00:00:00Z"}, "emitted_at": "2020-10-09 18:05:05.462186"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Replicating exchange rate data from 2020-10-03 using base AUD"}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "data": {"CAD": 0.9532938519, "HKD": 5.5682886623, "ISK": 99.1513523414, "PHP": 34.7768484034, "DKK": 4.542401856, "HUF": 218.6763538678, "CZK": 16.5431345015, "GBP": 0.5544294523, "RON": 2.975761646, "SEK": 6.3889736858, "IDR": 10577.3307283717, "INR": 52.5593748092, "BRL": 4.0642285854, "RUB": 56.4963062458, "HRK": 4.6205507052, "JPY": 75.8593320716, "THB": 22.4958788693, "CHF": 0.6582208926, "EUR": 0.6105378839, "MYR": 2.9835154771, "BGN": 1.1940899933, "TRY": 5.5933817693, "CNY": 4.8921179559, "NOK": 6.6460101349, "NZD": 1.0808352158, "ZAR": 11.8106722022, "USD": 0.7184809817, "MXN": 15.4191953111, "SGD": 0.9772879907, "AUD": 1.0, "ILS": 2.4569876061, "KRW": 832.602722999, "PLN": 2.7461994017, "date": "2020-10-05T00:00:00Z"}, "emitted_at": "2020-10-09 18:05:05.765276"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Replicating exchange rate data from 2020-10-04 using base AUD"}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "data": {"CAD": 0.9488829529, "HKD": 5.549538611, "ISK": 98.8343856241, "PHP": 34.6873482273, "DKK": 4.5172413793, "HUF": 218.3827100534, "CZK": 16.41270034, "GBP": 0.5528047596, "RON": 2.9601748422, "SEK": 6.3828314716, "IDR": 10551.195968917, "INR": 52.5567629917, "BRL": 3.9681277319, "RUB": 55.7139388052, "HRK": 4.5949490044, "JPY": 75.6313744536, "THB": 22.3379067508, "CHF": 0.6545046139, "EUR": 0.6070908208, "MYR": 2.9741986401, "BGN": 1.1873482273, "TRY": 5.5624696455, "CNY": 4.8634652744, "NOK": 6.593188441, "NZD": 1.0765541525, "ZAR": 11.8097377368, "USD": 0.7160636231, "MXN": 15.2762870325, "SGD": 0.9732880039, "AUD": 1.0, "ILS": 2.440080136, "KRW": 830.7977173385, "PLN": 2.724137931, "date": "2020-10-06T00:00:00Z"}, "emitted_at": "2020-10-09 18:05:05.874866"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Replicating exchange rate data from 2020-10-05 using base AUD"}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "data": {"CAD": 0.9478787879, "HKD": 5.5284242424, "ISK": 98.6666666667, "PHP": 34.5327272727, "DKK": 4.5103636364, "HUF": 217.7272727273, "CZK": 16.4024242424, "GBP": 0.5540181818, "RON": 2.9544242424, "SEK": 6.3533333333, "IDR": 10520.9151515152, "INR": 52.2806060606, "BRL": 3.9665454545, "RUB": 55.7403030303, "HRK": 4.586969697, "JPY": 75.5939393939, "THB": 22.2775757576, "CHF": 0.6537575758, "EUR": 0.6060606061, "MYR": 2.9646666667, "BGN": 1.1853333333, "TRY": 5.6195151515, "CNY": 4.8444848485, "NOK": 6.625030303, "NZD": 1.0825454545, "ZAR": 11.8662424242, "USD": 0.7133333333, "MXN": 15.3096363636, "SGD": 0.9693939394, "AUD": 1.0, "ILS": 2.4269090909, "KRW": 825.8787878788, "PLN": 2.7189090909, "date": "2020-10-07T00:00:00Z"}, "emitted_at": "2020-10-09 18:05:05.990058"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Replicating exchange rate data from 2020-10-06 using base AUD"}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "data": {"CAD": 0.9489677852, "HKD": 5.5527068997, "ISK": 99.1413434017, "PHP": 34.7140856221, "DKK": 4.532184398, "HUF": 217.6968515925, "CZK": 16.4983862128, "GBP": 0.5543815846, "RON": 2.9692466963, "SEK": 6.3618537239, "IDR": 10573.1685037452, "INR": 52.4852323245, "BRL": 4.0090128494, "RUB": 55.4579501857, "HRK": 4.6087327203, "JPY": 75.9332561963, "THB": 22.3640460386, "CHF": 0.6576335181, "EUR": 0.6089763108, "MYR": 2.9750928689, "BGN": 1.1910358687, "TRY": 5.6819925705, "CNY": 4.8653553377, "NOK": 6.6401558979, "NZD": 1.0873881006, "ZAR": 11.8747335729, "USD": 0.7164606297, "MXN": 15.3107606114, "SGD": 0.9730832471, "AUD": 1.0, "ILS": 2.4303635589, "KRW": 825.4552097923, "PLN": 2.7308324706, "date": "2020-10-08T00:00:00Z"}, "emitted_at": "2020-10-09 18:05:06.098758"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Replicating exchange rate data from 2020-10-07 using base AUD"}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "data": {"CAD": 0.9473940872, "HKD": 5.5722035965, "ISK": 99.2380371838, "PHP": 34.7960987504, "DKK": 4.5365437367, "HUF": 217.1776897287, "CZK": 16.5254495581, "GBP": 0.5557269125, "RON": 2.9695214874, "SEK": 6.351722036, "IDR": 10569.954282231, "INR": 52.5440414508, "BRL": 4.0107284365, "RUB": 55.4351112466, "HRK": 4.6184090216, "JPY": 76.1658031088, "THB": 22.3279487961, "CHF": 0.6566900335, "EUR": 0.609570253, "MYR": 2.9751904907, "BGN": 1.1921975008, "TRY": 5.6860103627, "CNY": 4.8184699787, "NOK": 6.6213349589, "NZD": 1.0863151478, "ZAR": 11.8448643706, "USD": 0.7189881134, "MXN": 15.2900335264, "SGD": 0.9744590064, "AUD": 1.0, "ILS": 2.4296860713, "KRW": 823.2368180433, "PLN": 2.7249009448, "date": "2020-10-09T00:00:00Z"}, "emitted_at": "2020-10-09 18:05:06.199196"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Replicating exchange rate data from 2020-10-08 using base AUD"}}
{"type": "STATE", "state": {"data": {"start_date": "2020-10-09"}}}
{"type": "LOG", "log": {"level": "INFO", "message": "Replicating exchange rate data from 2020-10-09 using base AUD"}}
→ docker run -v $(pwd):/mount --rm airbyte/source-exchangeratesapi-singer:dev read --config /mount/config.json --catalog /mount/catalog.json --state /mount/state.json
{"type": "LOG", "log": {"level": "INFO", "message": "Replicating exchange rate data from 2020-10-09 using base AUD"}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "data": {"CAD": 0.9473940872, "HKD": 5.5722035965, "ISK": 99.2380371838, "PHP": 34.7960987504, "DKK": 4.5365437367, "HUF": 217.1776897287, "CZK": 16.5254495581, "GBP": 0.5557269125, "RON": 2.9695214874, "SEK": 6.351722036, "IDR": 10569.954282231, "INR": 52.5440414508, "BRL": 4.0107284365, "RUB": 55.4351112466, "HRK": 4.6184090216, "JPY": 76.1658031088, "THB": 22.3279487961, "CHF": 0.6566900335, "EUR": 0.609570253, "MYR": 2.9751904907, "BGN": 1.1921975008, "TRY": 5.6860103627, "CNY": 4.8184699787, "NOK": 6.6213349589, "NZD": 1.0863151478, "ZAR": 11.8448643706, "USD": 0.7189881134, "MXN": 15.2900335264, "SGD": 0.9744590064, "AUD": 1.0, "ILS": 2.4296860713, "KRW": 823.2368180433, "PLN": 2.7249009448, "date": "2020-10-09T00:00:00Z"}, "emitted_at": "2020-10-09 18:04:57.870061"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Tap exiting normally"}}
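The resume behavior in the two runs above can be sketched roughly as follows. This is only an illustration of the idea, not the tap's actual code: `resolve_start_date`, `days_to_replicate`, and `state_message` are hypothetical helpers, and the only things taken from the output above are the `start_date` key and the shape of the STATE message.

```python
import json
from datetime import date, timedelta
from typing import Iterator, Optional


def resolve_start_date(config: dict, state: Optional[dict]) -> date:
    """Prefer start_date from the saved state; fall back to the config."""
    raw = (state or {}).get("start_date") or config["start_date"]
    return date.fromisoformat(raw)


def days_to_replicate(config: dict, state: Optional[dict], today: date) -> Iterator[date]:
    """Yield every day from the resolved start date through today, inclusive."""
    current = resolve_start_date(config, state)
    while current <= today:
        yield current
        current += timedelta(days=1)


def state_message(last_day: date) -> str:
    """Serialize a STATE message in the same shape as the output above."""
    return json.dumps({"type": "STATE", "state": {"data": {"start_date": last_day.isoformat()}}})
```

With no state, the sketch walks from the configured 2020-10-02; with the state file above, it starts at 2020-10-09, which matches the single record in the second run.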

@jrhizor jrhizor requested review from sherifnada, cgardens and michel-tricot and removed request for sherifnada October 14, 2020 15:20
@jrhizor jrhizor marked this pull request as ready for review October 14, 2020 15:20
@jrhizor
Contributor Author

jrhizor commented Oct 14, 2020

making this available for review even though stripe tests need updating.

@jrhizor jrhizor changed the title from "state + schema handling in python source" to "working python source" Oct 14, 2020
print(log_message.serialize())

@dataclass
class ConfigContainer:
Contributor

Maybe add a comment to explain what these fields are.

sys.exit(0)
elif cmd == "read":
# todo: pass in state
- generator = source.read(logging, rendered_config_path)
+ generator = source.read(log_line, config_container, parsed_args.catalog, parsed_args.state)
Contributor

Should we have a similar "container" for catalog and state?

Contributor Author

will do this in a separate PR

airbyte-integrations/base/base.sh (outdated, resolved)
logger(line, "ERROR")

airbyte_streams = []
singer_catalog = singer_transform(json.loads(completed_process.stdout))
Contributor

Should we only try to load the line if the line starts with { so we don't have the risk of reading some debug logs?

Contributor Author

I think most singer taps write all logs to stderr for the discover command and the entire stdout is expected to be readable as the catalog. I think it's probably fine to expect that behavior for now and adapt if we run into a case that violates that expectation. I imagine for that case we should be doing custom work at that source level to handle it.
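A minimal sketch of the expectation described above, assuming the tap writes its logs to stderr and nothing but the catalog JSON to stdout. `run_discover` and `stderr_logger` are hypothetical names, and taking the command as an argument list (rather than a shell string) is a simplification for the example.

```python
import json
import subprocess
import sys
from typing import Callable, List


def stderr_logger(line: str, level: str) -> None:
    """Fallback logger for the sketch: write the line to our own stderr."""
    print(f"{level}: {line}", file=sys.stderr)


def run_discover(args: List[str], logger: Callable[[str, str], None] = stderr_logger) -> dict:
    """Run a discover command and parse its entire stdout as the catalog."""
    completed = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    # Assumption from the thread: anything on stderr is log output, not catalog data.
    for line in completed.stderr.splitlines():
        logger(line, "ERROR")
    # Raises json.JSONDecodeError if the tap mixes log lines into stdout.
    return json.loads(completed.stdout)
```

If a tap does violate the expectation, the `json.loads` call fails loudly, which is the point at which source-specific handling would be needed.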

@jrhizor jrhizor force-pushed the jrhizor/py-schema-converter branch from 90110fb to 9342e08 October 14, 2020 17:21
Contributor

@sherifnada sherifnada left a comment

Did a first pass on the python files, will do a second pass later. The usage is looking clean so far! Very exciting

@@ -76,10 +70,34 @@ def __init__(self):
pass

# Iterator<AirbyteMessage>
- def read(self, config_object, rendered_config_path, state=None) -> Generator[AirbyteMessage, None, None]:
+ def read(self, logger, config_container, catalog_path, state=None) -> Generator[AirbyteMessage, None, None]:
Contributor

This will come in a future PR?

Contributor Author

No, this is expected to be implemented by specific sources. There is no default implementation.

Contributor

ah, makes sense
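The "no default implementation" point in this thread can be sketched as below. It is an assumption-laden illustration: plain dicts stand in for `AirbyteMessage`, `ConstantSource` is a made-up subclass, and raising `NotImplementedError` is one common way to express the contract, not necessarily what the base class in this PR does.

```python
from typing import Generator


class Source:
    """Base class: read() has no default behavior and must be overridden."""

    def read(self, logger, config_container, catalog_path, state=None) -> Generator[dict, None, None]:
        raise NotImplementedError("read() must be implemented by each source")


class ConstantSource(Source):
    """Hypothetical source that emits a single hard-coded record."""

    def read(self, logger, config_container, catalog_path, state=None) -> Generator[dict, None, None]:
        logger("starting read", "INFO")
        yield {"type": "RECORD", "record": {"stream": "example", "data": {"value": 1}}}
```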

@@ -1,8 +1,10 @@
from typing import Generator
Contributor

why are there two airbyte_protocol directories? It's not clear to me what the difference between the two is.

Contributor Author

This seems like standard structure for python packages:

pkg
pkg/setup.py
pkg/pkg
pkg/pkg/__init__.py
pkg/pkg/*

Contributor

SGTM, thanks for bearing with my un-pythonic ways :P

- schema = source.discover(logging, rendered_config_path)
- print(schema.schema)
+ catalog = source.discover(log_line, config_container)
+ print(catalog.serialize())
Contributor

is print the standard way of logging in python or is there a "slf4j" equivalent?

Contributor Author

Since we want to dictate the precise format of what we're outputting, we don't want an slf4j/logger equivalent.

It's debatable whether we should use sys.stdout.write() or print, but print just seemed easier (we do want newlines after everything, and it's easy to use the string conversions in some cases).
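As a rough illustration of dictating the output format with print: the sketch below builds each LOG message as one line of JSON and lets print add the newline. The `log_line` name appears in this PR, but its body here and the `serialize_log` helper are assumptions; the message shape is copied from the run output earlier in the conversation.

```python
import json


def serialize_log(message: str, level: str = "INFO") -> str:
    """Build one LOG message as a single-line JSON string."""
    return json.dumps({"type": "LOG", "log": {"level": level, "message": message}})


def log_line(message: str, level: str = "INFO") -> None:
    # print appends the trailing newline, so every message lands on its own line.
    print(serialize_log(message, level), flush=True)
```

Because `json.dumps` escapes embedded newlines, a multi-line message still occupies exactly one line of output.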

schema = stream.get("schema").get("properties")

# todo: figure out how to serialize an object with an items key in python_jsonschema_objects
if name == "subscriptions":
Contributor

does this mean we are blotting out parts of the catalog?

Contributor Author

Updated #530 to show what it needs to do to fix this. I've only seen this on Stripe so far but this is definitely something we need to fix before release. We can do it outside of this PR.

@staticmethod
- def discover(shell_command, transform=(lambda x: AirbyteSchema(x))) -> AirbyteSchema:
+ def get_catalogs(logger, shell_command, singer_transform=(lambda catalog: catalog), airbyte_transform=(lambda catalog: catalog)) -> Catalogs:
Contributor

what is singer_transform and airbyte_transform used for?


ok = True
while ok:
for key, val1 in sel.select():
Contributor

does select break if the underlying stream has closed? From the select docs: If timeout is None, the call will block until a monitored file object becomes ready.

Contributor Author

It doesn't break when the underlying process ends. The final iteration of this loop is when the file object p.stdout is already closed, and it exits gracefully by reaching the ok = False condition.
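The termination behavior described above can be sketched with the standard `selectors` module. This is a simplified variant, not the code under review: `stream_process` is a hypothetical name, and instead of a single `ok` flag it counts open streams so that both stdout and stderr are drained before the loop exits. Registering pipes with a selector works on POSIX systems but not on Windows.

```python
import selectors
import subprocess
import sys
from typing import Callable, List


def stream_process(args: List[str], on_stdout: Callable[[str], None], on_stderr: Callable[[str], None]) -> int:
    """Forward a child's stdout and stderr lines to callbacks; return its exit code."""
    proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    sel = selectors.DefaultSelector()
    sel.register(proc.stdout, selectors.EVENT_READ, on_stdout)
    sel.register(proc.stderr, selectors.EVENT_READ, on_stderr)
    open_streams = 2
    while open_streams:
        # A closed pipe is reported as readable; readline() then returns "".
        for key, _events in sel.select():
            line = key.fileobj.readline()
            if line:
                key.data(line.rstrip("\n"))
            else:
                sel.unregister(key.fileobj)
                open_streams -= 1
    sel.close()
    return proc.wait()
```

Since end-of-file counts as "ready", `select()` with no timeout does not block forever once the child exits. One caveat of mixing `select` with buffered `readline()` is that a line already sitting in Python's buffer may only be delivered on the next wakeup.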


if err_line:
log_line(err_line, "ERROR")
def combine_catalogs(masked_airbyte_catalog, discovered_singer_catalog) -> str:
Contributor

Is the output here expected to be an object or a Singer or Airbyte catalog? If so, can the type signature reflect that? Also, this is more about making selections on the discovered catalog than "combining" the two catalogs, right? Can we rename the method to reflect that?

Contributor

also, can you add a unit test for this method?

Contributor Author

I did the rename here. I'll do the documentation piece and type handling separately.
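For readers following the naming discussion, here is a minimal sketch of "making selections on the discovered catalog". It assumes the incoming Airbyte catalog lists streams under `streams` with a `name` key, and it uses the Singer convention of a metadata entry with an empty `breadcrumb` carrying `selected`. The function name `mark_selected_streams` is hypothetical and is not the name chosen in this PR.

```python
import copy


def mark_selected_streams(airbyte_catalog: dict, singer_catalog: dict) -> dict:
    """Return a copy of the Singer catalog with stream-level `selected` flags set."""
    selected_names = {stream["name"] for stream in airbyte_catalog.get("streams", [])}
    result = copy.deepcopy(singer_catalog)  # leave the discovered catalog untouched
    for stream in result.get("streams", []):
        # Stream-level metadata is the entry whose breadcrumb is the empty list.
        top_level = None
        for entry in stream.setdefault("metadata", []):
            if entry.get("breadcrumb") == []:
                top_level = entry
        if top_level is None:
            top_level = {"breadcrumb": [], "metadata": {}}
            stream["metadata"].append(top_level)
        top_level.setdefault("metadata", {})["selected"] = stream.get("stream") in selected_names
    return result
```

Returning a dict rather than a `str` is a choice made for the sketch; it also makes the function straightforward to unit test, as requested above.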

@jrhizor
Contributor Author

jrhizor commented Oct 14, 2020

A few of these comments can be summarized as:

  • add type hints
  • add comments/docs
  • add unit tests

I'll address these in a separate PR after this is merged.

@jrhizor jrhizor merged commit 8d165a7 into master Oct 14, 2020
@jrhizor jrhizor deleted the jrhizor/py-schema-converter branch October 14, 2020 19:14