
[Feature Request] add WHEN NOT MATCHED BY SOURCE/TARGET clause support #1364

Closed · 1 of 3 tasks
geoHeil opened this issue Aug 31, 2022 · 10 comments
Labels: enhancement (New feature or request)
geoHeil commented Aug 31, 2022

Feature request

https://delta-users.slack.com/archives/CJ70UCSHM/p1661955032288519

Overview

WHEN NOT MATCHED BY SOURCE/TARGET clause support

Motivation

Feature parity with other popular SQL databases; ease of use.

Further details

Each day I get a full dump of a table. However, this data needs to be cleaned and, in particular, compressed using the SCD2-style approach to be easily consumable downstream.
Unfortunately, I do not get changesets or a NULL value for a key in case of deletions; the row (including the key) simply no longer appears in the dump.
The following syntax (as offered by some other SQL databases):

MERGE <target_table> [AS TARGET]
USING <table_source> [AS SOURCE]
ON <search_condition>
[WHEN MATCHED 
   THEN <merge_matched> ]
[WHEN NOT MATCHED [BY TARGET]
   THEN <merge_not_matched> ]
[WHEN NOT MATCHED BY SOURCE
   THEN <merge_matched> ];

will not work with Delta/Spark as the WHEN NOT MATCHED clause does not seem to support the BY SOURCE | TARGET extension.
How can I still calculate the SCD2 representation?

  1. If the table is empty simply take all the data (for the initial load)
  2. When a new day/full copy of the data arrives:
  • INSERT any new keys into the table
  • For EXISTING keys, perform an update: mark the OLD value as no longer valid (set the end date) and produce a new SCD2 row with the contents of the new record, valid until infinity (end date null)
  • In case a previously present key is no longer present, close the SCD2 valid_from/valid_to interval by setting the end date
    • If a new record arrives for this key in the future, start a fresh SCD2 row valid until infinity for the new values.

An example case/dataset:

import pandas as pd
import numpy as np
# assumes a running spark session including support for deltalake to be available

d1 = spark.createDataFrame(pd.DataFrame({'key':[1,2,3], 'value':[4,5,6],'value2':["a", "b", "c"], 'date':[1,1,1]}))
#d1.show()

# notice one entry is MISSING (it should be deleted) or rather SCD2 invalidated
d2 = spark.createDataFrame(pd.DataFrame({'key':[1,2], 'value':[4,5], 'date':[2,2],'value2':["a", "b"]}))

# d2 had (3) as missing - this entry is back now (and should start a new SCD2 row)
d3 = spark.createDataFrame(pd.DataFrame({'key':[1,2,3], 'value':[4,5, 66], 'date':[3,3,3], 'value2':["a", "b", "c"]}))

# a new record is added
d4 = spark.createDataFrame(pd.DataFrame({'key':[1,2,3, 4], 'value':[4,5, 66, 44], 'date':[4,4,4,4], 'value2':["a", "b", "c", "d"]}))

# a new record is added, one removed and one updated
d5 = spark.createDataFrame(pd.DataFrame({'key':[2,3, 4, 5], 'value':[5, 67, 44, 55], 'date':[5,5,5,5], 'value2':["b", "c", "d", "e"]}))

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

  • Yes. I can contribute this feature independently.
  • Yes. I would be willing to contribute this feature with guidance from the Delta Lake community.
  • No. I cannot contribute this feature at this time.
geoHeil added the enhancement label on Aug 31, 2022
geoHeil (Author) commented Aug 31, 2022

Is there any workaround / current solution (perhaps not in SQL, but via the Scala/Python API)?

nkarpov (Collaborator) commented Sep 1, 2022

Thanks for creating this issue @geoHeil! Copying over a suggested approach from the Slack thread:

  • No out-of-the-box way to do this in a single atomic command today
  • You can LEFT join the target to the source and derive a new column for rows to DELETE, then run a MERGE with multiple WHEN MATCHED clauses to cover all the cases (see the sketch below)

Otherwise, the feature request is a good one, so we'll leave it open here if anyone from the community would like to work on it.
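
A rough sketch of that join-then-merge workaround (not spelled out in the Slack thread; it assumes the dummy_delta table and the d2 / current_dt examples used elsewhere in this issue, plus a derived _missing_in_source flag):

```
from pyspark.sql import functions as F
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "dummy_delta")   # SCD2 target as defined later in this thread
source = d2                                         # today's full dump
current_dt = 2.0                                    # load date used to close deleted rows

# LEFT join the current target keys to the source and flag keys that disappeared
derived_source = (
    target.toDF().where("is_current = true").select("key")
    .join(source, on="key", how="left")
    .withColumn("_missing_in_source", F.col("value").isNull())
)

# one MERGE with multiple WHEN MATCHED clauses covering soft deletes and changed rows
(target.alias("t")
    .merge(derived_source.alias("u"), "t.key = u.key AND t.is_current = true")
    .whenMatchedUpdate(                      # key no longer delivered -> soft delete
        condition="u._missing_in_source",
        set={"is_current": "false", "valid_to": str(current_dt)})
    .whenMatchedUpdate(                      # key still delivered but value changed -> close the old row
        condition="NOT u._missing_in_source AND t.value <> u.value",
        set={"is_current": "false", "valid_to": "u.date"})
    .execute())
```

Brand-new keys (and the fresh rows for changed keys) still need a whenNotMatchedInsert against the original source, as in the merge examples further down this thread.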

geoHeil (Author) commented Sep 2, 2022

A small example:

step 1: initial write

from pyspark.sql import functions as F
from delta.tables import DeltaTable

# state cleanup
!rm -rf dummy_delta

# initial write of the data (first load)
(d1
    .withColumn("is_current", F.lit(True))
    .withColumn("valid_to", F.lit(None).astype("int"))
    .write.mode("overwrite").format("delta").save("dummy_delta"))
#.option("readChangeFeed", "true")
delta_state = spark.read.format("delta").load("dummy_delta")
delta_state.printSchema()
delta_state.show()

First UPSERT (including SCD2 soft delete):

# JOIN the existing state with the data

# DELETE
current_dt = 1.0
print(delta_state.count())
r = (delta_state
    .join(
        d2.withColumnRenamed("valid_to", "valid_to_c")
          .withColumnRenamed("value", "value_c")
          .withColumnRenamed("date", "date_c"),
        on=['key'], how='left')
    .withColumn("is_current",
        F.when(F.col("value_c").isNull(), F.lit(False)).otherwise(F.col("is_current")))
    .withColumn("valid_to",
        F.when(F.col("value_c").isNull(), F.lit(current_dt)).otherwise(F.col("valid_to"))))
# shrink back (i.e. manual MERGE INTO)
r = r.select("key", "value", "date", "valid_to", "is_current")
print(r.count())
r.show()

# SCD2 not relevant: in this example no additional rows/update rows are present 


# overwrite the latest state
r.write.mode("overwrite").format("delta").save("dummy_delta")

Next (2nd) UPSERT (including SCD2 soft delete):

d2 had (3) as missing - this entry is back now and should start a new SCD2 row with a fresh valid_from, valid until infinity (null for valid_to):

delta_state = spark.read.format("delta").load("dummy_delta")
current_dt = 2.0

print(delta_state.count())
# WARNING: should use an outer join (and filter accordingly), otherwise unnecessarily many joins are needed later
r = (delta_state
    .join(
        d3.withColumnRenamed("valid_to", "valid_to_c")
          .withColumnRenamed("value", "value_c")
          .withColumnRenamed("date", "date_c"),
        on=['key'], how='left')
    .withColumn("is_current",
        F.when(F.col("value_c").isNull(), F.lit(False)).otherwise(F.col("is_current")))
    .withColumn("valid_to",
        F.when(F.col("value_c").isNull(), F.lit(current_dt)).otherwise(F.col("valid_to"))))

# shrink back (i.e. manual MERGE INTO)
r = r.select("key", "value", "date", "valid_to", "is_current")
print(r.count())
r.show()

# store deletions
r.write.mode("overwrite").format("delta").save("dummy_delta")
# how to do the SCD2 (add row or insert) in a nice way? Perhaps using MERGE INTO?

Now try to achieve the SCD2 step (closing the old row, inserting the new row) via MERGE INTO:

# state:
+---+-----+----+--------+----------+
|key|value|date|valid_to|is_current|
+---+-----+----+--------+----------+
|  1|    4|   1|    null|      true|
|  3|    6|   1|     1.0|     false|
|  2|    5|   1|    null|      true|
+---+-----+----+--------+----------+

# current update:
+---+-----+----+
|key|value|date|
+---+-----+----+
|  1|   40|   3|
|  2|   50|   3|
|  3|   66|   3|
+---+-----+----+

NOTICE: d2 had (3) as missing - this entry is back now (and should start a new SCD2 row).

delta_state = DeltaTable.forPath(spark, 'dummy_delta')
stagedUpdates = d3.drop("valid_to")

(delta_state.toDF()
    .filter(F.col("is_current") == True)
    .join(
        stagedUpdates.withColumnRenamed("value", "value_c")
                     .withColumnRenamed("date", "date_c"),
        on=['key'], how='outer')
    .show())

+---+-----+----+--------+----------+-------+------+
|key|value|date|valid_to|is_current|value_c|date_c|
+---+-----+----+--------+----------+-------+------+
|  1|    4|   1|    null|      true|      4|     3|
|  2|    5|   1|    null|      true|      5|     3|
|  3| null|null|    null|      null|     66|     3|
+---+-----+----+--------+----------+-------+------+

Now trying the MERGE INTO:

# Apply SCD Type 2 operation using merge
delta_state.alias("s").merge(stagedUpdates.withColumnRenamed("key", "mergeKey").alias("c"),
  "s.key = mergeKey and s.is_current = true") \
.whenMatchedUpdate(
  condition = "s.is_current = true AND s.value <> c.value",
  set = {                                      # Set current to false and endDate to source's effective date.
    "is_current": "false",
    "valid_to": "c.date"
  }
).whenNotMatchedInsert(
  values = {
    "key": "c.mergeKey",
    "value": "c.value",
    "is_current": "true",
    "date": "c.date",  # Set current to true along with the new address and its effective date.
    "valid_to": "null"
  }
).execute()

delta_state = spark.read.format("delta").load("dummy_delta")
delta_state.show(50)

+---+-----+----+--------+----------+
|key|value|date|valid_to|is_current|
+---+-----+----+--------+----------+
|  1|    4|   1|    null|      true|
|  2|    5|   1|    null|      true|
|  3|   66|   3|    null|      true|
|  3|    6|   1|     1.0|     false|
+---+-----+----+--------+----------+

Questions:

  • How to handle multiple columns more efficiently? This is super clumsy,
  • in particular in the above workaround for the deletion.
  • Performance: I am using one additional join (to handle the deletion). Couldn't this be simplified with an OUTER join, so that only one join is needed?

geoHeil (Author) commented Sep 3, 2022

Some of the questions (around easing the handling of deletions) can be partially handled with:

def perform_rename(df, mapping):
    for old_value, new_value in mapping.items():
        df = df.withColumnRenamed(old_value, new_value)
    return df


def handle_deletions(updates_df, keys, current_dt, date_col='date', columns_to_drop=[], is_current_col='is_current', valid_to_col='valid_to'):
    def _(df_current_state):
        original_columns = df_current_state.columns  # preserve the target table's column set and order
        mapping = dict([(c, f'{c}_c') for c in updates_df.drop(*keys).drop(*columns_to_drop).columns])
        value_cols = updates_df.drop(*keys).drop(*columns_to_drop).columns
        r = df_current_state.join(perform_rename(updates_df, mapping), on=keys, how='left')

        # it is good enough to look only at a single value column (all will be NULL anyway)
        v = f"{value_cols[0]}_c"
        r = (r
            .withColumn(is_current_col,
                F.when(F.col(v).isNull(), F.lit(False)).otherwise(F.col(is_current_col)))
            .withColumn(valid_to_col,
                F.when(F.col(v).isNull(), F.lit(current_dt)).otherwise(F.col(valid_to_col))))
        return r.select(original_columns)

    return _

def handle_merge(path, updates, current_dt):
    delta_state = spark.read.format("delta").load(path)
    deletions_handled = delta_state.transform(handle_deletions(updates, keys=['key'], current_dt=current_dt))
    deletions_handled.write.mode("overwrite").format("delta").save(path)
    
    #-------------------------------------------------
    # Apply SCD Type 2 operation using merge
    delta_state = DeltaTable.forPath(spark, path)
    delta_state.alias("s").merge(updates.alias("c"),
      "s.key = c.key and s.is_current = true") \
    .whenMatchedUpdate(
      condition = "s.is_current = true AND s.value <> c.value",
      set = {                                      # Set current to false and endDate to source's effective date.
        "is_current": "false",
        "valid_to": "c.date"
      }
    ).whenNotMatchedInsert(
      values = {
        "key": "c.key",
        "value": "c.value",
        "value2": "c.value2",
        "is_current": "true",
        "date": "c.date",  # Set current to true along with the new address and its effective date.
        "valid_to": "null"
      }
    ).execute()

    delta_state = spark.read.format("delta").load(path)
    delta_state.show(50)

Now, applying this function:

# notice one entry is MISSING (it should be deleted) or rather SCD2 invalidated
handle_merge('dummy_delta', d2, current_dt=2)

+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
|  1|    4|     a|   1|      true|    null|
|  2|    5|     b|   1|      true|    null|
|  3|    6|     c|   1|     false|       2|
+---+-----+------+----+----------+--------+

Works as expected

# d2 had (3) as missing - this entry is back now (and should start a new SCD2 row)
handle_merge('dummy_delta', d3, current_dt=3)

+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
|  1|    4|     a|   1|      true|    null|
|  2|    5|     b|   1|      true|    null|
|  3|   66|     c|   3|      true|    null|
|  3|    6|     c|   1|     false|       2|
+---+-----+------+----+----------+--------+

works as expected

# a new record is added
handle_merge('dummy_delta', d4, current_dt=4)
+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
|  1|    4|     a|   1|      true|    null|
|  2|    5|     b|   1|      true|    null|
|  3|   66|     c|   3|      true|    null|
|  3|    6|     c|   1|     false|       2|
|  4|   44|     d|   4|      true|    null|
+---+-----+------+----+----------+--------+

works as expected

d5.show()
+---+-----+----+------+
|key|value|date|value2|
+---+-----+----+------+
|  2|    5|   5|     b|
|  3|   67|   5|     c|
|  4|   44|   5|     d|
|  5|   55|   5|     e|
+---+-----+----+------+


# a new record is added, one removed and one updated
handle_merge('dummy_delta', d5, current_dt=5)

+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
|  1|    4|     a|   1|     false|       5|
|  2|    5|     b|   1|      true|    null|
|  3|   66|     c|   3|     false|       5|
|  3|    6|     c|   1|     false|       2|
|  4|   44|     d|   4|      true|    null|
|  5|   55|     e|   5|      true|    null|
+---+-----+------+----+----------+--------+

This is not working as expected: the old record is invalidated, but notice that the new record is not written (i.e. no fresh row is opened starting from that date).

When executing a second time:

handle_merge('dummy_delta', d5, current_dt=5)

+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
|  1|    4|     a|   1|     false|       5|
|  2|    5|     b|   1|      true|    null|
|  3|   67|     c|   5|      true|    null|
|  3|   66|     c|   3|     false|       5|
|  3|    6|     c|   1|     false|       2|
|  4|   44|     d|   4|      true|    null|
|  5|   55|     e|   5|      true|    null|
+---+-----+------+----+----------+--------+

The 67 now appears. How can I fix the MERGE INTO conditions so the value appears straight away? Is there perhaps some additional match condition required?

How can I combine whenNotMatchedInsert with whenMatchedUpdate so that the fresh row is inserted right after invalidating the current one?
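
For reference (not discussed further in this thread): the SCD Type 2 example in the Delta merge documentation sidesteps this by staging each changed row twice. One copy keeps a matching mergeKey so the current row gets closed; the other copy carries a NULL mergeKey so it falls through to whenNotMatchedInsert, and the fresh row appears in the same merge. A sketch adapted to the frames above (deletions still need the separate handle_deletions pass, since there is no NOT MATCHED BY SOURCE clause yet, which is exactly this feature request):

```
from pyspark.sql import functions as F
from delta.tables import DeltaTable

delta_state = DeltaTable.forPath(spark, "dummy_delta")
updates = d5

# rows whose value changed: stage a second copy with a NULL mergeKey so that it is
# inserted (fresh SCD2 row) by whenNotMatchedInsert in the very same merge
changed = (
    updates.alias("u")
    .join(delta_state.toDF().alias("t"), F.expr("u.key = t.key"))
    .where("t.is_current = true AND u.value <> t.value")
    .selectExpr("CAST(NULL AS long) AS mergeKey", "u.key", "u.value", "u.value2", "u.date")
)

staged_updates = changed.union(
    updates.selectExpr("key AS mergeKey", "key", "value", "value2", "date")
)

(delta_state.alias("t")
    .merge(staged_updates.alias("s"), "t.key = s.mergeKey")
    .whenMatchedUpdate(                 # close the current row for changed keys
        condition="t.is_current = true AND t.value <> s.value",
        set={"is_current": "false", "valid_to": "s.date"})
    .whenNotMatchedInsert(              # new keys plus the NULL-mergeKey copies of changed rows
        values={"key": "s.key", "value": "s.value", "value2": "s.value2",
                "date": "s.date", "is_current": "true", "valid_to": "null"})
    .execute())
```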

scottsand-db (Collaborator) commented
This feature seems neat! Unfortunately, we are already fully booked in our H2 roadmap, and since you've indicated you are unable to contribute a solution, there's not much we can do right now.

When we begin our 2023 roadmap planning, please bring up this issue again so that we can get the community's input on how desired it is!

Cheers.

seddonm1 commented
FYI I actually did all the work a couple of years ago and have a branch with this implemented for the Scala API only here: https://github.com/tripl-ai/delta

At the time the PR to this repo was rejected, but if you are motivated the code could be updated for the latest Delta (not by me).

johanl-db (Collaborator) commented Dec 6, 2022

I created a design doc to implement support for WHEN NOT MATCHED BY SOURCE clauses: [Design Doc] WHEN NOT MATCHED BY SOURCE. This enables selectively updating or deleting target rows that have no matches in the source table based on the merge condition.

API

A new whenNotMatchedBySource(condition) method is added to the Delta Table API, similar to the existing whenMatched(condition) and whenNotMatched(condition) methods. It returns a builder that allows specifying the action to apply using update(set) or delete(). whenNotMatchedBySource(condition) accepts an optional condition that needs to be satisfied for the corresponding action to be applied.

Usage example:

targetDeltaTable.as("t")
  .merge(sourceTable.as("s"), condition = "t.key = s.key")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .whenNotMatchedBySource(condition = "t.value > 0").update(set = "t.value = 0")
  .whenNotMatchedBySource().delete()

This merge invocation will:

  • Update all target rows that have a match in the source table using the source value.
  • Insert all source rows that have no match in the target into the target table.
  • Update all target rows that have no match in the source if t.value is strictly positive, otherwise delete the target row.

More details on the API and the implementation proposal can be found in the design doc. The SQL API will be shipped with Spark 3.4, see apache/spark#38400.
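
Applied to the SCD2 soft-delete case from this issue, a rough Python sketch (whenNotMatchedBySourceUpdate mirrors the Python API added in #1533; the table and column names follow the earlier snippets):

```
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "dummy_delta")
current_dt = 5.0

(target.alias("t")
    .merge(d5.alias("s"), "t.key = s.key AND t.is_current = true")
    .whenMatchedUpdate(                     # key still delivered but value changed -> close current row
        condition="t.value <> s.value",
        set={"is_current": "false", "valid_to": "s.date"})
    .whenNotMatchedInsert(                  # brand-new key -> open a fresh row
        values={"key": "s.key", "value": "s.value", "value2": "s.value2",
                "date": "s.date", "is_current": "true", "valid_to": "null"})
    .whenNotMatchedBySourceUpdate(          # key no longer delivered -> soft delete, no extra join needed
        condition="t.is_current = true",    # historical rows are left untouched
        set={"is_current": "false", "valid_to": str(current_dt)})
    .execute())
```

This removes the separate handle_deletions pass; opening the fresh row for a changed key in the same statement still needs the staged-update trick shown earlier.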

Project Plan

| Task | Description | Status | PR |
| --- | --- | --- | --- |
| Delta API Scala Support | Implement support for the clause in Delta using the Scala DeltaTable API. | Done | #1511 |
| Delta API Python Support | Implement support for the clause in Delta using the Python API. | Done | #1533 |
| SQL Support | After Spark 3.4 release / upgrading to Spark 3.4, make necessary changes to support the clause in SQL. | Done | #1740 |

scottsand-db pushed a commit that referenced this issue Jan 5, 2023
… BY SOURCE clause in merge commands.

Support for the clause was introduced in #1511 using the Scala DeltaTable API; this patch extends the Python API to support the new clause.

See corresponding feature request: #1364

Adding python tests covering WHEN NOT MATCHED BY SOURCE to test_deltatable.py.

The extended API for NOT MATCHED BY SOURCE mirrors existing clauses (MATCHED/NOT MATCHED).
Usage:
```
        dt.merge(source, "key = k")
            .whenNotMatchedBySourceDelete(condition="value > 0")
            .whenNotMatchedBySourceUpdate(set={"value": "value + 0"})
            .execute()
```

Closes #1533

GitOrigin-RevId: 76c7aea481fdbbf47af36ef7251ed555749954ac
geoHeil (Author) commented Mar 29, 2023

solved by #1511

scottsand-db (Collaborator) commented
Just posting for visibility, from #1511: Support for WHEN NOT MATCHED BY SOURCE using SQL will be available with Spark 3.4 release and python support will follow up in a different PR.

allisonport-db (Collaborator) commented
SQL support is in #1740

allisonport-db added this to the 2.4.0 milestone on May 24, 2023