Release v1.26.0 merge back #2721

Closed · wants to merge 7 commits
2 changes: 1 addition & 1 deletion .github/workflows/python-publish.yml
@@ -26,7 +26,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: '3.x'
python-version: '3.11'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
17 changes: 3 additions & 14 deletions CHANGELOG.md
@@ -1,6 +1,6 @@
# Release History

## 1.26.0 (TBD)
## 1.26.0 (2024-12-05)

### Snowpark Python API Updates

@@ -23,7 +23,6 @@
- Added support for the following functions in `functions.py`:
  - `size` to get the size of array, object, or map columns.
  - `collect_list`, an alias of `array_agg`.
  - `concat_ws_ignore_nulls` to concatenate strings with a separator, ignoring null values.
  - `substring` now makes the `len` argument optional.
- Added parameter `ast_enabled` to session for internal usage (default: `False`).

@@ -51,7 +50,6 @@
dynamic pivot is now generally available.
- Fixed a bug in `session.read.options` where `False` Boolean values were incorrectly parsed as `True` in the generated file format.


#### Dependency Updates

- Added a runtime dependency on `python-dateutil`.
@@ -71,24 +69,15 @@
- Added partial support for the dataframe interchange protocol method
`DataFrame.__dataframe__()`.

#### Dependency Updates

#### Bug Fixes
- Fixed a bug in `df.loc` where setting a single column from a series results in unexpected `None` values.

- Fixed a bug in `df.loc` where setting a single column from a series results in unexpected `None` values.

#### Improvements

- Use UNPIVOT INCLUDE NULLS for unpivot operations in pandas instead of sentinel values.
- Improved documentation for pd.read_excel.

### Snowpark Local Testing Updates

#### New Features


#### Bug Fixes


## 1.25.0 (2024-11-14)

### Snowpark Python API Updates
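For context on the `functions.py` entries kept in the 1.26.0 notes above (`size`, `collect_list`, and the now-optional `len` argument of `substring`), here is a minimal usage sketch. It is not part of this PR; the session setup, table names, and column names are assumed purely for illustration.

```python
# Illustrative only -- not part of this PR. Assumes an existing Snowpark
# `session` and hypothetical tables ORDERS (ID, CUSTOMER_ID, ITEMS: ARRAY)
# and CUSTOMERS (NAME).
from snowflake.snowpark.functions import col, collect_list, size, substring

orders = session.table("ORDERS")

# `size` returns the number of elements in an array, object, or map column.
item_counts = orders.select(col("ID"), size(col("ITEMS")).alias("ITEM_COUNT"))

# `collect_list` is an alias of `array_agg`.
per_customer = orders.group_by("CUSTOMER_ID").agg(
    collect_list(col("ID")).alias("ORDER_IDS")
)

# `substring` now treats `len` as optional: omitting it returns the rest of
# the string from the given (1-based) position onward.
tails = session.table("CUSTOMERS").select(substring(col("NAME"), 2).alias("TAIL"))
```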
35 changes: 30 additions & 5 deletions docs/source/conf.py
@@ -73,10 +73,6 @@
"_themes",
]

# Override default RTD css to get a larger width
# def setup(app):
# app.add_stylesheet('theme_overrides.css')

html_theme_options = {
# 'analytics_id': 'UA-XXXXXXX-1',
}
@@ -287,6 +283,30 @@ def process_modin_accessors(args):
return list(map(process_modin_accessors, items))
return items

def process_signature(app, what, name, obj, options, signature, return_annotation):

# Names to remove from signature (AST related):
names_to_remove = ['_emit_ast', '_ast']

def remove_from_signature(signature, name_to_remove):
if name_to_remove not in signature:
return signature

if signature.startswith('(') and signature.endswith(')'):
# temporarily remove parentheses, add after removing name_to_remove parts.
signature = signature[1:-1]
parts = [p for p in signature.split(',') if name_to_remove not in p]
signature = ','.join(parts)

return f'({signature})'
else:
return signature

if signature:
for name_to_remove in names_to_remove:
signature = remove_from_signature(signature, name_to_remove)

return (signature, return_annotation)

def setup(app):
# Make sure modin.pandas namespace is properly set up
@@ -300,7 +320,7 @@ def setup(app):
# WARNING: [autosummary] failed to import modin.pandas.Series.str.slice.
# Possible hints:
# * AttributeError: 'property' object has no attribute 'slice'
# * ImportError:
# * ImportError:
# * ModuleNotFoundError: No module named 'modin.pandas.Series'
#
# Because we're replacing the `property` object, we also need to set the __doc__ of the new
@@ -323,6 +343,10 @@ def setup(app):
app.add_autodocumenter(ModinAccessorAttributeDocumenter)
app.add_directive("autosummary", ModinAutosummary)

# For Snowpark IR, in phase0 a hidden parameter _emit_ast is introduced. Once phase1 completes,
# this parameter will be removed. Automatically remove _emit_ast for now from docs to avoid confusion.
app.connect("autodoc-process-signature", process_signature)


# We overwrite the existing "autosummary" directive in order to properly resolve names for modin
# accessor classes. We cannot simply declare an alternative directive like "automodinsummary" to use
@@ -370,3 +394,4 @@ def linkcode_resolve(domain, info):
f"https://github.com/snowflakedb/snowpark-python/blob/"
f"v{release}/{os.path.relpath(fn, start=os.pardir)}{linespec}"
)

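To make the effect of the `process_signature` hook added above concrete, here is a small standalone sketch of the same string manipulation. The helper and the signature strings are simplified stand-ins for illustration, not the code added by this PR.

```python
# Simplified stand-in for the signature filtering wired up via
# app.connect("autodoc-process-signature", process_signature) above.
def strip_hidden_params(signature: str, hidden=("_emit_ast", "_ast")) -> str:
    if not (signature.startswith("(") and signature.endswith(")")):
        return signature
    inner = signature[1:-1]
    # Drop any comma-separated parameter that mentions a hidden name.
    parts = [p for p in inner.split(",") if not any(h in p for h in hidden)]
    return f"({','.join(parts)})"

# Hypothetical autodoc signature before and after filtering:
before = "(self, col: ColumnOrName, _emit_ast: bool = True)"
print(strip_hidden_params(before))  # (self, col: ColumnOrName)
```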
1 change: 0 additions & 1 deletion docs/source/snowpark/functions.rst
@@ -97,7 +97,6 @@ Functions
column
concat
concat_ws
concat_ws_ignore_nulls
contains
convert_timezone
corr
4 changes: 2 additions & 2 deletions recipe/meta.yaml
@@ -1,5 +1,5 @@
{% set name = "snowflake-snowpark-python" %}
{% set version = "1.25.0" %}
{% set version = "1.26.0" %}
{% set noarch_build = (os.environ.get('SNOWFLAKE_SNOWPARK_PYTHON_NOARCH_BUILD', 'false')) == 'true' %}

package:
@@ -33,7 +33,7 @@ requirements:
run:
- python
- cloudpickle >=1.6.0,<=2.2.1,!=2.1.0,!=2.2.0
- snowflake-connector-python >=3.10.0,<4.0.0
- snowflake-connector-python >=3.12.0,<4.0.0
- typing-extensions >=4.1.0,<5.0.0
# need to pin libffi because of problems in cryptography.
# This might no longer hold true but keep it just to avoid it from biting us again
193 changes: 105 additions & 88 deletions src/snowflake/snowpark/_internal/compiler/plan_compiler.py
@@ -3,6 +3,7 @@
#

import copy
import logging
import time
from typing import Any, Dict, List

@@ -32,6 +33,8 @@
from snowflake.snowpark._internal.utils import random_name_for_temp_object
from snowflake.snowpark.mock._connection import MockServerConnection

_logger = logging.getLogger(__name__)


class PlanCompiler:
"""
@@ -77,98 +80,112 @@ def should_start_query_compilation(self) -> bool:
)

def compile(self) -> Dict[PlanQueryType, List[Query]]:
# initialize the queries with the original queries without optimization
final_plan = self._plan
queries = {
PlanQueryType.QUERIES: final_plan.queries,
PlanQueryType.POST_ACTIONS: final_plan.post_actions,
}

if self.should_start_query_compilation():
session = self._plan.session
# preparation for compilation
# 1. make a copy of the original plan
start_time = time.time()
complexity_score_before_compilation = get_complexity_score(self._plan)
logical_plans: List[LogicalPlan] = [copy.deepcopy(self._plan)]
plot_plan_if_enabled(self._plan, "original_plan")
plot_plan_if_enabled(logical_plans[0], "deep_copied_plan")
deep_copy_end_time = time.time()

# 2. create a code generator with the original plan
query_generator = create_query_generator(self._plan)

extra_optimization_status: Dict[str, Any] = {}
# 3. apply each optimization if needed
# CTE optimization
cte_start_time = time.time()
if session.cte_optimization_enabled:
repeated_subquery_eliminator = RepeatedSubqueryElimination(
logical_plans, query_generator
try:
# preparation for compilation
# 1. make a copy of the original plan
start_time = time.time()
complexity_score_before_compilation = get_complexity_score(self._plan)
logical_plans: List[LogicalPlan] = [copy.deepcopy(self._plan)]
plot_plan_if_enabled(self._plan, "original_plan")
plot_plan_if_enabled(logical_plans[0], "deep_copied_plan")
deep_copy_end_time = time.time()

# 2. create a code generator with the original plan
query_generator = create_query_generator(self._plan)

extra_optimization_status: Dict[str, Any] = {}
# 3. apply each optimization if needed
# CTE optimization
cte_start_time = time.time()
if session.cte_optimization_enabled:
repeated_subquery_eliminator = RepeatedSubqueryElimination(
logical_plans, query_generator
)
elimination_result = repeated_subquery_eliminator.apply()
logical_plans = elimination_result.logical_plans
# add the extra repeated subquery elimination status
extra_optimization_status[
CompilationStageTelemetryField.CTE_NODE_CREATED.value
] = elimination_result.total_num_of_ctes

cte_end_time = time.time()
complexity_scores_after_cte = [
get_complexity_score(logical_plan) for logical_plan in logical_plans
]
for i, plan in enumerate(logical_plans):
plot_plan_if_enabled(plan, f"cte_optimized_plan_{i}")

# Large query breakdown
breakdown_failure_summary, skipped_summary = {}, {}
if session.large_query_breakdown_enabled:
large_query_breakdown = LargeQueryBreakdown(
session,
query_generator,
logical_plans,
session.large_query_breakdown_complexity_bounds,
)
breakdown_result = large_query_breakdown.apply()
logical_plans = breakdown_result.logical_plans
breakdown_failure_summary = breakdown_result.breakdown_summary
skipped_summary = breakdown_result.skipped_summary

large_query_breakdown_end_time = time.time()
complexity_scores_after_large_query_breakdown = [
get_complexity_score(logical_plan) for logical_plan in logical_plans
]
for i, plan in enumerate(logical_plans):
plot_plan_if_enabled(plan, f"large_query_breakdown_plan_{i}")

# 4. do a final pass of code generation
queries = query_generator.generate_queries(logical_plans)

# log telemetry data
deep_copy_time = deep_copy_end_time - start_time
cte_time = cte_end_time - cte_start_time
large_query_breakdown_time = (
large_query_breakdown_end_time - cte_end_time
)
elimination_result = repeated_subquery_eliminator.apply()
logical_plans = elimination_result.logical_plans
# add the extra repeated subquery elimination status
extra_optimization_status[
CompilationStageTelemetryField.CTE_NODE_CREATED.value
] = elimination_result.total_num_of_ctes

cte_end_time = time.time()
complexity_scores_after_cte = [
get_complexity_score(logical_plan) for logical_plan in logical_plans
]
for i, plan in enumerate(logical_plans):
plot_plan_if_enabled(plan, f"cte_optimized_plan_{i}")

# Large query breakdown
breakdown_failure_summary, skipped_summary = {}, {}
if session.large_query_breakdown_enabled:
large_query_breakdown = LargeQueryBreakdown(
session,
query_generator,
logical_plans,
session.large_query_breakdown_complexity_bounds,
total_time = time.time() - start_time
summary_value = {
TelemetryField.CTE_OPTIMIZATION_ENABLED.value: session.cte_optimization_enabled,
TelemetryField.LARGE_QUERY_BREAKDOWN_ENABLED.value: session.large_query_breakdown_enabled,
CompilationStageTelemetryField.COMPLEXITY_SCORE_BOUNDS.value: session.large_query_breakdown_complexity_bounds,
CompilationStageTelemetryField.TIME_TAKEN_FOR_COMPILATION.value: total_time,
CompilationStageTelemetryField.TIME_TAKEN_FOR_DEEP_COPY_PLAN.value: deep_copy_time,
CompilationStageTelemetryField.TIME_TAKEN_FOR_CTE_OPTIMIZATION.value: cte_time,
CompilationStageTelemetryField.TIME_TAKEN_FOR_LARGE_QUERY_BREAKDOWN.value: large_query_breakdown_time,
CompilationStageTelemetryField.COMPLEXITY_SCORE_BEFORE_COMPILATION.value: complexity_score_before_compilation,
CompilationStageTelemetryField.COMPLEXITY_SCORE_AFTER_CTE_OPTIMIZATION.value: complexity_scores_after_cte,
CompilationStageTelemetryField.COMPLEXITY_SCORE_AFTER_LARGE_QUERY_BREAKDOWN.value: complexity_scores_after_large_query_breakdown,
CompilationStageTelemetryField.BREAKDOWN_FAILURE_SUMMARY.value: breakdown_failure_summary,
CompilationStageTelemetryField.TYPE_LARGE_QUERY_BREAKDOWN_OPTIMIZATION_SKIPPED.value: skipped_summary,
}
# add the extra optimization status
summary_value.update(extra_optimization_status)
session._conn._telemetry_client.send_query_compilation_summary_telemetry(
session_id=session.session_id,
plan_uuid=self._plan.uuid,
compilation_stage_summary=summary_value,
)
breakdown_result = large_query_breakdown.apply()
logical_plans = breakdown_result.logical_plans
breakdown_failure_summary = breakdown_result.breakdown_summary
skipped_summary = breakdown_result.skipped_summary

large_query_breakdown_end_time = time.time()
complexity_scores_after_large_query_breakdown = [
get_complexity_score(logical_plan) for logical_plan in logical_plans
]
for i, plan in enumerate(logical_plans):
plot_plan_if_enabled(plan, f"large_query_breakdown_plan_{i}")

# 4. do a final pass of code generation
queries = query_generator.generate_queries(logical_plans)

# log telemetry data
deep_copy_time = deep_copy_end_time - start_time
cte_time = cte_end_time - cte_start_time
large_query_breakdown_time = large_query_breakdown_end_time - cte_end_time
total_time = time.time() - start_time
summary_value = {
TelemetryField.CTE_OPTIMIZATION_ENABLED.value: session.cte_optimization_enabled,
TelemetryField.LARGE_QUERY_BREAKDOWN_ENABLED.value: session.large_query_breakdown_enabled,
CompilationStageTelemetryField.COMPLEXITY_SCORE_BOUNDS.value: session.large_query_breakdown_complexity_bounds,
CompilationStageTelemetryField.TIME_TAKEN_FOR_COMPILATION.value: total_time,
CompilationStageTelemetryField.TIME_TAKEN_FOR_DEEP_COPY_PLAN.value: deep_copy_time,
CompilationStageTelemetryField.TIME_TAKEN_FOR_CTE_OPTIMIZATION.value: cte_time,
CompilationStageTelemetryField.TIME_TAKEN_FOR_LARGE_QUERY_BREAKDOWN.value: large_query_breakdown_time,
CompilationStageTelemetryField.COMPLEXITY_SCORE_BEFORE_COMPILATION.value: complexity_score_before_compilation,
CompilationStageTelemetryField.COMPLEXITY_SCORE_AFTER_CTE_OPTIMIZATION.value: complexity_scores_after_cte,
CompilationStageTelemetryField.COMPLEXITY_SCORE_AFTER_LARGE_QUERY_BREAKDOWN.value: complexity_scores_after_large_query_breakdown,
CompilationStageTelemetryField.BREAKDOWN_FAILURE_SUMMARY.value: breakdown_failure_summary,
CompilationStageTelemetryField.TYPE_LARGE_QUERY_BREAKDOWN_OPTIMIZATION_SKIPPED.value: skipped_summary,
}
# add the extra optimization status
summary_value.update(extra_optimization_status)
session._conn._telemetry_client.send_query_compilation_summary_telemetry(
session_id=session.session_id,
plan_uuid=self._plan.uuid,
compilation_stage_summary=summary_value,
)
else:
final_plan = self._plan
queries = {
PlanQueryType.QUERIES: final_plan.queries,
PlanQueryType.POST_ACTIONS: final_plan.post_actions,
}
except Exception as e:
# if any error occurs during the compilation, we should fall back to the original plan
_logger.debug(f"Skipping optimization due to error: {e}")
session._conn._telemetry_client.send_query_compilation_stage_failed_telemetry(
session_id=session.session_id,
plan_uuid=self._plan.uuid,
error_type=type(e).__name__,
error_message=str(e),
)
pass

return self.replace_temp_obj_placeholders(queries)

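The restructuring of `compile()` above amounts to a fall-back-on-failure pattern: the result dictionary starts out holding the unoptimized queries, is replaced only if every optimization step succeeds, and any exception is logged and reported via the new failure-telemetry event instead of propagating. A stripped-down sketch of that control flow, with hypothetical stand-ins for the plan, optimizer, and telemetry client:

```python
import logging
from typing import Any, Callable, Dict, List

_logger = logging.getLogger(__name__)


def compile_with_fallback(
    plan: Any,
    optimize: Callable[[Any], Dict[str, List[Any]]],
    send_failure_telemetry: Callable[..., None],
) -> Dict[str, List[Any]]:
    # Start from the original, unoptimized queries so there is always a
    # valid result to return.
    queries = {"queries": plan.queries, "post_actions": plan.post_actions}
    try:
        # Replace the result only if the whole optimization pipeline succeeds.
        queries = optimize(plan)
    except Exception as e:
        _logger.debug(f"Skipping optimization due to error: {e}")
        send_failure_telemetry(error_type=type(e).__name__, error_message=str(e))
    return queries
```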
@@ -23,13 +23,16 @@ class CompilationStageTelemetryField(Enum):
"snowpark_large_query_breakdown_optimization_skipped"
)
TYPE_COMPILATION_STAGE_STATISTICS = "snowpark_compilation_stage_statistics"
TYPE_COMPILATION_STAGE_FAILED = "snowpark_compilation_stage_failed"
TYPE_LARGE_QUERY_BREAKDOWN_UPDATE_COMPLEXITY_BOUNDS = (
"snowpark_large_query_breakdown_update_complexity_bounds"
)

# keys
KEY_REASON = "reason"
PLAN_UUID = "plan_uuid"
ERROR_TYPE = "error_type"
ERROR_MESSAGE = "error_message"
TIME_TAKEN_FOR_COMPILATION = "time_taken_for_compilation_sec"
TIME_TAKEN_FOR_DEEP_COPY_PLAN = "time_taken_for_deep_copy_plan_sec"
TIME_TAKEN_FOR_CTE_OPTIMIZATION = "time_taken_for_cte_optimization_sec"
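The new enum members pair with the failure path added in `plan_compiler.py`. Below is a hedged sketch of how a failure event could be assembled from them; only the key strings come from the diff above, and the surrounding function is assumed for illustration.

```python
# Hedged sketch (not the actual telemetry client code): how the new enum
# members could feed the compilation-stage-failed event sent from
# PlanCompiler.compile().
from enum import Enum


class CompilationStageTelemetryField(Enum):  # trimmed to the new keys
    TYPE_COMPILATION_STAGE_FAILED = "snowpark_compilation_stage_failed"
    PLAN_UUID = "plan_uuid"
    ERROR_TYPE = "error_type"
    ERROR_MESSAGE = "error_message"


def build_failure_event(plan_uuid: str, error: Exception) -> dict:
    return {
        "type": CompilationStageTelemetryField.TYPE_COMPILATION_STAGE_FAILED.value,
        CompilationStageTelemetryField.PLAN_UUID.value: plan_uuid,
        CompilationStageTelemetryField.ERROR_TYPE.value: type(error).__name__,
        CompilationStageTelemetryField.ERROR_MESSAGE.value: str(error),
    }
```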