
Add a test
kssenii committed Oct 15, 2023
1 parent a7f389a commit f3056b1
Showing 3 changed files with 61 additions and 2 deletions.
7 changes: 6 additions & 1 deletion src/Storages/DataLakes/IcebergMetadataParser.cpp
@@ -32,6 +32,8 @@ namespace ErrorCodes
 template <typename Configuration, typename MetadataReadHelper>
 struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
 {
+    Poco::Logger * log = &Poco::Logger::get("IcebergMetadataParser");
+
     /**
      * Useful links:
      * - https://iceberg.apache.org/spec/
@@ -299,7 +301,10 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
             throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", configuration.url.key, data_path);
 
         if (status == 2)
-            keys.erase(file_path);
+        {
+            LOG_TEST(log, "Got delete file for {}", file_path);
+            chassert(!keys.contains(file_path));
+        }
         else
             keys.insert(file_path);
     }
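Per the Iceberg spec linked above, each manifest entry carries a status field: 0 = EXISTING, 1 = ADDED, 2 = DELETED. The parser builds the set of live data-file keys from these entries; with this change, a DELETED entry no longer erases the key but logs it and asserts (via chassert, ClickHouse's debug-build assertion) that the file was never added to the set in the first place. A minimal Python sketch of that accumulation logic, with hypothetical entry tuples standing in for parsed manifest entries:

    # Sketch only: each entry is (status, file_path), with statuses taken from
    # the Iceberg spec: 0 = EXISTING, 1 = ADDED, 2 = DELETED.
    def collect_data_files(manifest_entries):
        keys = set()
        for status, file_path in manifest_entries:
            if status == 2:
                # Mirrors the chassert above: a delete entry should refer to a
                # file that is not part of the current key set.
                assert file_path not in keys, f"Got delete file for {file_path}"
            else:
                keys.add(file_path)
        return keys

    # Example: one live file plus a delete entry for a file from an older snapshot.
    assert collect_data_files([(1, "data/a.parquet"), (2, "data/old.parquet")]) == {"data/a.parquet"}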
2 changes: 1 addition & 1 deletion tests/integration/helpers/cluster.py
@@ -32,7 +32,6 @@
 import nats
 import ssl
 import meilisearch
-import pyspark
 from confluent_kafka.avro.cached_schema_registry_client import (
     CachedSchemaRegistryClient,
 )
@@ -631,6 +630,7 @@ def __init__(
             logging.debug(f"Removed :{self.instances_dir}")
 
         if with_spark:
+            import pyspark
             # if you change packages, don't forget to update them in docker/test/integration/runner/dockerd-entrypoint.sh
             (
                 pyspark.sql.SparkSession.builder.appName("spark_test")
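Moving import pyspark out of module scope and into the with_spark branch makes pyspark an optional dependency: cluster.py can be imported in environments without Spark installed, and the package is only loaded when a test actually requests it. A small sketch of the same lazy-import pattern (the function name and minimal builder below are illustrative, not from cluster.py):

    def make_spark_session(with_spark: bool):
        # Defer the heavy optional import until the feature is actually requested,
        # so importing this module does not require pyspark to be installed.
        if not with_spark:
            return None
        import pyspark

        # The real builder in cluster.py also configures Spark packages; this is
        # just the minimal shape of the pattern.
        return pyspark.sql.SparkSession.builder.appName("spark_test").master("local[1]").getOrCreate()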
54 changes: 54 additions & 0 deletions tests/integration/test_storage_iceberg/test.py
@@ -313,3 +313,57 @@ def test_types(started_cluster, format_version):
             ["e", "Nullable(Bool)"],
         ]
     )
+
+
+@pytest.mark.parametrize("format_version", ["1", "2"])
+def test_delete_files(started_cluster, format_version):
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+    minio_client = started_cluster.minio_client
+    bucket = started_cluster.minio_bucket
+    TABLE_NAME = "test_delete_files_" + format_version
+
+    write_iceberg_from_df(
+        spark,
+        generate_data(spark, 0, 100),
+        TABLE_NAME,
+        mode="overwrite",
+        format_version=format_version,
+    )
+
+    files = upload_directory(
+        minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
+    )
+
+    create_iceberg_table(instance, TABLE_NAME)
+
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
+
+    spark.sql(f"DELETE FROM {TABLE_NAME} WHERE a >= 0")
+    files = upload_directory(
+        minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
+    )
+
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 0
+    assert instance.contains_in_log("Got delete file for")
+
+    write_iceberg_from_df(
+        spark,
+        generate_data(spark, 100, 200),
+        TABLE_NAME,
+        mode="upsert",
+        format_version=format_version,
+    )
+
+    files = upload_directory(
+        minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
+    )
+
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
+
+    spark.sql(f"DELETE FROM {TABLE_NAME} WHERE a >= 150")
+    files = upload_directory(
+        minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
+    )
+
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 50
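The test drives a full round trip: 100 rows written and read back, a full delete (which produces the delete files that trigger the new "Got delete file for" log line), an upsert of 100 fresh rows, and a partial delete leaving 50. create_iceberg_table is a helper defined earlier in this module; a rough sketch of what such a helper can look like, assuming ClickHouse's Iceberg table engine with the Iceberg(url, access_key_id, secret_access_key) signature (the endpoint and credentials below are placeholders typical of these integration tests, not taken from this commit):

    def create_iceberg_table(node, table_name):
        # Hypothetical sketch: recreate a ClickHouse table backed by the Iceberg
        # engine, pointing at the data that upload_directory() pushed to MinIO.
        node.query(f"DROP TABLE IF EXISTS {table_name}")
        node.query(
            f"CREATE TABLE {table_name} "
            f"ENGINE = Iceberg('http://minio1:9001/root/iceberg_data/default/{table_name}/', 'minio', 'minio123')"
        )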
