Важное замечание. Программное обеспечение Delta Lake и кастомизации для него, предлагаемые в этом репозитории, не являются частью сервиса Yandex Data Processing, и не сопровождаются командой разработки и службой поддержки этого сервиса.
Delta Lake - программное обеспечение с открытым исходным кодом, расширяющее возможности платформы Apache Spark оптимизированным слоем хранения данных в формате таблиц с поддержкой ACID-транзакций, масштабируемой обработкой метаданных и управлением версиями данных.
Delta Lake решает задачу обновления данных в аналитических таблицах, хранимых в виде набора файлов в формате Parquet в HDFS или S3-совместимом хранилище. Таблица Delta Lake может одновременно использоваться пакетными запросами, а также в качестве источника и приемника операций потоковой передачи данных.
Таблицы Delta Lake удобно регистрировать в Hive Metastore, что позволяет работать в парадигме логических баз данных (или схем) и таблиц, без необходимости явно управлять размещением файлов. В среде Yandex Data Proc следует использовать внешний по отношению к рабочим кластерам сервис Hive Metastore, например в форме управляемых экземпляров Metastore.
Important
Использование внешнего Hive Metastore необходимо для обеспечения сохранности метаданных. Для исполнения рабочей нагрузки часто используются временные кластера DataProc, в которых выполненные настройки, а также данные, записанные на файловую систему HDFS или в сервис Hive Metastore, расположенный непоредственно на вычислительном кластере, теряются при удалении кластера. Для постоянных кластеров YDP операция пересоздания кластера является достаточно частой, и требуется, например, для обновления версии образа программного обеспечения или для переноса кластера в другую зону доступности.
Для использования развёрнутых внешних экземпляров Metastore необходимо настроить на уровне кластера либо задания следующие свойства Spark:
spark.hive.metastore.uris
, установленное в значениеthrift://Хост:9083
, хдеХост
- FDQN-имя либо IP-адрес сервиса Hive Metastore, а9083
- используемый по умолчанию номер порта сервиса;spark.sql.catalogImplementation
, установленное в значениеhive
. Альтернативой установке этого свойства является явный вызов операцииSparkSession.builder.enableHiveSupport()
при настройке контекста Spark.
Кроме того, для установки пути по умолчанию для размещения файлов Delta Lake в заданный путь внутри бакета объектного хранилища необходимо настроить следующее свойство:
spark.sql.warehouse.dir
, установленное в значениеs3a://Бакет/Путь
, гдеБакет
- используемое имя бакета объектного хранилища, аПуть
- относительный путь внутри бакета (например, значениеwarehouse
).
Версии 2.0.x образов YDP основаны на Apache Spark 3.0.3. В соответствии с официальной таблицей совместимости, в такой конфигурации может использоваться Delta Lake версии 0.8.0.
Основным ограничением этой версии Delta Lake является запрет на конкурентную модификацию таблиц в S3-совместимом хранилище (см. примечание в официальной документации). Это ограничение означает, что при работе с таблицами Delta Lake необходимо координировать операции записи таким образом, чтобы исключить одновременную модификацию данных в одной таблице из разных кластеров Data Proc, а также из разных заданий Spark в рамках одного кластера. Нарушение этого требования может привести к неожиданной потере хранимой в таблице информации.
Для использования Delta Lake в сочетании с YDP 2.0 достаточно в зависимостях заданий добавить библиотеку delta-core_2.12-0.8.0.jar
, а также установить следующие свойства Spark:
spark.sql.extensions
в значениеio.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog
в значениеorg.apache.spark.sql.delta.catalog.DeltaCatalog
Библиотека delta-core_2.12-0.8.0.jar
может быть добавлена в зависимости заданий любым из следующих способов:
- через установку свойства
spark.jars
для конкретного задания либо для кластера в целом, значением является URL файла библиотеки, файл должен быть доступен по указанному URL; - через установку свойства
spark.jars.packages
в значениеio.delta:delta-core_2.12:0.8.0
, должен быть настроен сетевой доступ в Maven Central в используемой сетевой группе безопасности, либо сконфигурирован альтернативный репозиторий Maven; - через установку свойств
spark.driver.extraClassPath
иspark.executor.extraClassPath
в полный путь к файлу библиотеки. Файл должен быть скопирован на все узлы кластера вручную либо с использованием скриптов инициализации.
Если все данные заданий используют формат хранения Delta Lake, то настройка S3A Committers не требуется, поскольку Delta Lake реализует собственный алгоритм управления записью в S3-совместимые хранилища, функционально эквивалентный S3A Committers. Задание может одновременно работать с таблицами Delta Lake и "обычными" данными Spark в форматах Parquet, ORC, JSON или CSV; в этом случае для эффективной записи "обычных" данных необходимо использовать S3A Committers.
Пример запуска и использования сессии Spark SQL для работы с таблицами Delta Lake:
$ spark-sql --executor-memory 20g --executor-cores 4 \
--conf spark.executor.heartbeatInterval=10s \
--conf spark.jars.packages=io.delta:delta-core_2.12:0.8.0 \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
...
Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d1295d4c-d10d-464c-aa11-e645cedd5da9;1.0
confs: [default]
found io.delta#delta-core_2.12;0.8.0 in central
found org.antlr#antlr4;4.7 in central
...
downloading https://repo1.maven.org/maven2/io/delta/delta-core_2.12/0.8.0/delta-core_2.12-0.8.0.jar ...
[SUCCESSFUL ] io.delta#delta-core_2.12;0.8.0!delta-core_2.12.jar (168ms)
...
23/02/26 14:57:37 INFO SharedState: Warehouse path is 's3a://dproc-wh/wh'.
23/02/26 14:57:38 INFO SessionState: Created HDFS directory: /tmp/hive/ubuntu
23/02/26 14:57:38 INFO SessionState: Created local directory: /tmp/ubuntu
23/02/26 14:57:38 INFO SessionState: Created HDFS directory: /tmp/hive/ubuntu/8e4fa862-2e74-4423-ad8a-241082ec2c88
23/02/26 14:57:38 INFO SessionState: Created local directory: /tmp/ubuntu/8e4fa862-2e74-4423-ad8a-241082ec2c88
23/02/26 14:57:38 INFO SessionState: Created HDFS directory: /tmp/hive/ubuntu/8e4fa862-2e74-4423-ad8a-241082ec2c88/_tmp_space.db
23/02/26 14:57:38 INFO SparkContext: Running Spark version 3.0.3
...
23/02/26 14:57:59 INFO metastore: Connected to metastore.
Spark master: yarn, Application Id: application_1677423247688_0004
23/02/26 14:58:00 INFO SparkSQLCLIDriver: Spark master: yarn, Application Id: application_1677423247688_0004
spark-sql> CREATE DATABASE testdelta;
Time taken: 4.248 seconds
spark-sql> USE testdelta;
Time taken: 0.056 seconds
spark-sql> CREATE TABLE tab1(a INTEGER NOT NULL, b VARCHAR(100)) USING DELTA;
23/02/26 15:04:07 INFO DeltaLog: Creating initial snapshot without metadata, because the directory is empty
...
23/02/26 15:04:09 INFO CreateDeltaTableCommand: Table is path-based table: false. Update catalog with mode: Create
23/02/26 15:04:09 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `testdelta`.`tab1` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
Time taken: 3.286 seconds
spark-sql> INSERT INTO tab1 VALUES (1,'One'), (2,'Two'), (3,'Three');
...
Time taken: 3.684 seconds
spark-sql> UPDATE tab1 SET b=b || ' ** ' || CAST(a AS VARCHAR(10));
...
Time taken: 3.803 seconds
spark-sql> SELECT * FROM tab1;
3 Three ** 3
2 Two ** 2
1 One ** 1
Time taken: 1.166 seconds, Fetched 3 row(s)
spark-sql>
При настройке перечисленных выше свойств Spark для использования Delta Lake на уровне кластера Data Proc, работа с таблицами Delta Lake может производиться через Spark Thrift Server, включение которого осуществляется свойством dataproc:hive.thrift.impl=spark
, как указано в документации Yandex Data Processing. Для подключения к Spark Thrift Server может использоваться любая SQL IDE, совместипая с протоколом Thrift, например DBeaver.
Образы YDP версии 2.1.x комплектуются компонентами Apache Spark версии 3.2.1 (образы до версии 2.1.3 включительно) и 3.3.2 (образы версии 2.1.4 и более поздние). Применительно к Delta Lake:
- Spark 3.2.1 (Data Proc 2.1.0-2.1.3) совместим с Delta Lake версии 2.0.2;
- Spark 3.3.2 (Data Proc 2.1.4+) совместим с Delta Lake версии 2.3.0.
Основными преимуществами Delta Lake 2.x, по сравнению с версией 0.8.0, является:
- поддержка мультикластерного режима, обеспечивающего автоматическую координацию изменений данных в одной таблице из разных заданий Spark и кластеров Data Proc;
- функция идемпотентных операций модификации данных, позволяющая решить задачу однократного применения изменений при потоковой обработке данных;
- функция Change Data Feed для таблиц Delta Lake, обеспечивающая отслеживание изменений в данных;
- функция Z-Ordering, реализующая многомерную кластеризацию таблиц Delta Lake для ускорения запросов с ограничениями на колонки, используемые для кластеризации;
- поддержка динамической перезаписи партиций, dynamic partition overwrite;
- автоматизированная компактификация мелких файлов в более крупные для увеличения производительности запросов;
- возможность отката таблицы к предыдущему состоянию.
При использовании однокластерного режима настройки для использования Delta Lake версии 2.0.2 аналогичны приведённым выше для версии 0.8.0. При настройке зависимостей необходимо учитывать следующие изменения в DeltaLake:
- начиная с DeltaLake 1.2, из библиотеки
delta-core
были выделены дополнительные библиотекиdelta-storage
иdelta-storage-s3-dynamodb
; - начиная с DeltaLake 3.0, библиотека
delta-core
была переименована вdelta-spark
.
Состав необходимых зависимостей приведён в таблице ниже.
Мультикластерный режим Delta Lake использует дополнительный ресурс - базу данных DynamoDB - для координации доступа к таблицам Delta Lake перед внесением в них изменений. В среде Yandex Cloud мультикластерный режим Delta Lake можно использовать, задействовав управляемый сервис YDB в качестве прозрачной замены DynamoDB. Для использования YDB вместо DynamoDB требуется специальное расширение DeltaLake, которое подготовлено и доступно для скачивания.
Примечание. Для выполнения приведённых ниже примеров команд требуется наличие установленного и настроенного YC CLI, а также утилита
jq
для обработки данных в формате JSON.
Если вы хотите задействовать мультикластерный режим Delta Lake, выполните перечисленные ниже шаги для подготовки среды выполнения:
-
Создайте бессерверную базу данных YDB, которая используется как замена DynamoDB для координации доступа к таблицам Delta Lake. Пример команд YC CLI по созданию базы данных, в результате выполнения которых информация о созданной базе данных будет сохранена в файле
info-delta1.json
:yc ydb database create delta1 --serverless yc ydb database get delta1 --format json >info-delta1.json
-
Создайте сервисную учётную запись для доступа к базе данных YDB, и настройте для неё права доступа путём присвоения роли
ydb.editor
в каталоге Облака, содержащем созданную ранее базу данных YDB:yc iam service-account create delta1-sa1 cur_profile=`yc config profile list | grep -E ' ACTIVE$' | (read x y && echo $x)` cur_folder=`yc config profile get ${cur_profile} | grep -E '^folder-id: ' | (read x y && echo $y)` yc resource-manager folder add-access-binding --id ${cur_folder} --service-account-name delta1-sa1 --role ydb.editor
-
Создайте статический ключ для подключения от имени созданной сервисной учётной записи, временно сохранив данные ключа в файле
info-sa1key.json
:yc iam access-key create --service-account-name delta1-sa1 --format json > info-sa1key.json
-
Поместите данные созданного статического ключа в запись сервиса Yandex Lockbox, установив атрибут
key-id
в идентификатор ключа, а атрибутkey-secret
в его секретную часть. Сохраните идентификационную информацию созданной записи в файлеinfo-lb1.json
:key_id=`cat info-sa1key.json | jq -r .access_key.key_id` key_val=`cat info-sa1key.json | jq -r .secret` yc lockbox secret create --name delta1-lb1 \ --payload '[{"key": "key-id", "text_value": "'${key_id}'"}, {"key": "key-secret", "text_value": "'${key_val}'"}]' \ --format json > info-lb1.json
-
Запишите идентификатор записи Lockbox для последующего использования:
lb_id=`cat info-lb1.json | jq -r .id`
-
Разрешите сервисным учётным записям, используемым при работе кластеров Data Proc (в примере ниже -
dp1
), доступ к данным Lockbox (на уровне каталога в целом либо на уровне конкретной записи Lockbox):yc resource-manager folder add-access-binding --id ${cur_folder} --service-account-name dp1 --role lockbox.payloadViewer
-
Необходимые библиотеки Delta Lake 2.x, а также классы-надстройки для подключения к YDB доступны в виде собранных архивов и исходных кодов:
- для DeltaLake 2.0.2 архив, исходный код;
- для DeltaLake 2.3.0 архив, исходный код;
- для DeltaLake 2.4.0 архив, исходный код;
- для DeltaLake 3.2.0 архив, исходный код.
Собранный архив необходимо разместить в бакете Yandex Object Storage. Права доступа к бакету должны обеспечивать возможность чтения файла архива сервисными учётными записями кластеров Yandex Data Processing.
-
Установите необходимые настройки для функционирования Delta Lake, на уровне отдельного задания либо на уровне кластера Yandex Data Processing:
- зависимость от архива из пункта 7 выше, через свойство
spark.jars
, либо через сочетание свойствspark.executor.extraClassPath
иspark.driver.extraClassPath
; spark.sql.extensions
в значениеio.delta.sql.DeltaSparkSessionExtension
;spark.sql.catalog.spark_catalog
в значениеorg.apache.spark.sql.delta.catalog.YcDeltaCatalog
;spark.delta.logStore.s3a.impl
в значениеru.yandex.cloud.custom.delta.YcS3YdbLogStore
;spark.io.delta.storage.S3DynamoDBLogStore.ddb.endpoint
в значение точки подключения к Document API YDB для созданной на шаге (1) базы данных (значение поляdocument_api_endpoint
из файлаinfo-delta1.json
);spark.io.delta.storage.S3DynamoDBLogStore.ddb.lockbox
в значение идентификатора элемента Lockbox, полученного на шаге (5).
- зависимость от архива из пункта 7 выше, через свойство
-
Проверьте работоспособность Delta Lake, создав тестовую таблицу аналогично показанному ниже примеру:
$ spark-sql --executor-memory 20g --executor-cores 4 \ --jars yc-delta23-multi-dp21-1.2-fatjar.jar \ --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \ --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.YcDeltaCatalog \ --conf spark.delta.logStore.s3a.impl=ru.yandex.cloud.custom.delta.YcS3YdbLogStore \ --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.endpoint=https://docapi.serverless.yandexcloud.net/ru-central1/b1gfvslmokutuvt2g019/etngt3b6eh9qfc80vt54/ \ --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.lockbox=e6qr20sbgn3ckpalh54p ... spark-sql> CREATE DATABASE testdelta; ... spark-sql> USE testdelta; ... spark-sql> CREATE TABLE tab1(a INTEGER NOT NULL, b VARCHAR(100)) USING DELTA; ... spark-sql> INSERT INTO tab1 VALUES (1,'One'), (2,'Two'), (3,'Three'); ... spark-sql> SELECT * FROM tab1; ...
Yandex DataSphere предоставляет среду для работы с Python ноутбуками, и поддерживает интеграцию с Yandex Data Processing через коннектор Spark. Для использования DeltaLake в заданиях Spark, управляемых из ноутбука DataSphere через коннектор Spark, описанную в предыдущем разделе процедуру настройки необходимо модифицировать:
- сервисная учетная запись, создаваемая на шаге (2), должна обладать полномочиями для чтения и записи S3-бакета, связанного с кластером Yandex Data Processing, а также других S3-бакетов, в которых хранятся таблицы Delta Lake;
- вместо шагов (4), (5) и (6) по размещению ключа доступа в Yandex Lockbox, необходимо указать идентификатор и секретную часть созданного ключа при настройке коннектора Spark в среде DataSphere;
- на шаге (8) зависимости следует указывать строго в свойстве
spark.jars
, и обязательно в виде ссылки на Object Storage в форматеs3a://BUCKET/path/file1.jar
; - также на шаге (8) не следует указывать значение свойства
spark.io.delta.storage.S3DynamoDBLogStore.ddb.lockbox
.
В результате доступ к YDB будет организован с использованием ключа, установленного в настройках коннектора Spark, вместо чтения из ресурса в Yandex Lockbox. В остальном логика работы с DeltaLake в мультикластерном режиме остается неизменной.
Приведем также типичные свойства, которые должны быть установлены на уровне коннектора Spark в среде DataSphere для корректной работы:
spark.hadoop.fs.s3a.fast.upload.buffer=bytebuffer
- требуется для корректной инициализации коннектора Spark;spark.sql.catalogImplementation=hive
- необходимо для работы с внешним Hive Metastore;spark.sql.repl.eagerEval.enabled=true
- выводит результаты вычислений Spark в читаемом формате для отображения в ноутбуке.
Документация на DeltaLake содержит ряд рекомендаций по настройкам мультикластерного режима для производственных конфигураций. Ниже приведён вариант этих рекомендаций, адаптированный для работы в среде Yandex Cloud.
По умолчанию бессерверная база данных YDB создаётся с ограниченной пропускной способностью (например, 10 Request Units в секунду). При интенсивной работе с DeltaLake эта пропускная способность может оказаться недостаточной. Рекомендуется контролировать достаточность пропускной способности по графику "Document API units overflow" в разделе мониторинга YDB, и при необходимости увеличить установленный предел пропускной способности.
Важно помнить, что на все базы данных YDB, созданные в определённом облаке, действует суммарное ограничение на пропускную способность, установленное в виде квоты на уровне облака. Соответствующее ограничение также может потребовать корректировки для исключения задержек при доступе со стороны DeltaLake к YDB.
При работе DeltaLake в таблице YDB и в используемых бакетах объектного хранилища могут накапливаться старые неиспользуемые версии метаданных. Для повышения эффективности работы DeltaLake рекомендуется наладить регулярную автоматическую очистку этих ненужных метаданных.
Скрипт очистки мусора реализован в виде облачной (бессерверной) функции на языке Python. Для работы функции необходима сервисная учётная запись с правами, позволяющими получить доступ на чтение и запись в используемую базу данных YDB, а также в используемый бакет (или бакеты) объектного хранилища.
Код скрипта приведён в каталоге cf-cleanup. Для установки скрипта подготовлены файлы командных сценариев, использующие YC CLI:
- ddb-maint-config.sh - параметры установки;
- ddb-maint-setup.sh - сценарий установки;
- ddb-maint-remove.sh - сценарий удаления.
Скрипт очистки устанавливается в виде двух облачных функций:
- функция очистки данных в таблице YDB, запускаемая автоматически один раз в час;
- функция очистки данных в объектном хранилище, запускаемая автоматически один раз в сутки.
Время жизни записей метаданных регулируется параметром Spark spark.io.delta.storage.S3DynamoDBLogStore.ddb.ttl
, устанавливаемым в секундах. По умолчанию используется значение 86400
, соответствующее одним суткам. Фактическое время жизни для конкретной записи может оказаться больше, поскольку определяется моментом запуска скрипта очистки.
Для очистки временных файлов DeltaLake необходимо создать файл настроек и разместить его в объектном хранилище, указав полный путь к файлу настроек в переменной s3_prefix_file
(см. параметры установки в ddb-maint-config.sh). Каждая строка файла настроек определяет бакет, префикс для сканирования внутри бакета и режим сканирования в следующем формате: ИмяБакета Режим ПрефиксПути
. Поле "Режим" может принимать одно из следующих значений:
W
, Warehouse - путь хранения для нескольких баз данных;D
, Database - путь хранения одной базы данных;T
, Table - путь хранения одной конкретной таблицы.
В зависимости от установленного режима меняется ожидаемая структура пути размещения временных файлов DeltaLake.
Образы Yandex Data Processing версии 2.2.x комплектуются компонентами Apache Spark версии 3.5.0. Чтобы получить доступ к бета-версиям образов Yandex Data Processing версии 2.2, запросите его через службу технической поддержки Yandex Cloud. Использование Spark 3.5.0 в образах линейки 2.2.x позволяет задействовать DeltaLake версий 2.4.0 и 3.2.0, используя следующие дополнительные возможности:
- поддержка Deletion Vectors для более эффективного удаления отдельных записей;
- улучшения в мультикластерном режиме работы, повышающие стабильность выполнения конкурентных операций чтения и записи данных;
- поддержка Universal Format, позволяющего читать таблицы DeltaLake клиентами Iceberg и Hudi;
- доступ к источникам данных, использующим протокол Delta Sharing;
- поддержка Liquid Clustering как более эффективной замены классического партиционирования таблиц в стиле Apache Hive;
- многочисленные улучшения производительности.
Оператор OPTIMIZE
в Delta Lake 2.0.2 и выше запускает несколько параллельных заданий для объединения более мелких файлов в более крупные. Количество параллельных заданий регулируется свойством spark.databricks.delta.optimize.maxThreads
, по умолчанию используется значение 10
. При обработке очень больших таблиц для ускорения можно использовать значительно большие значения, например 100
или даже 1000
, если ресурсы кластера позволяют запустить такое количество параллельных операций.
Delta Lake поддерживает оператор CONVERT TO DELTA
для преобразования обычных таблиц Spark SQL в формат Delta Lake. При работе с партиционированными таблицами у этого оператора используется специальный вариант синтаксиса с указанием колонок партиционирования, как показано в примере ниже:
-- Пример запроса для преобразования обычной партиционированной таблицы в формат Delta Lake
CONVERT TO DELTA megatab_1g PARTITIONED BY (tv_year INT, tv_month INT);
По умолчанию Delta Lake хранит все изменения таблицы за период, определяемый параметром delta.logRetentionDuration
, устанавливаемым на уровне таблицы командой ALTER TABLE SET TBLPROPERTIES
(см. синтаксис для управления параметрами таблиц в документации Delta Lake). По умолчанию срок хранения истории изменений составляет 30 суток.
Иногда требуется принудительно очистить историю изменения таблицы, например после выполнения первоначальной загрузки данных с помощью сложного ETL/ELT процесса. Для этого можно воспользоваться следующей последовательностью операций:
-- Реорганизовать данные в таблице для повышения эффективности доступа
OPTIMIZE ИмяТаблицы;
-- Разрешить удаление всей истории изменений
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
-- Очистить историю изменений
VACUUM ИмяТаблицы RETAIN 0 HOURS;
В файле delta-sample.sql приведен набор команд, который может использоваться для проверки стабильности функционирования кластера Data Proc с библиотеками Delta Lake под нагрузкой, связанной с генерацией и записью значительного объёма данных.