
Wrong name for configuration of "Repartition output data before write" #487

Closed

harryshi10 opened this issue Aug 4, 2020 · 4 comments

harryshi10 commented Aug 4, 2020

Hi there,

I'm on Spark 3.0.0 + io.delta:delta-core_2.12:0.7.0, and I'd like to activate the "Repartition output data before write" feature described in https://docs.delta.io/latest/delta-update.html .

The docs say that setting spark.delta.merge.repartitionBeforeWrite to true activates this feature, but after going through the source code and trying it myself, it seems that only setting spark.databricks.delta.merge.repartitionBeforeWrite.enabled to true actually reduces the number of output files. Is there something wrong with the name of this configuration?

Here's the spark-defaults.conf I've been using to start the Spark session:

spark.jars.packages io.delta:delta-core_2.12:0.7.0
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.databricks.delta.merge.repartitionBeforeWrite.enabled true
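
For reference, the same flag can also be toggled at runtime on an existing session instead of via spark-defaults.conf. A minimal sketch (assumes a running SparkSession named spark with the Delta extensions already configured):

# Set the flag at runtime on an existing session,
# then read it back to confirm the effective value.
spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true")
print(spark.conf.get("spark.databricks.delta.merge.repartitionBeforeWrite.enabled"))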

And I'm using PySpark to run the code below:

from delta.tables import DeltaTable
import pyspark.sql.functions as F

# Create the initial table: 100 rows, partitioned by p1.
df = spark.range(100).withColumn(
    "p1",
    F.col("id") % 3
).withColumn(
    "p2",
    F.col("id") % 3
)

df.repartition(1).write.format("delta").partitionBy(["p1"]).save("/data/xxx", mode="overwrite")

# Build a larger source DataFrame whose ids overlap the existing table.
df_2 = spark.range(60, 15000).withColumn(
    "p1",
    F.col("id") % 3
).withColumn(
    "p2",
    F.col("id") % 5
)

# Merge the new data into the Delta table on id and partition column p1.
delta_table = DeltaTable.forPath(spark, "/data/xxx")
table_opr = delta_table.alias("src_df").merge(
    df_2.alias("new_df"),
    "src_df.id = new_df.id and src_df.p1 = new_df.p1"
).whenMatchedUpdateAll().whenNotMatchedInsertAll()

table_opr.execute()
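
To compare the two runs, the file counts per partition can be checked from the driver. A minimal sketch (assumes the table path /data/xxx from above is on the local filesystem):

import os

table_path = "/data/xxx"  # table path used in the example above
for entry in sorted(os.listdir(table_path)):
    part_dir = os.path.join(table_path, entry)
    # Count the Parquet data files inside each p1=... partition directory.
    if entry.startswith("p1=") and os.path.isdir(part_dir):
        n_files = sum(1 for f in os.listdir(part_dir) if f.endswith(".parquet"))
        print(entry, n_files)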

With spark.databricks.delta.merge.repartitionBeforeWrite.enabled=true, what I get is:

active
├── _delta_log
│   ├── 00000000000000000000.json
│   └── 00000000000000000001.json
├── p1=0
│   ├── part-00000-e851bcaf-c44b-42f9-8f85-d2332eb7ff22.c000.snappy.parquet
│   └── part-00005-cfd2738c-c074-40d3-a488-de840ae27b16.c000.snappy.parquet
├── p1=1
│   ├── part-00000-9fe61cf1-0e6a-475a-b64f-a34687d8cf21.c000.snappy.parquet
│   └── part-00069-4d7219ca-e7fc-4686-b237-00a51c298983.c000.snappy.parquet
└── p1=2
    ├── part-00000-413e874c-5b4c-43f5-8495-b8646ad28406.c000.snappy.parquet
    └── part-00128-4692808a-add8-4351-95fb-c26ebbd7e0d4.c000.snappy.parquet

Without spark.databricks.delta.merge.repartitionBeforeWrite.enabled=true, what I get is:

not_active
├── _delta_log
│   ├── 00000000000000000000.json
│   └── 00000000000000000001.json
├── p1=0
│   ├── part-00000-029a8e87-b042-4e98-b46a-21ea6b88d335.c000.snappy.parquet
│   ├── part-00000-c20f43c9-1498-4d1d-8677-6ffe18b3e1e8.c000.snappy.parquet
│   ├── part-00001-d0724b2a-e244-40c9-b9b7-b76fd921246d.c000.snappy.parquet
│   ├── part-00002-86904f56-1c0f-420d-a4b6-e880ed4c1701.c000.snappy.parquet
│   ├── part-00003-e0bd2652-8904-4f7a-b4f7-b959596613ff.c000.snappy.parquet
│   ├── part-00004-5e27de42-e35f-486d-8119-f25516f5100d.c000.snappy.parquet
│   ├── part-00005-a2fba894-92de-4281-9f63-d32db5a4ad01.c000.snappy.parquet
│   ├── part-00006-f4f24bf5-bdb2-4320-add4-00f9d83c8bc4.c000.snappy.parquet
│   ├── part-00007-f339096d-3408-4714-8575-e0ed6e4e37f1.c000.snappy.parquet
│   ├── part-00008-2bff0161-7fe7-42c0-b4b5-ac27da2bcbed.c000.snappy.parquet
│   ├── part-00009-cdc2b506-7706-46dd-a2a2-55c3b4935d13.c000.snappy.parquet
│   ├── part-00010-a41c4a18-83f8-4345-bfc4-d2ea396dd0f0.c000.snappy.parquet
│   ├── part-00011-da1e5967-fca9-48c8-bb1b-454fdc189bec.c000.snappy.parquet
│   ├── part-00012-3ff91f04-dd21-48c0-a3a4-a257d186be96.c000.snappy.parquet
......
├── p1=1
│   ├── part-00000-0e363784-829d-4ae8-bd88-4d585eabfe17.c000.snappy.parquet
│   ├── part-00000-7b5f5a60-1dad-4561-b5f6-fc1ca28f3899.c000.snappy.parquet
│   ├── part-00001-b2cffca3-9d9c-4f6f-a67b-31086c93ba10.c000.snappy.parquet
│   ├── part-00002-201da08c-84ea-439c-8223-418f86f475f4.c000.snappy.parquet
│   ├── part-00003-9dca2f97-2288-4026-bd4f-682e4e21268c.c000.snappy.parquet
│   ├── part-00004-da0b26f7-6ba6-4ae4-9681-8f489f8c8c95.c000.snappy.parquet
│   ├── part-00005-91bb489b-25e2-467f-98d6-302b10cb9529.c000.snappy.parquet
│   ├── part-00006-70ff2af9-9a60-4060-b3c6-7241ef0e5e7f.c000.snappy.parquet
│   ├── part-00007-f9d00f26-4f18-40ed-98ae-abc090a3a231.c000.snappy.parquet
│   ├── part-00008-8fe193c1-eca9-4e86-8411-0699ebb23a60.c000.snappy.parquet
│   ├── part-00009-8c2c943c-a27a-4112-8947-19faa285020d.c000.snappy.parquet
│   ├── part-00010-96c98442-ea6d-436a-86cd-2bbf59aba37e.c000.snappy.parquet
│   ├── part-00011-da88d436-284c-4969-9c3d-ab19c4956bf1.c000.snappy.parquet
..........
└── p1=2
    ├── part-00000-8bc08e85-3c4e-4d3c-8043-0fbb86bef731.c000.snappy.parquet
    ├── part-00000-db295739-7519-4259-aba0-4e205a986c9a.c000.snappy.parquet
    ├── part-00001-7ea9c927-bd1b-43dd-b83b-3e433f5119f8.c000.snappy.parquet
    ├── part-00002-841266e8-232b-4883-adca-b16d70e990b3.c000.snappy.parquet
    ├── part-00003-956f487d-c9d1-4cf6-a52f-900a4f3b657a.c000.snappy.parquet
    ├── part-00004-34ddd4ad-ac60-4993-a94a-7e135a1b6bc7.c000.snappy.parquet
    ├── part-00005-dbc3d16a-20c3-467e-a011-14269f0c4d74.c000.snappy.parquet
    ├── part-00006-3ebed123-9c05-4b99-a012-3a2f3c0774fb.c000.snappy.parquet
    ├── part-00007-5ebc0ca2-457f-4e21-b41d-80104d25c697.c000.snappy.parquet
    ├── part-00008-044616ef-bb49-4e42-a4f7-0bf40d0c3438.c000.snappy.parquet
    ├── part-00009-1ebb6352-71a6-44fb-ab34-5136135760d7.c000.snappy.parquet
    ├── part-00010-4824e209-404a-404c-ba18-61120d6942fe.c000.snappy.parquet
    ├── part-00011-0321f387-dc36-4922-a6f6-871ec0559c65.c000.snappy.parquet
    ├── part-00012-a2074c8a-e3d6-463b-914b-1e9b725d9131.c000.snappy.parquet
    ├── part-00013-cefbb596-0e33-41ee-81ef-926b787ad9af.c000.snappy.parquet
    ├── part-00014-4750c7cd-6740-41da-84ee-a01ea7fa2dfb.c000.snappy.parquet
........

And with only spark.delta.merge.repartitionBeforeWrite=true (i.e., without spark.databricks.delta.merge.repartitionBeforeWrite.enabled=true), I get the same result as the "not active" case above: it ends up with tons of small Parquet files.
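
This silent failure is consistent with how Spark handles runtime configs: spark.conf.set accepts arbitrary keys, so a conf name that Delta never reads is stored without any warning. A minimal sketch illustrating the difference (the first key is the misspelled name from the docs; the second is the name the Delta source code actually reads):

# Stored in the session conf, but never read by Delta 0.7.0, so it has no effect.
spark.conf.set("spark.delta.merge.repartitionBeforeWrite", "true")
# The key Delta actually checks before writing merge output.
spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true")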

zsxwing (Member) commented Aug 4, 2020

Good catch. The correct config name should be spark.databricks.delta.merge.repartitionBeforeWrite.enabled. We will fix the doc.

zsxwing (Member) commented Sep 2, 2020

Closing this. The doc has been fixed.

zsxwing closed this as completed Sep 2, 2020
SteffenMangold commented Jan 6, 2021

zsxwing (Member) commented Jan 6, 2021

@SteffenMangold please check the latest doc: https://docs.delta.io/latest/delta-update.html#-merge-in-dedup We don't update these archived docs.
