Skip to content

Commit

Permalink
Merge branch 'main' into enh_athena2pyarrow
Browse files Browse the repository at this point in the history
  • Loading branch information
einavfiterTB committed Aug 25, 2024
2 parents c7033b8 + 064d375 commit 65aa46a
Show file tree
Hide file tree
Showing 41 changed files with 1,435 additions and 1,300 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "3.9.0"
current_version = "3.9.1"
commit = false
tag = false
tag_name = "{new_version}"
Expand Down
80 changes: 40 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,27 +94,27 @@ FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 3
## At scale
AWS SDK for pandas can also run your workflows at scale by leveraging [Modin](https://modin.readthedocs.io/en/stable/) and [Ray](https://www.ray.io/). Both projects aim to speed up data workloads by distributing processing over a cluster of workers.

Read our [docs](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/scale.html) or head to our latest [tutorials](https://github.com/aws/aws-sdk-pandas/tree/main/tutorials) to learn more.
Read our [docs](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/scale.html) or head to our latest [tutorials](https://github.com/aws/aws-sdk-pandas/tree/main/tutorials) to learn more.

> ⚠️ **Ray is currently not available for Python 3.12. While AWS SDK for pandas supports Python 3.12, it cannot be used at scale.**
## [Read The Docs](https://aws-sdk-pandas.readthedocs.io/)

- [**What is AWS SDK for pandas?**](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/about.html)
- [**Install**](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/install.html)
- [PyPi (pip)](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/install.html#pypi-pip)
- [Conda](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/install.html#conda)
- [AWS Lambda Layer](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/install.html#aws-lambda-layer)
- [AWS Glue Python Shell Jobs](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/install.html#aws-glue-python-shell-jobs)
- [AWS Glue PySpark Jobs](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/install.html#aws-glue-pyspark-jobs)
- [Amazon SageMaker Notebook](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/install.html#amazon-sagemaker-notebook)
- [Amazon SageMaker Notebook Lifecycle](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/install.html#amazon-sagemaker-notebook-lifecycle)
- [EMR](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/install.html#emr)
- [From source](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/install.html#from-source)
- [**At scale**](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/scale.html)
- [Getting Started](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/scale.html#getting-started)
- [Supported APIs](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/scale.html#supported-apis)
- [Resources](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/scale.html#resources)
- [**What is AWS SDK for pandas?**](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/about.html)
- [**Install**](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/install.html)
- [PyPi (pip)](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/install.html#pypi-pip)
- [Conda](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/install.html#conda)
- [AWS Lambda Layer](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/install.html#aws-lambda-layer)
- [AWS Glue Python Shell Jobs](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/install.html#aws-glue-python-shell-jobs)
- [AWS Glue PySpark Jobs](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/install.html#aws-glue-pyspark-jobs)
- [Amazon SageMaker Notebook](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/install.html#amazon-sagemaker-notebook)
- [Amazon SageMaker Notebook Lifecycle](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/install.html#amazon-sagemaker-notebook-lifecycle)
- [EMR](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/install.html#emr)
- [From source](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/install.html#from-source)
- [**At scale**](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/scale.html)
- [Getting Started](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/scale.html#getting-started)
- [Supported APIs](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/scale.html#supported-apis)
- [Resources](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/scale.html#resources)
- [**Tutorials**](https://github.com/aws/aws-sdk-pandas/tree/main/tutorials)
- [001 - Introduction](https://github.com/aws/aws-sdk-pandas/blob/main/tutorials/001%20-%20Introduction.ipynb)
- [002 - Sessions](https://github.com/aws/aws-sdk-pandas/blob/main/tutorials/002%20-%20Sessions.ipynb)
Expand Down Expand Up @@ -155,30 +155,30 @@ Read our [docs](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/scale.html) or he
- [039 - Athena Iceberg](https://github.com/aws/aws-sdk-pandas/blob/main/tutorials/039%20-%20Athena%20Iceberg.ipynb)
- [040 - EMR Serverless](https://github.com/aws/aws-sdk-pandas/blob/main/tutorials/040%20-%20EMR%20Serverless.ipynb)
- [041 - Apache Spark on Amazon Athena](https://github.com/aws/aws-sdk-pandas/blob/main/tutorials/041%20-%20Apache%20Spark%20on%20Amazon%20Athena.ipynb)
- [**API Reference**](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html)
- [Amazon S3](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#amazon-s3)
- [AWS Glue Catalog](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#aws-glue-catalog)
- [Amazon Athena](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#amazon-athena)
- [Amazon Redshift](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#amazon-redshift)
- [PostgreSQL](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#postgresql)
- [MySQL](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#mysql)
- [SQL Server](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#sqlserver)
- [Oracle](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#oracle)
- [Data API Redshift](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#data-api-redshift)
- [Data API RDS](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#data-api-rds)
- [OpenSearch](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#opensearch)
- [AWS Glue Data Quality](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#aws-glue-data-quality)
- [Amazon Neptune](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#amazon-neptune)
- [DynamoDB](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#dynamodb)
- [Amazon Timestream](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#amazon-timestream)
- [Amazon EMR](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#amazon-emr)
- [Amazon CloudWatch Logs](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#amazon-cloudwatch-logs)
- [Amazon Chime](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#amazon-chime)
- [Amazon QuickSight](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#amazon-quicksight)
- [AWS STS](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#aws-sts)
- [AWS Secrets Manager](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#aws-secrets-manager)
- [Global Configurations](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#global-configurations)
- [Distributed - Ray](https://aws-sdk-pandas.readthedocs.io/en/3.9.0/api.html#distributed-ray)
- [**API Reference**](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html)
- [Amazon S3](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#amazon-s3)
- [AWS Glue Catalog](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#aws-glue-catalog)
- [Amazon Athena](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#amazon-athena)
- [Amazon Redshift](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#amazon-redshift)
- [PostgreSQL](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#postgresql)
- [MySQL](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#mysql)
- [SQL Server](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#sqlserver)
- [Oracle](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#oracle)
- [Data API Redshift](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#data-api-redshift)
- [Data API RDS](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#data-api-rds)
- [OpenSearch](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#opensearch)
- [AWS Glue Data Quality](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#aws-glue-data-quality)
- [Amazon Neptune](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#amazon-neptune)
- [DynamoDB](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#dynamodb)
- [Amazon Timestream](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#amazon-timestream)
- [Amazon EMR](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#amazon-emr)
- [Amazon CloudWatch Logs](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#amazon-cloudwatch-logs)
- [Amazon Chime](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#amazon-chime)
- [Amazon QuickSight](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#amazon-quicksight)
- [AWS STS](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#aws-sts)
- [AWS Secrets Manager](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#aws-secrets-manager)
- [Global Configurations](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#global-configurations)
- [Distributed - Ray](https://aws-sdk-pandas.readthedocs.io/en/3.9.1/api.html#distributed-ray)
- [**License**](https://github.com/aws/aws-sdk-pandas/blob/main/LICENSE.txt)
- [**Contributing**](https://github.com/aws/aws-sdk-pandas/blob/main/CONTRIBUTING.md)

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.9.0
3.9.1
2 changes: 1 addition & 1 deletion awswrangler/__metadata__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@

__title__: str = "awswrangler"
__description__: str = "Pandas on AWS."
__version__: str = "3.9.0"
__version__: str = "3.9.1"
__license__: str = "Apache License 2.0"
1 change: 1 addition & 0 deletions awswrangler/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
"pyodbc": "sqlserver",
"gremlin_python": "gremlin",
"opensearchpy": "opensearch",
"jsonpath_ng": "opensearch",
"oracledb": "oracle",
}

Expand Down
16 changes: 8 additions & 8 deletions awswrangler/athena/_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,11 +794,11 @@ def read_sql_query(
**Related tutorial:**
- `Amazon Athena <https://aws-sdk-pandas.readthedocs.io/en/3.9.0/
- `Amazon Athena <https://aws-sdk-pandas.readthedocs.io/en/3.9.1/
tutorials/006%20-%20Amazon%20Athena.html>`_
- `Athena Cache <https://aws-sdk-pandas.readthedocs.io/en/3.9.0/
- `Athena Cache <https://aws-sdk-pandas.readthedocs.io/en/3.9.1/
tutorials/019%20-%20Athena%20Cache.html>`_
- `Global Configurations <https://aws-sdk-pandas.readthedocs.io/en/3.9.0/
- `Global Configurations <https://aws-sdk-pandas.readthedocs.io/en/3.9.1/
tutorials/021%20-%20Global%20Configurations.html>`_
**There are three approaches available through ctas_approach and unload_approach parameters:**
Expand Down Expand Up @@ -862,7 +862,7 @@ def read_sql_query(
/athena.html#Athena.Client.get_query_execution>`_ .
For a practical example check out the
`related tutorial <https://aws-sdk-pandas.readthedocs.io/en/3.9.0/
`related tutorial <https://aws-sdk-pandas.readthedocs.io/en/3.9.1/
tutorials/024%20-%20Athena%20Query%20Metadata.html>`_!
Expand Down Expand Up @@ -1139,11 +1139,11 @@ def read_sql_table(
**Related tutorial:**
- `Amazon Athena <https://aws-sdk-pandas.readthedocs.io/en/3.9.0/
- `Amazon Athena <https://aws-sdk-pandas.readthedocs.io/en/3.9.1/
tutorials/006%20-%20Amazon%20Athena.html>`_
- `Athena Cache <https://aws-sdk-pandas.readthedocs.io/en/3.9.0/
- `Athena Cache <https://aws-sdk-pandas.readthedocs.io/en/3.9.1/
tutorials/019%20-%20Athena%20Cache.html>`_
- `Global Configurations <https://aws-sdk-pandas.readthedocs.io/en/3.9.0/
- `Global Configurations <https://aws-sdk-pandas.readthedocs.io/en/3.9.1/
tutorials/021%20-%20Global%20Configurations.html>`_
**There are three approaches available through ctas_approach and unload_approach parameters:**
Expand Down Expand Up @@ -1207,7 +1207,7 @@ def read_sql_table(
/athena.html#Athena.Client.get_query_execution>`_ .
For a practical example check out the
`related tutorial <https://aws-sdk-pandas.readthedocs.io/en/3.9.0/
`related tutorial <https://aws-sdk-pandas.readthedocs.io/en/3.9.1/
tutorials/024%20-%20Athena%20Query%20Metadata.html>`_!
Expand Down
151 changes: 117 additions & 34 deletions awswrangler/athena/_write_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,109 @@ def _validate_args(
)


def _merge_iceberg(
df: pd.DataFrame,
database: str,
table: str,
source_table: str,
merge_cols: list[str] | None = None,
merge_condition: Literal["update", "ignore"] = "update",
merge_match_nulls: bool = False,
kms_key: str | None = None,
boto3_session: boto3.Session | None = None,
s3_output: str | None = None,
workgroup: str = "primary",
encryption: str | None = None,
data_source: str | None = None,
) -> None:
"""
Merge iceberg.
Merge data from source_table and write it to an Athena iceberg table.
Parameters
----------
df : pd.DataFrame
Pandas DataFrame.
database : str
AWS Glue/Athena database name - It is only the origin database from where the query will be launched.
You can still using and mixing several databases writing the full table name within the sql
(e.g. `database.table`).
table : str
AWS Glue/Athena destination table name.
source_table: str
AWS Glue/Athena source table name.
merge_cols: List[str], optional
List of column names that will be used for conditional inserts and updates.
https://docs.aws.amazon.com/athena/latest/ug/merge-into-statement.html
merge_condition: str, optional
The condition to be used in the MERGE INTO statement. Valid values: ['update', 'ignore'].
merge_match_nulls: bool, optional
Instruct whether to have nulls in the merge condition match other nulls
kms_key : str, optional
For SSE-KMS, this is the KMS key ARN or ID.
boto3_session : boto3.Session(), optional
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
s3_output : str, optional
Amazon S3 path used for query execution.
workgroup : str
Athena workgroup. Primary by default.
encryption : str, optional
Valid values: [None, 'SSE_S3', 'SSE_KMS']. Notice: 'CSE_KMS' is not supported.
data_source : str, optional
Data Source / Catalog name. If None, 'AwsDataCatalog' will be used by default.
Returns
-------
None
"""
wg_config: _WorkGroupConfig = _get_workgroup_config(session=boto3_session, workgroup=workgroup)

sql_statement: str
if merge_cols:
if merge_condition == "update":
match_condition = f"""WHEN MATCHED THEN
UPDATE SET {', '.join([f'"{x}" = source."{x}"' for x in df.columns])}"""
else:
match_condition = ""

if merge_match_nulls:
merge_conditions = [f'(target."{x}" IS NOT DISTINCT FROM source."{x}")' for x in merge_cols]
else:
merge_conditions = [f'(target."{x}" = source."{x}")' for x in merge_cols]

sql_statement = f"""
MERGE INTO "{database}"."{table}" target
USING "{database}"."{source_table}" source
ON {' AND '.join(merge_conditions)}
{match_condition}
WHEN NOT MATCHED THEN
INSERT ({', '.join([f'"{x}"' for x in df.columns])})
VALUES ({', '.join([f'source."{x}"' for x in df.columns])})
"""
else:
sql_statement = f"""
INSERT INTO "{database}"."{table}" ({', '.join([f'"{x}"' for x in df.columns])})
SELECT {', '.join([f'"{x}"' for x in df.columns])}
FROM "{database}"."{source_table}"
"""

query_execution_id: str = _start_query_execution(
sql=sql_statement,
workgroup=workgroup,
wg_config=wg_config,
database=database,
data_source=data_source,
s3_output=s3_output,
encryption=encryption,
kms_key=kms_key,
boto3_session=boto3_session,
)
wait_query(query_execution_id=query_execution_id, boto3_session=boto3_session)


@apply_configs
@_utils.validate_distributed_kwargs(
unsupported_kwargs=["boto3_session", "s3_additional_kwargs"],
Expand All @@ -253,6 +356,7 @@ def to_iceberg(
partition_cols: list[str] | None = None,
merge_cols: list[str] | None = None,
merge_condition: Literal["update", "ignore"] = "update",
merge_match_nulls: bool = False,
keep_files: bool = True,
data_source: str | None = None,
s3_output: str | None = None,
Expand Down Expand Up @@ -301,6 +405,8 @@ def to_iceberg(
https://docs.aws.amazon.com/athena/latest/ug/merge-into-statement.html
merge_condition: str, optional
The condition to be used in the MERGE INTO statement. Valid values: ['update', 'ignore'].
merge_match_nulls: bool, optional
Instruct whether to have nulls in the merge condition match other nulls
keep_files : bool
Whether staging files produced by Athena are retained. 'True' by default.
data_source : str, optional
Expand Down Expand Up @@ -504,44 +610,21 @@ def to_iceberg(
glue_table_settings=glue_table_settings,
)

# Insert or merge into Iceberg table
sql_statement: str
if merge_cols:
if merge_condition == "update":
match_condition = f"""WHEN MATCHED THEN
UPDATE SET {', '.join([f'"{x}" = source."{x}"' for x in df.columns])}"""
else:
match_condition = ""
sql_statement = f"""
MERGE INTO "{database}"."{table}" target
USING "{database}"."{temp_table}" source
ON {' AND '.join([
f'(target."{x}" = source."{x}" OR (target."{x}" IS NULL AND source."{x}" IS NULL))'
for x in merge_cols])}
{match_condition}
WHEN NOT MATCHED THEN
INSERT ({', '.join([f'"{x}"' for x in df.columns])})
VALUES ({', '.join([f'source."{x}"' for x in df.columns])})
"""
else:
sql_statement = f"""
INSERT INTO "{database}"."{table}" ({', '.join([f'"{x}"' for x in df.columns])})
SELECT {', '.join([f'"{x}"' for x in df.columns])}
FROM "{database}"."{temp_table}"
"""

query_execution_id: str = _start_query_execution(
sql=sql_statement,
workgroup=workgroup,
wg_config=wg_config,
_merge_iceberg(
df=df,
database=database,
data_source=data_source,
s3_output=s3_output,
encryption=encryption,
table=table,
source_table=temp_table,
merge_cols=merge_cols,
merge_condition=merge_condition,
merge_match_nulls=merge_match_nulls,
kms_key=kms_key,
boto3_session=boto3_session,
s3_output=s3_output,
workgroup=workgroup,
encryption=encryption,
data_source=data_source,
)
wait_query(query_execution_id=query_execution_id, boto3_session=boto3_session)

except Exception as ex:
_logger.error(ex)
Expand Down
4 changes: 2 additions & 2 deletions awswrangler/catalog/_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,7 @@ def create_csv_table(
If True allows schema evolution (new or missing columns), otherwise a exception will be raised.
(Only considered if dataset=True and mode in ("append", "overwrite_partitions"))
Related tutorial:
https://aws-sdk-pandas.readthedocs.io/en/3.9.0/tutorials/014%20-%20Schema%20Evolution.html
https://aws-sdk-pandas.readthedocs.io/en/3.9.1/tutorials/014%20-%20Schema%20Evolution.html
sep : str
String of length 1. Field delimiter for the output file.
skip_header_line_count : Optional[int]
Expand Down Expand Up @@ -1300,7 +1300,7 @@ def create_json_table(
If True allows schema evolution (new or missing columns), otherwise a exception will be raised.
(Only considered if dataset=True and mode in ("append", "overwrite_partitions"))
Related tutorial:
https://aws-sdk-pandas.readthedocs.io/en/3.9.0/tutorials/014%20-%20Schema%20Evolution.html
https://aws-sdk-pandas.readthedocs.io/en/3.9.1/tutorials/014%20-%20Schema%20Evolution.html
serde_library : Optional[str]
Specifies the SerDe Serialization library which will be used. You need to provide the Class library name
as a string.
Expand Down
Loading

0 comments on commit 65aa46a

Please sign in to comment.