Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds registries and plugins #1894

Merged
merged 27 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c45197a
adds sources registry and factory, allows for late config binding and…
rudolfix Sep 28, 2024
9865fa8
converts rest_api to a standard source
rudolfix Sep 28, 2024
bc5bee2
marks secret values with Annotated, allows regular types to be used i…
rudolfix Sep 28, 2024
d85bf75
reduces the number of modules imported on initial dlt import
rudolfix Sep 29, 2024
cb6c355
Merge branch 'devel' into feat/adds-registries-and-plugins
rudolfix Oct 1, 2024
d493258
removes resource rename via AST in dlt init, provides new templates
rudolfix Oct 2, 2024
863a816
replaces hardcoded paths to settings and data with pluggable run context
rudolfix Oct 2, 2024
336ad35
fixes init command tests
rudolfix Oct 2, 2024
602354b
adds plugin system and example plugin tests
rudolfix Oct 2, 2024
6efb523
uses run context to load secrets / configs
rudolfix Oct 2, 2024
a06a7eb
adds run context name to source reference and uses it to resolve
rudolfix Oct 2, 2024
41bc22f
fixes module name and wrong SPEC for single resource sources when reg…
rudolfix Oct 2, 2024
f2aa71c
adds pluggy
rudolfix Oct 2, 2024
3777963
adds methods to get location of entities to run context
rudolfix Oct 3, 2024
bec141c
fixes toml provider to write toml objects, fixes toml writing to not …
rudolfix Oct 3, 2024
3882310
simplifies init command, makes sure it creates files according to run…
rudolfix Oct 3, 2024
b52df7e
fixes dbt test venv, prepares to use uv
rudolfix Oct 4, 2024
7bfbb15
adds SPEC for callable resources
rudolfix Oct 4, 2024
8642f97
fixes wrong SPEC passed to single resource source
rudolfix Oct 4, 2024
6bec9b5
allows mock run context to read from env
rudolfix Oct 4, 2024
46ef5c6
fixes oauth2 auth dataclass
rudolfix Oct 6, 2024
98e108d
fixes secrets masking for shorthand auth
rudolfix Oct 6, 2024
3d1111c
adds rest_api auth secret config injections tests, fixes some others
rudolfix Oct 6, 2024
e3aad5b
fixes docstrings
rudolfix Oct 6, 2024
2ecb177
allows source references to python modules out of registry
rudolfix Oct 7, 2024
e7c527b
Merge branch 'devel' into feat/adds-registries-and-plugins
rudolfix Oct 9, 2024
6a216b7
fixes lock
rudolfix Oct 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions dlt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from dlt.version import __version__
from dlt.common.configuration.accessors import config, secrets
from dlt.common.typing import TSecretValue as _TSecretValue
from dlt.common.typing import TSecretValue as _TSecretValue, TSecretStrValue as _TSecretStrValue
from dlt.common.configuration.specs import CredentialsConfiguration as _CredentialsConfiguration
from dlt.common.pipeline import source_state as state
from dlt.common.schema import Schema
Expand Down Expand Up @@ -50,10 +50,12 @@
TSecretValue = _TSecretValue
"When typing source/resource function arguments it indicates that a given argument is a secret and should be taken from dlt.secrets."

TSecretStrValue = _TSecretStrValue
"When typing source/resource function arguments it indicates that a given argument is a secret STRING and should be taken from dlt.secrets."

TCredentials = _CredentialsConfiguration
"When typing source/resource function arguments it indicates that a given argument represents credentials and should be taken from dlt.secrets. Credentials may be a string, dictionary or any other type."


__all__ = [
"__version__",
"config",
Expand All @@ -78,3 +80,12 @@
"sources",
"destinations",
]

# verify that no injection context was created
from dlt.common.configuration.container import Container as _Container

assert (
_Container._INSTANCE is None
), "Injection container should not be initialized during initial import"
# create injection container
_Container()
1 change: 1 addition & 0 deletions dlt/cli/config_toml_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def write_spec(toml_table: TOMLTable, config: BaseConfiguration, overwrite_exist
def write_values(
toml: TOMLContainer, values: Iterable[WritableConfigValue], overwrite_existing: bool
) -> None:
# TODO: decouple writers from a particular object model ie. TOML
for value in values:
toml_table: TOMLTable = toml # type: ignore
for section in value.sections:
Expand Down
5 changes: 2 additions & 3 deletions dlt/cli/deploy_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from importlib.metadata import version as pkg_version

from dlt.common.configuration.providers import SECRETS_TOML, SECRETS_TOML_KEY
from dlt.common.configuration.paths import make_dlt_settings_path
from dlt.common.configuration.utils import serialize_value
from dlt.common.git import is_dirty

Expand Down Expand Up @@ -210,7 +209,7 @@ def _echo_instructions(self, *args: Optional[Any]) -> None:
fmt.echo(
"1. Add the following secret values (typically stored in %s): \n%s\nin %s"
% (
fmt.bold(make_dlt_settings_path(SECRETS_TOML)),
fmt.bold(utils.make_dlt_settings_path(SECRETS_TOML)),
fmt.bold(
"\n".join(
self.env_prov.get_key_name(s_v.key, *s_v.sections)
Expand Down Expand Up @@ -368,7 +367,7 @@ def _echo_instructions(self, *args: Optional[Any]) -> None:
"3. Add the following secret values (typically stored in %s): \n%s\n%s\nin"
" ENVIRONMENT VARIABLES using Google Composer UI"
% (
fmt.bold(make_dlt_settings_path(SECRETS_TOML)),
fmt.bold(utils.make_dlt_settings_path(SECRETS_TOML)),
fmt.bold(
"\n".join(
self.env_prov.get_key_name(s_v.key, *s_v.sections)
Expand Down
9 changes: 6 additions & 3 deletions dlt/cli/deploy_command_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@

from dlt.common import git
from dlt.common.configuration.exceptions import LookupTrace, ConfigFieldMissingException
from dlt.common.configuration.providers import ConfigTomlProvider, EnvironProvider
from dlt.common.configuration.providers.toml import BaseDocProvider, StringTomlProvider
from dlt.common.configuration.providers import (
ConfigTomlProvider,
EnvironProvider,
StringTomlProvider,
)
from dlt.common.git import get_origin, get_repo, Repo
from dlt.common.configuration.specs.run_configuration import get_default_pipeline_name
from dlt.common.typing import StrAny
Expand Down Expand Up @@ -242,7 +245,7 @@ def _display_missing_secret_info(self) -> None:
)

def _lookup_secret_value(self, trace: LookupTrace) -> Any:
return dlt.secrets[BaseDocProvider.get_key_name(trace.key, *trace.sections)]
return dlt.secrets[StringTomlProvider.get_key_name(trace.key, *trace.sections)]

def _echo_envs(self) -> None:
for v in self.envs:
Expand Down
143 changes: 69 additions & 74 deletions dlt/cli/init_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@
import ast
import shutil
import tomlkit
from types import ModuleType
from typing import Dict, List, Sequence, Tuple
from importlib.metadata import version as pkg_version
from typing import Dict, Sequence, Tuple
from pathlib import Path
from importlib import import_module

from dlt.common import git
from dlt.common.configuration.paths import get_dlt_settings_dir, make_dlt_settings_path
from dlt.common.configuration.specs import known_sections
from dlt.common.configuration.providers import (
CONFIG_TOML,
Expand All @@ -18,28 +14,29 @@
SecretsTomlProvider,
)
from dlt.common.pipeline import get_dlt_repos_dir
from dlt.common.source import _SOURCES
from dlt.version import DLT_PKG_NAME, __version__
from dlt.common.destination import Destination
from dlt.common.reflection.utils import rewrite_python_script
from dlt.common.runtime import run_context
from dlt.common.schema.utils import is_valid_schema_name
from dlt.common.schema.exceptions import InvalidSchemaName
from dlt.common.storages.file_storage import FileStorage
from dlt.sources import pipeline_templates as init_module

from dlt.sources import SourceReference

import dlt.reflection.names as n
from dlt.reflection.script_inspector import inspect_pipeline_script, load_script_module
from dlt.reflection.script_inspector import inspect_pipeline_script

from dlt.cli import echo as fmt, pipeline_files as files_ops, source_detection
from dlt.cli import utils
from dlt.cli.config_toml_writer import WritableConfigValue, write_values
from dlt.cli.pipeline_files import (
TEMPLATE_FILES,
SourceConfiguration,
TVerifiedSourceFileEntry,
TVerifiedSourceFileIndex,
)
from dlt.cli.exceptions import CliCommandException
from dlt.cli.requirements import SourceRequirements


DLT_INIT_DOCS_URL = "https://dlthub.com/docs/reference/command-line-interface#dlt-init"
Expand Down Expand Up @@ -213,7 +210,7 @@ def _welcome_message(
if is_new_source:
fmt.echo(
"* Add credentials for %s and other secrets in %s"
% (fmt.bold(destination_type), fmt.bold(make_dlt_settings_path(SECRETS_TOML)))
% (fmt.bold(destination_type), fmt.bold(utils.make_dlt_settings_path(SECRETS_TOML)))
)

if destination_type == "destination":
Expand Down Expand Up @@ -308,6 +305,9 @@ def init_command(
core_sources_storage = _get_core_sources_storage()
templates_storage = _get_templates_storage()

# get current run context
run_ctx = run_context.current()

# discover type of source
source_type: files_ops.TSourceType = "template"
if (
Expand All @@ -324,9 +324,9 @@ def init_command(
source_type = "verified"

# prepare destination storage
dest_storage = FileStorage(os.path.abspath("."))
if not dest_storage.has_folder(get_dlt_settings_dir()):
dest_storage.create_folder(get_dlt_settings_dir())
dest_storage = FileStorage(run_ctx.run_dir)
if not dest_storage.has_folder(run_ctx.settings_dir):
dest_storage.create_folder(run_ctx.settings_dir)
# get local index of verified source files
local_index = files_ops.load_verified_sources_local_index(source_name)
# folder deleted at dest - full refresh
Expand Down Expand Up @@ -376,8 +376,6 @@ def init_command(
f"The verified sources repository is dirty. {source_name} source files may not"
" update correctly in the future."
)
# add template files
source_configuration.files.extend(files_ops.TEMPLATE_FILES)

else:
if source_type == "core":
Expand All @@ -399,9 +397,9 @@ def init_command(
return

# add .dlt/*.toml files to be copied
source_configuration.files.extend(
[make_dlt_settings_path(CONFIG_TOML), make_dlt_settings_path(SECRETS_TOML)]
)
# source_configuration.files.extend(
# [run_ctx.get_setting(CONFIG_TOML), run_ctx.get_setting(SECRETS_TOML)]
# )

# add dlt extras line to requirements
source_configuration.requirements.update_dlt_extras(destination_type)
Expand Down Expand Up @@ -449,8 +447,6 @@ def init_command(
visitor,
[
("destination", destination_type),
("pipeline_name", source_name),
("dataset_name", source_name + "_data"),
],
source_configuration.src_pipeline_script,
)
Expand All @@ -465,54 +461,48 @@ def init_command(
# detect all the required secrets and configs that should go into tomls files
if source_configuration.source_type == "template":
# replace destination, pipeline_name and dataset_name in templates
transformed_nodes = source_detection.find_call_arguments_to_replace(
visitor,
[
("destination", destination_type),
("pipeline_name", source_name),
("dataset_name", source_name + "_data"),
],
source_configuration.src_pipeline_script,
)
# transformed_nodes = source_detection.find_call_arguments_to_replace(
# visitor,
# [
# ("destination", destination_type),
# ("pipeline_name", source_name),
# ("dataset_name", source_name + "_data"),
# ],
# source_configuration.src_pipeline_script,
# )
# template sources are always in module starting with "pipeline"
# for templates, place config and secrets into top level section
required_secrets, required_config, checked_sources = source_detection.detect_source_configs(
_SOURCES, source_configuration.source_module_prefix, ()
SourceReference.SOURCES, source_configuration.source_module_prefix, ()
)
# template has a strict rules where sources are placed
for source_q_name, source_config in checked_sources.items():
if source_q_name not in visitor.known_sources_resources:
raise CliCommandException(
"init",
f"The pipeline script {source_configuration.src_pipeline_script} imports a"
f" source/resource {source_config.f.__name__} from module"
f" {source_config.module.__name__}. In init scripts you must declare all"
" sources and resources in single file.",
)
# for source_q_name, source_config in checked_sources.items():
# if source_q_name not in visitor.known_sources_resources:
# raise CliCommandException(
# "init",
# f"The pipeline script {source_configuration.src_pipeline_script} imports a"
# f" source/resource {source_config.name} from section"
# f" {source_config.section}. In init scripts you must declare all"
# f" sources and resources in single file. Known names are {list(visitor.known_sources_resources.keys())}.",
# )
# rename sources and resources
transformed_nodes.extend(
source_detection.find_source_calls_to_replace(visitor, source_name)
)
# transformed_nodes.extend(
# source_detection.find_source_calls_to_replace(visitor, source_name)
# )
else:
# replace only destination for existing pipelines
transformed_nodes = source_detection.find_call_arguments_to_replace(
visitor, [("destination", destination_type)], source_configuration.src_pipeline_script
)
# pipeline sources are in module with name starting from {pipeline_name}
# for verified pipelines place in the specific source section
required_secrets, required_config, checked_sources = source_detection.detect_source_configs(
_SOURCES,
SourceReference.SOURCES,
source_configuration.source_module_prefix,
(known_sections.SOURCES, source_name),
)

# the intro template does not use sources, for now allow it to pass here
if len(checked_sources) == 0 and source_name != "intro":
raise CliCommandException(
"init",
f"The pipeline script {source_configuration.src_pipeline_script} is not creating or"
" importing any sources or resources. Exiting...",
)
if len(checked_sources) == 0:
raise CliCommandException(
"init",
f"The pipeline script {source_configuration.src_pipeline_script} is not creating or"
" importing any sources or resources. Exiting...",
)

# add destination spec to required secrets
required_secrets["destinations:" + destination_type] = WritableConfigValue(
Expand Down Expand Up @@ -570,23 +560,32 @@ def init_command(
)

# copy files at the very end
for file_name in source_configuration.files:
copy_files = []
# copy template files
for file_name in TEMPLATE_FILES:
dest_path = dest_storage.make_full_path(file_name)
# get files from init section first
if templates_storage.has_file(file_name):
if dest_storage.has_file(dest_path):
# do not overwrite any init files
continue
src_path = templates_storage.make_full_path(file_name)
else:
# only those that were modified should be copied from verified sources
if file_name in remote_modified:
src_path = source_configuration.storage.make_full_path(file_name)
else:
continue
copy_files.append((templates_storage.make_full_path(file_name), dest_path))

# only those that were modified should be copied from verified sources
for file_name in remote_modified:
copy_files.append(
(
source_configuration.storage.make_full_path(file_name),
# copy into where "sources" reside in run context, being root dir by default
dest_storage.make_full_path(
os.path.join(run_ctx.get_run_entity("sources"), file_name)
),
)
)

# modify storage at the end
for src_path, dest_path in copy_files:
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
shutil.copy2(src_path, dest_path)

if remote_index:
# delete files
for file_name in remote_deleted:
Expand All @@ -600,15 +599,11 @@ def init_command(
dest_storage.save(source_configuration.dest_pipeline_script, dest_script_source)

# generate tomls with comments
secrets_prov = SecretsTomlProvider()
secrets_toml = tomlkit.document()
write_values(secrets_toml, required_secrets.values(), overwrite_existing=False)
secrets_prov._config_doc = secrets_toml

config_prov = ConfigTomlProvider()
config_toml = tomlkit.document()
write_values(config_toml, required_config.values(), overwrite_existing=False)
config_prov._config_doc = config_toml
secrets_prov = SecretsTomlProvider(settings_dir=run_ctx.settings_dir)
write_values(secrets_prov._config_toml, required_secrets.values(), overwrite_existing=False)

config_prov = ConfigTomlProvider(settings_dir=run_ctx.settings_dir)
write_values(config_prov._config_toml, required_config.values(), overwrite_existing=False)

# write toml files
secrets_prov.write_toml()
Expand Down
Loading
Loading