Skip to content

Commit

Permalink
Merge pull request #493 from splitgraph/feature/airbyte-data-source-CU-15xp9xt
Browse files Browse the repository at this point in the history

Airbyte data source [CU-15xp9xt]
  • Loading branch information
mildbyte authored Jul 21, 2021
2 parents 7194827 + 8e5c387 commit 1affeba
Show file tree
Hide file tree
Showing 25 changed files with 1,794 additions and 182 deletions.
179 changes: 116 additions & 63 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pyyaml = ">=5.1"
jsonschema = ">=3.1.0"
cryptography = ">=3.4.0"
pydantic = ">=1.8.1"
chardet = "^4.0.0"

# Socrata dataset mounting.
# This could be optional but it's very lightweight (only requires requests).
Expand Down
45 changes: 2 additions & 43 deletions splitgraph/commandline/engine.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import logging
import os
import platform
import time
from io import BytesIO
from pathlib import Path, PureWindowsPath
from tarfile import TarFile, TarInfo
from typing import Dict, TYPE_CHECKING
from urllib.parse import urlparse

Expand All @@ -14,53 +11,15 @@
from splitgraph.__version__ import __version__
from splitgraph.config import CONFIG, SG_CMD_ASCII
from splitgraph.exceptions import DockerUnavailableError, EngineSetupError
from splitgraph.utils.docker import get_docker_client, copy_to_container

if TYPE_CHECKING:
from docker.models.containers import Container
pass


DEFAULT_ENGINE = "default"


def get_docker_client():
    """Construct a Docker client from the environment and verify connectivity.

    After building the client via ``docker.from_env()``, the daemon is pinged
    once; any failure (missing daemon, bad socket, auth error) is wrapped in a
    DockerUnavailableError so callers get a single exception type to handle.
    """
    # Imported lazily so that merely importing this module does not require
    # the docker package to be installed.
    import docker

    try:
        docker_client = docker.from_env()
        docker_client.ping()
    except Exception as e:
        raise DockerUnavailableError("Could not connect to the Docker daemon") from e
    return docker_client


def copy_to_container(container: "Container", source_path: str, target_path: str) -> None:
    """
    Copy a file into a Docker container.

    The Docker API only accepts tar archives for uploads, so the file is
    wrapped into an in-memory single-entry tarball named after the target
    file and extracted into the target's parent directory.
    See https://github.com/docker/docker-py/issues/1771

    :param container: Container object
    :param source_path: Source file path
    :param target_path: Target file path (in the container)
    :return:
    """
    # NOTE: reads the whole file into memory; fine for the small config
    # files this is used for.
    with open(source_path, "rb") as f:
        data = f.read()

    tarinfo = TarInfo(name=os.path.basename(target_path))
    tarinfo.size = len(data)
    tarinfo.mtime = int(time.time())

    stream = BytesIO()
    # Use a context manager so the archive is always finalized/closed,
    # even if addfile raises (the original leaked the open TarFile).
    with TarFile(fileobj=stream, mode="w") as tar:
        tar.addfile(tarinfo, BytesIO(data))

    stream.seek(0)
    container.put_archive(path=os.path.dirname(target_path), data=stream.read())


def patch_and_save_config(config, patch):
from splitgraph.config.config import patch_config
from splitgraph.config.system_config import HOME_SUB_DIR
Expand Down
4 changes: 2 additions & 2 deletions splitgraph/commandline/image_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import click

from splitgraph.commandline.common import ImageType, RepositoryType, JsonType, remote_switch_option
from splitgraph.config import get_singleton, CONFIG
from splitgraph.config import DEFAULT_CHUNK_SIZE
from splitgraph.exceptions import TableNotFoundError


Expand Down Expand Up @@ -72,7 +72,7 @@ def checkout_c(image_spec, force, uncheckout, layered):
@click.option(
"-c",
"--chunk-size",
default=int(get_singleton(CONFIG, "SG_COMMIT_CHUNK_SIZE")),
default=DEFAULT_CHUNK_SIZE,
type=int,
help="Split new tables into chunks of this many rows (by primary key). The default "
"value is governed by the SG_COMMIT_CHUNK_SIZE configuration parameter.",
Expand Down
2 changes: 2 additions & 0 deletions splitgraph/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

SG_CMD_ASCII = get_singleton(CONFIG, "SG_CMD_ASCII") == "true"

DEFAULT_CHUNK_SIZE = int(get_singleton(CONFIG, "SG_COMMIT_CHUNK_SIZE"))

REMOTES = list(CONFIG.get("remotes", []))

# This is a global variable that gets flipped to True by the Multicorn FDW class
Expand Down
14 changes: 10 additions & 4 deletions splitgraph/core/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,17 @@ def checkout(self, force: bool = False, layered: bool = False) -> None:
self.object_engine.delete_table(target_schema, table)

if layered:
self._lq_checkout()
self.lq_checkout()
else:
for table in self.get_tables():
self.get_table(table).materialize(table)
set_head(self.repository, self.image_hash)

def _lq_checkout(
self, target_schema: Optional[str] = None, wrapper: Optional[str] = FDW_CLASS
def lq_checkout(
self,
target_schema: Optional[str] = None,
wrapper: Optional[str] = FDW_CLASS,
only_tables: Optional[List[str]] = None,
) -> None:
"""
Intended to be run on the sgr side. Initializes the FDW for all tables in a given image,
Expand Down Expand Up @@ -198,6 +201,9 @@ def _lq_checkout(

# It's easier to create the foreign tables from our side than to implement IMPORT FOREIGN SCHEMA by the FDW
for table_name in self.get_tables():
if only_tables and table_name not in only_tables:
continue

logging.debug(
"Mounting %s:%s/%s into %s",
self.repository.to_schema(),
Expand All @@ -220,7 +226,7 @@ def query_schema(
tmp_schema = str.format("o{:032x}", getrandbits(128))
try:
self.object_engine.create_schema(tmp_schema)
self._lq_checkout(target_schema=tmp_schema, wrapper=wrapper)
self.lq_checkout(target_schema=tmp_schema, wrapper=wrapper)
if commit:
self.object_engine.commit() # Make sure the new tables are seen by other connections

Expand Down
10 changes: 8 additions & 2 deletions splitgraph/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ def unwrap(
return good, bad


def get_table_params(table_info: TableInfo, table_name: str) -> TableParams:
if isinstance(table_info, dict) and table_name in table_info:
return table_info[table_name][1]
return TableParams({})


class Comparable(metaclass=ABCMeta):
@abstractmethod
def __lt__(self, other: Any) -> bool:
Expand All @@ -91,11 +97,11 @@ def dict_to_table_schema_params(

def table_schema_params_to_dict(
tables: Dict[str, Tuple[TableSchema, TableParams]]
) -> Dict[str, Dict[str, Dict[str, str]]]:
) -> Dict[str, Dict[str, Dict[str, Any]]]:
return {
t: {
"schema": {c.name: c.pg_type for c in ts},
"options": {tpk: str(tpv) for tpk, tpv in tp.items()},
"options": tp,
}
for t, (ts, tp) in tables.items()
}
20 changes: 14 additions & 6 deletions splitgraph/hooks/data_source/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from psycopg2._json import Json
from psycopg2.sql import SQL, Identifier

from splitgraph.config import DEFAULT_CHUNK_SIZE
from splitgraph.core.engine import repository_exists
from splitgraph.core.image import Image
from splitgraph.core.types import (
Expand Down Expand Up @@ -68,11 +69,16 @@ def __init__(
self.credentials = credentials
self.params = params

self._validate_table_params(tables)
self.tables = tables

@classmethod
def _validate_table_params(cls, tables: Optional[TableInfo]) -> None:
import jsonschema

if isinstance(tables, dict):
for _, table_params in tables.values():
jsonschema.validate(instance=table_params, schema=self.table_params_schema)

self.tables = tables
jsonschema.validate(instance=table_params, schema=cls.table_params_schema)

@abstractmethod
def introspect(self) -> IntrospectionResult:
Expand Down Expand Up @@ -114,6 +120,7 @@ def _load(self, schema: str, tables: Optional[TableInfo] = None):
raise NotImplementedError

def load(self, repository: "Repository", tables: Optional[TableInfo] = None) -> str:
self._validate_table_params(tables)
if not repository_exists(repository):
repository.init()

Expand All @@ -132,7 +139,7 @@ def load(self, repository: "Repository", tables: Optional[TableInfo] = None) ->
head=None,
image_hash=image_hash,
snap_only=True,
chunk_size=100000,
chunk_size=DEFAULT_CHUNK_SIZE,
schema=tmp_schema,
)
finally:
Expand All @@ -159,6 +166,7 @@ def sync(
image_hash: Optional[str],
tables: Optional[TableInfo] = None,
) -> str:
self._validate_table_params(tables)
if not repository_exists(repository):
repository.init()

Expand Down Expand Up @@ -213,7 +221,7 @@ def get_ingestion_state(repository: "Repository", image_hash: Optional[str]) ->


def prepare_new_image(
repository: "Repository", hash_or_tag: Optional[str]
repository: "Repository", hash_or_tag: Optional[str], comment: str = "Singer tap ingestion"
) -> Tuple[Optional[Image], str]:
new_image_hash = "{:064x}".format(getrandbits(256))
if repository_exists(repository):
Expand All @@ -235,5 +243,5 @@ def prepare_new_image(
)
else:
base_image = None
repository.images.add(parent_id=None, image=new_image_hash, comment="Singer tap ingestion")
repository.images.add(parent_id=None, image=new_image_hash, comment=comment)
return base_image, new_image_hash
1 change: 1 addition & 0 deletions splitgraph/hooks/data_source/fdw.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def mount(
tables: Optional[TableInfo] = None,
overwrite: bool = True,
) -> Optional[List[MountError]]:
self._validate_table_params(tables)
tables = tables or self.tables or []

fdw = self.get_fdw_name()
Expand Down
Empty file.
Loading

0 comments on commit 1affeba

Please sign in to comment.