Commit

reformat [INTERNAL_BRANCH=sean/ruff]
smackesey committed Dec 12, 2022
1 parent 876d005 commit f2b8d88
Showing 55 changed files with 209 additions and 78 deletions.
4 changes: 2 additions & 2 deletions docs/content/concepts/assets/asset-materializations.mdx
@@ -114,7 +114,7 @@ There are a variety of types of metadata that can be associated with a materiali
#### Example: Op body

```python file=concepts/assets/materialization_ops.py startafter=start_materialization_ops_marker_2 endbefore=end_materialization_ops_marker_2
from dagster import op, AssetMaterialization, MetadataValue
from dagster import AssetMaterialization, MetadataValue, op


@op
@@ -173,7 +173,7 @@ Check our API docs for <PyObject module="dagster" object="MetadataEntry" /> for
If you are materializing a single slice of an asset (e.g. a single day's worth of data on a larger table), rather than mutating or creating it entirely, you can indicate this to Dagster by including the `partition` argument on the object.

```python file=/concepts/assets/materialization_ops.py startafter=start_partitioned_asset_materialization endbefore=end_partitioned_asset_materialization
from dagster import op, AssetMaterialization
from dagster import AssetMaterialization, op


@op(config_schema={"date": str})
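For reference, a minimal runnable sketch of the partitioned-materialization pattern this hunk reformats; the asset key, metadata, and config value are illustrative rather than taken from the snippet file:

```python
from dagster import AssetMaterialization, Output, op


@op(config_schema={"date": str})
def ingest_partition(context):
    date = context.op_config["date"]
    # ... write just this date's slice of the table here ...
    yield AssetMaterialization(
        asset_key="my_table",
        partition=date,
        metadata={"num_rows": 1000},  # illustrative row count
    )
    yield Output(None)
```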
4 changes: 2 additions & 2 deletions docs/content/concepts/assets/asset-observations.mdx
@@ -55,7 +55,7 @@ height={917}
There are a variety of types of metadata that can be associated with an observation event, all through the <PyObject object="MetadataEntry" /> class. Each observation event optionally takes a dictionary of metadata entries that are then displayed in the event log and the [Asset Details](/concepts/dagit/dagit#asset-details) page. Check our API docs for <PyObject object="MetadataEntry" /> for more details on the types of event metadata available.

```python file=concepts/assets/observations.py startafter=start_observation_asset_marker_2 endbefore=end_observation_asset_marker_2
from dagster import op, AssetObservation, MetadataValue
from dagster import AssetObservation, MetadataValue, op


@op
@@ -93,7 +93,7 @@ height={1146}
If you are observing a single slice of an asset (e.g. a single day's worth of data on a larger table), rather than mutating or creating it entirely, you can indicate this to Dagster by including the `partition` argument on the object.

```python file=/concepts/assets/observations.py startafter=start_partitioned_asset_observation endbefore=end_partitioned_asset_observation
from dagster import op, AssetMaterialization
from dagster import AssetMaterialization, op


@op(config_schema={"date": str})
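For reference, a hedged sketch combining the two patterns these hunks touch — observation metadata and the `partition` argument; the asset key and values are illustrative:

```python
from dagster import AssetObservation, MetadataValue, Output, op


@op(config_schema={"date": str})
def observe_partition(context):
    date = context.op_config["date"]
    row_count = 42  # stand-in for a real count(*) query
    yield AssetObservation(
        asset_key="my_table",
        partition=date,
        metadata={
            "num_rows": row_count,
            "dashboard": MetadataValue.url("http://example.com/my_table"),
        },
    )
    yield Output(None)
```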
2 changes: 1 addition & 1 deletion docs/content/concepts/assets/software-defined-assets.mdx
@@ -427,7 +427,7 @@ def nabisco_cereals():
**This recommended approach** constructs a group of assets from a specified module in your project. Using the `load_assets_from_package_module` function, you can import all assets in a module and apply a grouping:

```python file=/concepts/assets/asset_group_module.py startafter=start_example endbefore=end_example
import my_package.cereal as cereal
from my_package import cereal

cereal_assets = load_assets_from_package_module(
cereal,
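For reference, a minimal sketch of the grouping pattern shown in this hunk, assuming `my_package.cereal` is a package whose modules define `@asset`s:

```python
from dagster import load_assets_from_package_module, repository

from my_package import cereal  # hypothetical package of @asset definitions

cereal_assets = load_assets_from_package_module(cereal, group_name="cereal")


@repository
def cereal_repository():
    return [*cereal_assets]
```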
6 changes: 2 additions & 4 deletions docs/content/concepts/dagit/graphql-client.mdx
@@ -63,6 +63,7 @@ You can use the client to get the status of a job run as follows:

```python file=/concepts/dagit/graphql/client_example.py startafter=start_run_status_marker endbefore=end_run_status_marker
from dagster_graphql import DagsterGraphQLClientError

from dagster import DagsterRunStatus

try:
@@ -83,10 +84,7 @@ You can also reload a repository location in a Dagster deployment.
This reloads all repositories in that repository location. This is useful in a variety of contexts, including refreshing Dagit without restarting the server. Example usage is as follows:

```python file=/concepts/dagit/graphql/client_example.py startafter=start_reload_repo_location_marker endbefore=end_reload_repo_location_marker
from dagster_graphql import (
ReloadRepositoryLocationInfo,
ReloadRepositoryLocationStatus,
)
from dagster_graphql import ReloadRepositoryLocationInfo, ReloadRepositoryLocationStatus

reload_info: ReloadRepositoryLocationInfo = client.reload_repository_location(REPO_NAME)
if reload_info.status == ReloadRepositoryLocationStatus.SUCCESS:
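For reference, a hedged end-to-end sketch of the client calls these hunks reorder; the host, port, run ID, and location name are placeholders:

```python
from dagster_graphql import (
    DagsterGraphQLClient,
    DagsterGraphQLClientError,
    ReloadRepositoryLocationStatus,
)

from dagster import DagsterRunStatus

client = DagsterGraphQLClient("localhost", port_number=3000)  # assumes Dagit here

try:
    status = client.get_run_status("some-run-id")  # placeholder run ID
    if status == DagsterRunStatus.SUCCESS:
        print("run succeeded")
except DagsterGraphQLClientError as exc:
    print(f"could not fetch run status: {exc}")

reload_info = client.reload_repository_location("my_location")  # placeholder name
if reload_info.status == ReloadRepositoryLocationStatus.SUCCESS:
    print("repository location reloaded")
```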
2 changes: 1 addition & 1 deletion docs/content/concepts/logging/loggers.mdx
@@ -200,7 +200,7 @@ Default loggers can be specified on a <PyObject object="repository" decorator />
Note that if you explicitly specify loggers on a job, they will override those provided to `default_logger_defs`.

```python file=/concepts/logging/custom_logger.py startafter=start_default_logger_repo endbefore=end_default_logger_repo
from dagster import repository, define_asset_job, asset
from dagster import asset, define_asset_job, repository
@asset
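For reference, a sketch of `default_logger_defs` on a repository using a throwaway `@logger` definition in place of the guide's `custom_logger.py`; the names are illustrative:

```python
import logging

from dagster import asset, define_asset_job, logger, repository


@logger
def plain_console_logger(_init_context):
    console_logger = logging.getLogger("plain_console")
    console_logger.setLevel(logging.INFO)
    console_logger.addHandler(logging.StreamHandler())
    return console_logger


@asset
def my_asset():
    return 1


@repository(default_logger_defs={"console": plain_console_logger})
def repo_with_default_logger():
    return [my_asset, define_asset_job("all_assets_job")]
```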
2 changes: 1 addition & 1 deletion docs/content/concepts/ops-jobs-graphs/op-events.mdx
@@ -159,7 +159,7 @@ To learn more about assets and how they are surfaced once you send this event, c
Attaching metadata to Asset Materializations is an important way of tracking aspects of a given asset over time. This functions essentially identically to other events which accept a `metadata` parameter, allowing you to attach a set of structured labels and values to display.

```python file=concepts/assets/materialization_ops.py startafter=start_materialization_ops_marker_2 endbefore=end_materialization_ops_marker_2
from dagster import op, AssetMaterialization, MetadataValue
from dagster import AssetMaterialization, MetadataValue, op


@op
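For reference, a sketch of attaching structured metadata to a materialization event; the keys, path, and URL are illustrative:

```python
from dagster import AssetMaterialization, MetadataValue, Output, op


@op
def persist_users_table():
    # ... compute and write the table somewhere ...
    yield AssetMaterialization(
        asset_key="users_table",
        metadata={
            "num_rows": 1024,
            "storage_path": MetadataValue.path("/tmp/users.parquet"),
            "dashboard": MetadataValue.url("http://example.com/dashboards/users"),
        },
    )
    yield Output(None)
```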
12 changes: 11 additions & 1 deletion docs/content/concepts/ops-jobs-graphs/op-hooks.mdx
@@ -25,7 +25,17 @@ A <PyObject module="dagster" object="success_hook" displayText="@success_hook" /
## Defining an Op Hook

```python file=/concepts/ops_jobs_graphs/op_hooks.py startafter=start_repo_marker_0 endbefore=end_repo_marker_0
from dagster import HookContext, failure_hook, success_hook
from dagster import (
HookContext,
ResourceDefinition,
failure_hook,
file_relative_path,
graph,
job,
op,
repository,
success_hook,
)


@success_hook(required_resource_keys={"slack"})
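For reference, a hedged sketch of both hook flavors; the Slack call assumes a `slack` resource shaped like `dagster_slack.slack_resource`, so only the resource-free hook is attached to the job to keep it runnable:

```python
from dagster import HookContext, failure_hook, job, op, success_hook


@success_hook(required_resource_keys={"slack"})
def slack_on_success(context: HookContext):
    context.resources.slack.chat_postMessage(
        channel="#ops", text=f"Op {context.op.name} finished successfully"
    )


@failure_hook
def log_on_failure(context: HookContext):
    context.log.error(f"Op {context.op.name} failed")


@op
def do_work():
    return 1


@job(hooks={log_on_failure})  # hooks set on the job apply to every op in it
def hook_demo_job():
    do_work()
```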
2 changes: 1 addition & 1 deletion docs/content/concepts/testing.mdx
@@ -89,7 +89,7 @@ def get_data_without_resource(context):
Dagster allows you to define multiple "jobs" from the same computation graph. With resources, you can modify the op above to:

```python file=/concepts/resources/tests.py startafter=start_test_after_marker endbefore=end_test_after_marker
from dagster import op, graph
from dagster import graph, op


@op(required_resource_keys={"api"})
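For reference, a minimal sketch of testing an op that needs an `api` resource by passing a stub through `build_op_context`; the stub's shape is illustrative:

```python
from dagster import build_op_context, op


@op(required_resource_keys={"api"})
def get_data(context):
    return context.resources.api.call()


class StubAPI:
    def call(self):
        return [1, 2, 3]


def test_get_data():
    context = build_op_context(resources={"api": StubAPI()})
    assert get_data(context) == [1, 2, 3]
```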
2 changes: 1 addition & 1 deletion docs/content/concepts/types.mdx
@@ -99,7 +99,7 @@ Dagster offers a special type called <PyObject object="Nothing" />, which is use
You can use <PyObject module="dagster" object="check_dagster_type" /> to test the type check function of a custom Dagster Type:

```python file=/concepts/types/types.py startafter=start_test_dagster_type endbefore=end_test_dagster_type
from dagster import check_dagster_type, Dict, Any
from dagster import Any, Dict, check_dagster_type


def test_dagster_type():
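For reference, a short sketch of `check_dagster_type` in a test, mirroring the snippet this hunk reorders:

```python
from dagster import Any, Dict, check_dagster_type


def test_dict_type_check():
    type_check = check_dagster_type(Dict[Any, Any], {"foo": "bar"})
    assert type_check.success
```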
4 changes: 2 additions & 2 deletions docs/content/deployment/executors.mdx
@@ -23,7 +23,7 @@ Every job has an executor. The default executor is the <PyObject object="multi_o
An executor can be specified directly on the job by supplying an <PyObject object="ExecutorDefinition" /> to the `executor_def` parameter of <PyObject object="job" decorator /> or <PyObject object="GraphDefinition" method="to_job" />.

```python file=/deploying/executors/executors.py startafter=start_executor_on_job endbefore=end_executor_on_job
from dagster import multiprocess_executor, job, graph
from dagster import graph, job, multiprocess_executor

# Providing an executor using the job decorator
@job(executor_def=multiprocess_executor)
@@ -43,7 +43,7 @@ other_job = the_graph.to_job(executor_def=multiprocess_executor)
A default executor can be specified for all jobs and assets provided to a repository using the `default_executor_def` argument of <PyObject object="repository" decorator />. All jobs that don't specify an executor will use this default executor, but if a job explicitly specifies an executor, then the default provided to the repository will not be used.

```python file=/deploying/executors/executors.py startafter=start_executor_on_repo endbefore=end_executor_on_repo
from dagster import multiprocess_executor, define_asset_job, asset, repository
from dagster import asset, define_asset_job, multiprocess_executor, repository


@asset
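For reference, a sketch of both placements these hunks touch — an executor set directly on a job and a repository-wide default; the `max_concurrent` value is illustrative:

```python
from dagster import (
    graph,
    in_process_executor,
    job,
    multiprocess_executor,
    op,
    repository,
)


@op
def say_hello():
    return "hello"


@job(executor_def=in_process_executor)  # executor set directly on the job
def single_process_job():
    say_hello()


@graph
def hello_graph():
    say_hello()


# executor supplied when turning a graph into a job
multiprocess_job = hello_graph.to_job(
    name="multiprocess_job",
    executor_def=multiprocess_executor.configured({"max_concurrent": 4}),
)


# default for any job in the repository that doesn't set its own executor
@repository(default_executor_def=in_process_executor)
def executor_demo_repository():
    return [single_process_job, multiprocess_job]
```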
2 changes: 1 addition & 1 deletion docs/content/guides/dagster/branch_deployments.mdx
@@ -167,7 +167,7 @@ def repo():

def get_current_env():
is_branch_depl = os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1"
assert is_branch_depl != None # env var must be set
assert is_branch_depl is not None # env var must be set
return "branch" if is_branch_depl else "prod"

return [
@@ -205,14 +205,14 @@ from .mylib import create_db_connection, pickle_to_s3, train_recommender_model

@op
def build_users():
raw_users_df = read_sql(f"select * from raw_users", con=create_db_connection())
raw_users_df = read_sql("select * from raw_users", con=create_db_connection())
users_df = raw_users_df.dropna()
users_df.to_sql(name="users", con=create_db_connection())


@op(ins={"users": In(Nothing)})
def build_user_recommender_model():
users_df = read_sql(f"select * from users", con=create_db_connection())
users_df = read_sql("select * from users", con=create_db_connection())
users_recommender_model = train_recommender_model(users_df)
pickle_to_s3(users_recommender_model, key="users_recommender_model")

@@ -241,14 +241,14 @@ from .mylib import create_db_connection, pickle_to_s3, train_recommender_model

@asset(non_argument_deps={"raw_users"})
def users():
raw_users_df = read_sql(f"select * from raw_users", con=create_db_connection())
raw_users_df = read_sql("select * from raw_users", con=create_db_connection())
users_df = raw_users_df.dropna()
users_df.to_sql(name="users", con=create_db_connection())


@asset(non_argument_deps={"users"})
def user_recommender_model():
users_df = read_sql(f"select * from users", con=create_db_connection())
users_df = read_sql("select * from users", con=create_db_connection())
users_recommender_model = train_recommender_model(users_df)
pickle_to_s3(users_recommender_model, key="users_recommender_model")

3 changes: 2 additions & 1 deletion docs/content/guides/dagster/re-execution.mdx
@@ -80,9 +80,10 @@ Re-execution can be triggered via the API as well.
Again, let's revisit the job `unreliable_job`, which has an op named `unreliable`.

```python file=/guides/dagster/reexecution/reexecution_api.py startafter=start_initial_execution_marker endbefore=end_initial_execution_marker
from dagster import DagsterInstance, ReexecutionOptions, execute_job, reconstructable
from docs_snippets.guides.dagster.reexecution.unreliable_job import unreliable_job

from dagster import DagsterInstance, ReexecutionOptions, execute_job, reconstructable

instance = DagsterInstance.ephemeral()

# Initial execution
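For reference, a hedged sketch of the full API-driven re-execution flow around the snippet above, reusing the guide's `unreliable_job` import:

```python
from docs_snippets.guides.dagster.reexecution.unreliable_job import unreliable_job

from dagster import DagsterInstance, ReexecutionOptions, execute_job, reconstructable

instance = DagsterInstance.ephemeral()

# Initial execution
initial_result = execute_job(reconstructable(unreliable_job), instance=instance)

if not initial_result.success:
    # Re-execute only the failed steps (and anything downstream of them)
    options = ReexecutionOptions.from_failure(initial_result.run_id, instance)
    retry_result = execute_job(
        reconstructable(unreliable_job),
        instance=instance,
        reexecution_options=options,
    )
    print(retry_result.success)
```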
14 changes: 9 additions & 5 deletions docs/content/guides/dagster/software-defined-assets.mdx
@@ -70,9 +70,10 @@ It's common to use a utility like <PyObject object="load_assets_from_modules" />
```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/assets/weather_assets.py startafter=gather_assets_start endbefore=gather_assets_end
# imports the module called "assets" from the package containing the current module
# the "assets" module contains the asset definitions
from . import table_assets
from dagster import load_assets_from_modules, with_resources

from . import table_assets

weather_assets = with_resources(
load_assets_from_modules(modules=[table_assets]),
resource_defs={
@@ -109,9 +110,11 @@ class LocalFileSystemIOManager(IOManager):
Not all the assets in the same dependency graph need to have the same Python type. Here's an asset whose computation is defined using Spark DataFrames, that depends on the `daily_temperature_highs` asset we defined above using Pandas.

```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/assets/spark_asset.py
from pyspark.sql import DataFrame as SparkDF
from pyspark.sql import Window
from pyspark.sql import functions as f
from pyspark.sql import (
DataFrame as SparkDF,
Window,
functions as f,
)

from dagster import asset

Expand All @@ -132,9 +135,10 @@ def daily_temperature_high_diffs(daily_temperature_highs: SparkDF) -> SparkDF:
Here's an extended version of `weather_assets` that contains the new asset:

```python file=../../assets_pandas_pyspark/assets_pandas_pyspark/assets/spark_weather_assets.py startafter=gather_assets_start endbefore=gather_assets_end
from . import table_assets, spark_asset
from dagster import load_assets_from_modules, with_resources

from . import spark_asset, table_assets

spark_weather_assets = with_resources(
load_assets_from_modules(modules=[table_assets, spark_asset]),
resource_defs={
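For reference, a hedged sketch of wiring module-loaded assets to an IO manager with `with_resources`, using an in-memory stand-in for the guide's `LocalFileSystemIOManager`:

```python
from dagster import IOManager, io_manager, load_assets_from_modules, with_resources

from . import table_assets  # the guide's module of @asset definitions


class InMemoryIOManager(IOManager):
    """Keeps outputs in a dict keyed by asset key (illustrative only)."""

    def __init__(self):
        self._values = {}

    def handle_output(self, context, obj):
        self._values[tuple(context.asset_key.path)] = obj

    def load_input(self, context):
        return self._values[tuple(context.asset_key.path)]


@io_manager
def in_memory_io_manager(_init_context):
    return InMemoryIOManager()


weather_assets = with_resources(
    load_assets_from_modules(modules=[table_assets]),
    resource_defs={"io_manager": in_memory_io_manager},
)
```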
2 changes: 1 addition & 1 deletion docs/content/integrations/dbt-cloud.mdx
@@ -99,7 +99,7 @@ Now that your dbt Cloud assets are loaded, you can define a Dagster job that mat
You can explicitly define when your assets should be materialized. For example, you can schedule assets based on their upstream or downstream dependencies, external events using a sensor, or a cron schedule.

```python startafter=start_schedule_dbt_cloud_assets endbefore=end_schedule_dbt_cloud_assets file=/integrations/dbt/dbt_cloud.py dedent=4
from dagster import ScheduleDefinition, define_asset_job, repository, AssetSelection
from dagster import AssetSelection, ScheduleDefinition, define_asset_job, repository

# Materialize all assets in the repository
run_everything_job = define_asset_job("run_everything_job", AssetSelection.all())
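For reference, a sketch of scheduling the asset job defined in this hunk; the cron string and repository contents are illustrative:

```python
from dagster import AssetSelection, ScheduleDefinition, define_asset_job, repository

run_everything_job = define_asset_job("run_everything_job", AssetSelection.all())

# materialize everything daily at 06:00
run_everything_daily = ScheduleDefinition(
    job=run_everything_job,
    cron_schedule="0 6 * * *",
)


@repository
def dbt_cloud_schedule_repository():
    # the dbt Cloud assets themselves would also be returned here
    return [run_everything_job, run_everything_daily]
```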
@@ -4,7 +4,11 @@

from dagster import AssetIn, IOManager, asset, io_manager, repository, with_resources

from .asset_input_managers import load_numpy_array, load_pandas_dataframe, store_pandas_dataframe
from .asset_input_managers import (
load_numpy_array,
load_pandas_dataframe,
store_pandas_dataframe,
)


# start_numpy_example
@@ -1,4 +1,13 @@
from dagster import AssetsDefinition, GraphOut, Out, Output, define_asset_job, graph, op, repository
from dagster import (
AssetsDefinition,
GraphOut,
Out,
Output,
define_asset_job,
graph,
op,
repository,
)


# start_graph_backed_asset_foo
@@ -89,7 +89,10 @@ def do_something_with_exc(some_exception): # pylint: disable=W0613
# end_reload_repo_location_marker

# start_shutdown_repo_location_marker
from dagster_graphql import ShutdownRepositoryLocationInfo, ShutdownRepositoryLocationStatus
from dagster_graphql import (
ShutdownRepositoryLocationInfo,
ShutdownRepositoryLocationStatus,
)

shutdown_info: ShutdownRepositoryLocationInfo = client.shutdown_repository_location(
REPO_NAME
@@ -4,7 +4,12 @@

from dagster import graph, job, op

from .unnested_ops import add_thirty_two, log_number, multiply_by_one_point_eight, return_fifty
from .unnested_ops import (
add_thirty_two,
log_number,
multiply_by_one_point_eight,
return_fifty,
)

# start_composite_solid_example_marker

@@ -1,4 +1,10 @@
from dagster import AssetSelection, HourlyPartitionsDefinition, asset, define_asset_job, repository
from dagster import (
AssetSelection,
HourlyPartitionsDefinition,
asset,
define_asset_job,
repository,
)

hourly_partitions_def = HourlyPartitionsDefinition(start_date="2022-05-31-00:00")

@@ -1,6 +1,8 @@
# isort: skip_file

from docs_snippets.concepts.partitions_schedules_sensors.partitioned_job import do_stuff_partitioned
from docs_snippets.concepts.partitions_schedules_sensors.partitioned_job import (
do_stuff_partitioned,
)


# start
@@ -1,4 +1,10 @@
from dagster import DagsterRunStatus, RunRequest, SkipReason, run_failure_sensor, run_status_sensor
from dagster import (
DagsterRunStatus,
RunRequest,
SkipReason,
run_failure_sensor,
run_status_sensor,
)

status_reporting_job = None

@@ -13,7 +13,10 @@ def my_slack_on_run_failure(context: RunFailureSensorContext):

slack_client.chat_postMessage(
channel="#alert-channel",
message=f'Job "{context.dagster_run.job_name}" failed. Error: {context.failure_event.message}',
message=(
f'Job "{context.dagster_run.job_name}" failed. Error:'
f" {context.failure_event.message}"
),
)


@@ -29,7 +32,10 @@ def email_alert(_):

@run_failure_sensor
def my_email_failure_sensor(context: RunFailureSensorContext):
message = f'Job "{context.dagster_run.job_name}" failed. Error: {context.failure_event.message}'
message = (
f'Job "{context.dagster_run.job_name}" failed. Error:'
f" {context.failure_event.message}"
)
email_alert(message)


@@ -3,7 +3,14 @@
from dagster import graph, repository, with_resources

from .clone_and_drop_db import drop_database_clone
from .repository_v2 import clone_prod, comments, get_current_env, items, resource_defs, stories
from .repository_v2 import (
clone_prod,
comments,
get_current_env,
items,
resource_defs,
stories,
)


# start_drop_db
@@ -1,4 +1,7 @@
from docs_snippets.concepts.assets.asset_dependency import downstream_asset, upstream_asset
from docs_snippets.concepts.assets.asset_dependency import (
downstream_asset,
upstream_asset,
)


def test_asset_dependency():
@@ -1,4 +1,7 @@
from docs_snippets.concepts.assets.cross_repository_asset import repository_a, repository_b
from docs_snippets.concepts.assets.cross_repository_asset import (
repository_a,
repository_b,
)


def test_repository_asset_groups():
@@ -1,7 +1,9 @@
import tempfile

from docs_snippets.concepts.configuration.make_values_resource_any import file_dir_job
from docs_snippets.concepts.configuration.make_values_resource_config_schema import file_dirs_job
from docs_snippets.concepts.configuration.make_values_resource_config_schema import (
file_dirs_job,
)


def test_make_values_resource_any():