Add options to spark_df_output_schema #2616

Merged: 6 commits, Jun 23, 2020
326 changes: 317 additions & 9 deletions python_modules/libraries/dagster-pyspark/dagster_pyspark/types.py
@@ -2,29 +2,337 @@

from pyspark.sql import DataFrame as NativeSparkDataFrame

from dagster import Bool, Field, Materialization, PythonObjectDagsterType, String, check
from dagster import (
Bool,
Enum,
EnumValue,
Field,
Materialization,
Permissive,
PythonObjectDagsterType,
String,
check,
)
from dagster.config.field_utils import Selector
from dagster.core.storage.system_storage import fs_system_storage
from dagster.core.storage.type_storage import TypeStoragePlugin
from dagster.core.types.config_schema import output_selector_schema

WriteModeOptions = Enum(
'WriteMode',
[
EnumValue(
'append', description="Append contents of this :class:`DataFrame` to existing data."
),
EnumValue('overwrite', description="Overwrite existing data."),
EnumValue('ignore', description="Silently ignore this operation if data already exists."),
EnumValue(
'error', description="(default case): Throw an exception if data already exists."
),
EnumValue(
'errorifexists',
description="(default case): Throw an exception if data already exists.",
),
],
)


WriteCompressionTextOptions = Enum(
'WriteCompressionText',
[
EnumValue('none'),
EnumValue('bzip2'),
EnumValue('gzip'),
EnumValue('lz4'),
EnumValue('snappy'),
EnumValue('deflate'),
],
)


WriteCompressionOrcOptions = Enum(
'WriteCompressionOrc',
[EnumValue('none'), EnumValue('snappy'), EnumValue('zlib'), EnumValue('lzo')],
)


WriteCompressionParquetOptions = Enum(
'WriteCompressionParquet',
[
EnumValue('none'),
EnumValue('uncompressed'),
EnumValue('snappy'),
EnumValue('gzip'),
EnumValue('lzo'),
EnumValue('brotli'),
EnumValue('lz4'),
EnumValue('zstd'),
],
)
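
# In run config these enums are supplied as plain strings matching the EnumValue
# names, e.g. 'compression': 'gzip' validates against WriteCompressionTextOptions
# and 'mode': 'overwrite' against WriteModeOptions.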


@output_selector_schema(
Selector(
{
'csv': Permissive(
{
'path': Field(
String,
is_required=True,
description="the path in any Hadoop supported file system.",
),
'mode': Field(
WriteModeOptions,
is_required=False,
description="specifies the behavior of the save operation when data already exists.",
),
'compression': Field(
WriteCompressionTextOptions,
is_required=False,
description="compression codec to use when saving to file.",
),
'sep': Field(
String,
is_required=False,
description="sets a single character as a separator for each field and value. If None is set, it uses the default value, ``,``.",
),
'quote': Field(
String,
is_required=False,
description="""sets a single character used for escaping quoted values where the separator can be part of the value. If None is set, it uses the default value, ``"``. If an empty string is set, it uses ``u0000`` (null character).""",
),
'escape': Field(
String,
is_required=False,
description="sets a single character used for escaping quotes inside an already quoted value. If None is set, it uses the default value, ``\``.",
),
'escapeQuotes': Field(
Bool,
is_required=False,
description="a flag indicating whether values containing quotes should always be enclosed in quotes. If None is set, it uses the default value ``true``, escaping all values containing a quote character.",
),
'quoteAll': Field(
Bool,
is_required=False,
description="a flag indicating whether all values should always be enclosed in quotes. If None is set, it uses the default value ``false``, only escaping values containing a quote character.",
),
'header': Field(
Bool,
is_required=False,
description="writes the names of columns as the first line. If None is set, it uses the default value, ``false``.",
),
'nullValue': Field(
String,
is_required=False,
description="sets the string representation of a null value. If None is set, it uses the default value, empty string.",
),
'dateFormat': Field(
String,
is_required=False,
description="sets the string that indicates a date format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to date type. If None is set, it uses the default value, ``yyyy-MM-dd``.",
),
'timestampFormat': Field(
String,
is_required=False,
description="sets the string that indicates a timestamp format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.",
),
'ignoreLeadingWhiteSpace': Field(
Bool,
is_required=False,
description="a flag indicating whether or not leading whitespaces from values being written should be skipped. If None is set, it uses the default value, ``true``.",
),
'ignoreTrailingWhiteSpace': Field(
Bool,
is_required=False,
description="a flag indicating whether or not trailing whitespaces from values being written should be skipped. If None is set, it uses the default value, ``true``.",
),
'charToEscapeQuoteEscaping': Field(
String,
is_required=False,
description="sets a single character used for escaping the escape for the quote character. If None is set, the default value is escape character when escape and quote characters are different, ``\0`` otherwise..",
),
'encoding': Field(
String,
is_required=False,
description="sets the encoding (charset) of saved csv files. If None is set, the default UTF-8 charset will be used.",
),
'emptyValue': Field(
String,
is_required=False,
description="sets the string representation of an empty value. If None is set, it uses the default value, ``"
"``.",
),
}
),
'parquet': Permissive(
{
'path': Field(
String,
is_required=True,
description="the path in any Hadoop supported file system.",
),
'mode': Field(
WriteModeOptions,
is_required=False,
description="specifies the behavior of the save operation when data already exists.",
),
'partitionBy': Field(
String, is_required=False, description="names of partitioning columns."
),
'compression': Field(
WriteCompressionParquetOptions,
is_required=False,
description="compression codec to use when saving to file. This will override ``spark.sql.parquet.compression.codec``. If None is set, it uses the value specified in ``spark.sql.parquet.compression.codec``.",
),
}
),
'json': Permissive(
{
'path': Field(
String,
is_required=True,
description="the path in any Hadoop supported file system.",
),
'mode': Field(
WriteModeOptions,
is_required=False,
description="specifies the behavior of the save operation when data already exists.",
),
'compression': Field(
WriteCompressionTextOptions,
is_required=False,
description="compression codec to use when saving to file.",
),
'dateFormat': Field(
String,
is_required=False,
description="sets the string that indicates a date format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to date type. If None is set, it uses the default value, ``yyyy-MM-dd``.",
),
'timestampFormat': Field(
String,
is_required=False,
description="sets the string that indicates a timestamp format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.",
),
'encoding': Field(
String,
is_required=False,
description="sets the encoding (charset) of saved csv files. If None is set, the default UTF-8 charset will be used.",
),
'lineSep': Field(
String,
is_required=False,
description="defines the line separator that should be used for writing. If None is set, it uses the default value, ``\\n``.",
),
}
),
'jdbc': Permissive(
{
'url': Field(
String,
is_required=True,
description="a JDBC URL of the form ``jdbc:subprotocol:subname``.",
),
'table': Field(
String,
is_required=True,
description="Name of the table in the external database.",
),
'mode': Field(
WriteModeOptions,
is_required=False,
description="specifies the behavior of the save operation when data already exists.",
),
'properties': Field(
Permissive(),
is_required=False,
description="""a dictionary of JDBC database connection arguments. Normally at least properties "user" and "password" with their corresponding values. For example { 'user' : 'SYSTEM', 'password' : 'mypassword' }.""",
),
}
),
'orc': Permissive(
{
'path': Field(
String,
is_required=True,
description="the path in any Hadoop supported file system.",
),
'mode': Field(
WriteModeOptions,
is_required=False,
description="specifies the behavior of the save operation when data already exists.",
),
'partitionBy': Field(
String, is_required=False, description="names of partitioning columns."
),
'compression': Field(
WriteCompressionOrcOptions,
is_required=False,
description="compression codec to use when saving to file. This will override ``orc.compress`` and ``spark.sql.orc.compression.codec``. If None is set, it uses the value specified in ``spark.sql.orc.compression.codec``.",
),
}
),
'saveAsTable': Permissive(
{
'name': Field(String, is_required=True, description="the table name."),
'format': Field(
String, is_required=False, description="the format used to save."
),
'mode': Field(
WriteModeOptions,
is_required=False,
description="specifies the behavior of the save operation when data already exists.",
),
'partitionBy': Field(
String, is_required=False, description="names of partitioning columns."
),
'options': Field(
Permissive(), is_required=False, description="all other string options."
),
}
),
'text': Permissive(
{
'path': Field(
String,
is_required=True,
description="he path in any Hadoop supported file system.",
),
'compression': Field(
WriteCompressionTextOptions,
is_required=False,
description="compression codec to use when saving to file. This will override ``orc.compress`` and ``spark.sql.orc.compression.codec``. If None is set, it uses the value specified in ``spark.sql.orc.compression.codec``.",
),
'lineSep': Field(
String,
is_required=False,
description="defines the line separator that should be used for writing. If None is set, it uses the default value, ``\\n``.",
),
}
),
}
)
)
def spark_df_output_schema(_context, file_type, file_options, spark_df):
if file_type == 'csv':
spark_df.write.csv(**file_options)
return Materialization.file(file_options['path'])
elif file_type == 'parquet':
spark_df.write.parquet(**file_options)
return Materialization.file(file_options['path'])
elif file_type == 'json':
spark_df.write.json(**file_options)
return Materialization.file(file_options['path'])
elif file_type == 'jdbc':
spark_df.write.jdbc(**file_options)
return Materialization.file(file_options['url'])
elif file_type == 'orc':
spark_df.write.orc(**file_options)
return Materialization.file(file_options['path'])
elif file_type == 'saveAsTable':
spark_df.write.saveAsTable(**file_options)
return Materialization.file(file_options['name'])
elif file_type == 'text':
spark_df.write.text(**file_options)
return Materialization.file(file_options['path'])
else:
check.failed('Unsupported file type: {}'.format(file_type))
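
A minimal usage sketch (the solid name and paths here are hypothetical, not part of this PR): with this schema attached to the Spark DataFrame type's output materialization config, run config selects exactly one writer via the selector, for example:

run_config = {
    'solids': {
        'make_people_df': {  # hypothetical solid that outputs a Spark DataFrame
            'outputs': [
                {
                    'result': {
                        'parquet': {
                            'path': '/tmp/people.parquet',  # required
                            'mode': 'overwrite',            # a WriteModeOptions value
                            'compression': 'snappy',        # a WriteCompressionParquetOptions value
                        }
                    }
                }
            ]
        }
    }
}

Because each file type is declared with Permissive, any extra writer option not listed in the schema still passes through to the corresponding spark_df.write.<format>(**file_options) call.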