Home
[TODO TOC]
This is the repository for Delta Lake Connectors. It includes the Delta Standalone Reader (DSR) library for querying Delta Lake metadata, as well as a connector to Apache Hive. Future connectors to other popular big-data engines will go here, too. Please refer to the main Delta Lake repository if you want to learn more about the Delta Lake project.
The DSR is a JVM library for reading Delta Lake tables. Unlike the main Delta Lake project (https://github.com/delta-io/delta), it doesn't use Spark to read tables and has only a few transitive dependencies, so it can be used by any application that cannot use a Spark cluster.
You can add the Delta Standalone Reader library as a dependency using your favorite build tool. Please note that the DSR expects the `hadoop-client` and `parquet-hadoop` packages to be provided. See the file `build.sbt` for more details.
Scala 2.12:
```xml
<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-standalone_2.12</artifactId>
  <version>0.2.0-SNAPSHOT</version>
</dependency>
```
Scala 2.11:
```xml
<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-standalone_2.11</artifactId>
  <version>0.2.0-SNAPSHOT</version>
</dependency>
```
SBT:
```scala
libraryDependencies += "io.delta" %% "delta-standalone" % "0.2.0-SNAPSHOT"
```
The DSR provides a number of classes for interacting with and querying your table metadata. A few of them are highlighted here, along with some of their key interfaces. For more details, please see the official API docs at TODO.
`DeltaLog`: the main class for programmatically interacting with metadata in a read-only manner.
- To create a `DeltaLog`, use the `DeltaLog.forTable` method, passing in the `dataPath` of the root location of the table data.
- To get the current snapshot of the log, use the `snapshot` method.
- To get the latest snapshot, including any new data files added to the log, use the `update` method.
- To get a snapshot at some historical state of the log, use the `getSnapshotForTimestampAsOf` or `getSnapshotForVersionAsOf` methods (see the sketch after this list).
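As a rough illustration, here is a minimal sketch of creating a `DeltaLog` and retrieving snapshots. It assumes the `/tmp/delta_standalone_test/` table written in the full example further down this page; the class name `DeltaLogSketch` is just for illustration.

```java
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;
import org.apache.hadoop.conf.Configuration;

public class DeltaLogSketch {
    public static void main(String[] args) {
        // dataPath: the root location of the table data (assumed path).
        DeltaLog log = DeltaLog.forTable(new Configuration(), "/tmp/delta_standalone_test/");

        Snapshot current = log.snapshot();              // current snapshot of the log
        Snapshot latest = log.update();                 // re-scan the log and pick up any new data files
        Snapshot v0 = log.getSnapshotForVersionAsOf(0); // time travel to a historical version

        System.out.println("current: " + current.getVersion());
        System.out.println("latest:  " + latest.getVersion());
        System.out.println("v0:      " + v0.getVersion());
    }
}
```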
`Snapshot`: an immutable snapshot of the state of the log at some delta version.
- To get a list of the metadata files stored in this snapshot, use the `getAllFiles` method.
- To read the actual data that the metadata files reference, use the `open` method, which returns an iterator over the rows of data located in the DeltaLog's `dataPath` (see the sketch below).
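And a minimal sketch of inspecting a `Snapshot` obtained as above. The helper name `inspect` and the column name `c1` (which matches the example table below) are assumptions for illustration; note that the iterator returned by `open` should be closed when you are done with it.

```java
import io.delta.standalone.Snapshot;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.data.CloseableIterator;
import io.delta.standalone.data.RowRecord;

import java.io.IOException;

public class SnapshotSketch {
    // Hypothetical helper: list a snapshot's data files, then read the rows they reference.
    static void inspect(Snapshot snapshot) throws IOException {
        // The AddFile metadata actions stored in this snapshot.
        for (AddFile file : snapshot.getAllFiles()) {
            System.out.println(file.getPath());
        }

        // An iterator over the rows located in the DeltaLog's dataPath.
        CloseableIterator<RowRecord> iter = snapshot.open();
        try {
            while (iter.hasNext()) {
                RowRecord row = iter.next();
                System.out.println(row.getLong("c1")); // "c1" is a column of the example table below
            }
        } finally {
            iter.close();
        }
    }
}
```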
In this example, Delta data has already been written to `/tmp/delta_standalone_test/` using Apache Spark via the following commands:
```scala
scala> val table = "/tmp/delta_standalone_test/"

scala> for (i <- 0 to 2) {
     |   spark.range(i * 100, (i + 1) * 100)
     |     .map(x => (x, x % 5, s"foo-${x % 2}"))
     |     .toDF("c1", "c2", "c3")
     |     .write
     |     .mode("append")
     |     .format("delta")
     |     .save(table)
     | }

scala> spark.read.format("delta").load(table).count
res3: Long = 300

scala> spark.read.format("delta").load(table).limit(5).show
+---+---+-----+
| c1| c2|   c3|
+---+---+-----+
|  0|  0|foo-0|
|  1|  1|foo-1|
|  2|  2|foo-0|
|  3|  3|foo-1|
|  4|  4|foo-0|
+---+---+-----+
```
This produces the following files, which we will read later:
```
$ ls /tmp/delta_standalone_test
_delta_log
part-00000-195768ae-bad8-4c53-b0c2-e900e0f3eaee-c000.snappy.parquet
part-00000-53c3c553-f74b-4384-b9b5-7aa45bc2291b-c000.snappy.parquet
part-00000-b9afbcf5-b90d-4f92-97fd-a2522aa2d4f6-c000.snappy.parquet
part-00001-1aa1f14f-bd7a-4b3f-800e-d7e608d75dbb-c000.snappy.parquet
part-00001-c0569730-5008-42fa-b6cb-5a152c133fde-c000.snappy.parquet
...

$ ls /tmp/delta_standalone_test/_delta_log/*.json
00000000000000000000.json
00000000000000000001.json
00000000000000000002.json
```
The following SBT project configuration is used:
```scala
// <project-root>/build.sbt
scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client" % "2.7.2",
  "org.apache.parquet" % "parquet-hadoop" % "1.10.1",
  "io.delta" %% "delta-standalone" % "0.2.0-SNAPSHOT"
)
```
```java
// <project-root>/src/main/java/HelloWorld.java
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;
import io.delta.standalone.data.CloseableIterator;
import io.delta.standalone.data.RowRecord;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;

public class HelloWorld {
    public static void printSnapshotDetails(String title, Snapshot snapshot) throws IOException {
        System.out.println("===== " + title + " =====");
        System.out.println("version: " + snapshot.getVersion());
        System.out.println("number data files: " + snapshot.getNumOfFiles());

        // List the data files referenced by this snapshot's metadata.
        System.out.println("data files:");
        snapshot.getAllFiles().forEach(file -> System.out.println(file.getPath()));

        // Read the actual rows those files contain.
        CloseableIterator<RowRecord> iter = snapshot.open();
        System.out.println("\ndata rows:");
        RowRecord row = null;
        int numRows = 0;
        while (iter.hasNext()) {
            row = iter.next();
            numRows++;
            long c1 = row.getLong("c1");
            long c2 = row.getLong("c2");
            String c3 = row.getString("c3");
            System.out.println(c1 + " " + c2 + " " + c3);
        }
        iter.close();

        System.out.println("\nnumber rows: " + numRows);
        System.out.println("data schema:");
        System.out.println(row.getSchema().getTreeString());
        System.out.println("\n");
    }

    public static void main(String[] args) throws IOException {
        DeltaLog log = DeltaLog.forTable(new Configuration(), "/tmp/delta_standalone_test/");

        printSnapshotDetails("current snapshot", log.snapshot());

        // Time travel to each earlier version of the table.
        printSnapshotDetails("version 0 snapshot", log.getSnapshotForVersionAsOf(0));
        printSnapshotDetails("version 1 snapshot", log.getSnapshotForVersionAsOf(1));
        printSnapshotDetails("version 2 snapshot", log.getSnapshotForVersionAsOf(2));
    }
}
```
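Running this program against the table written above prints output like the following (the exact data file names and row ordering will differ from run to run):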
```
===== current snapshot =====
version: 2
number data files: 48
data files:
part-00012-3e601227-c42a-4aa2-8d4e-47b346e3768b-c000.snappy.parquet
part-00001-e6bc1a0d-5165-40e4-bfea-04287348e239-c000.snappy.parquet
part-00011-b6e0f1da-083f-43eb-9eda-ad7a77727844-c000.snappy.parquet
part-00003-72411df0-2f8f-4ff2-8679-31984d378454-c000.snappy.parquet
part-00003-d24b9cae-1bbd-4a3d-9c80-c835ee2839da-c000.snappy.parquet
...

data rows:
175 0 foo-1
176 1 foo-0
177 2 foo-1
178 3 foo-0
179 4 foo-1
...

number rows: 300
data schema:
root
|-- c1: long (nullable = true)
|-- c2: long (nullable = true)
|-- c3: string (nullable = true)


===== version 0 snapshot =====
version: 0
number data files: 16
data files:
part-00011-b6e0f1da-083f-43eb-9eda-ad7a77727844-c000.snappy.parquet
part-00003-d24b9cae-1bbd-4a3d-9c80-c835ee2839da-c000.snappy.parquet
part-00005-b0136635-773d-47ba-8e79-650831e5ba59-c000.snappy.parquet
part-00001-c0569730-5008-42fa-b6cb-5a152c133fde-c000.snappy.parquet
part-00007-49af50ad-a270-4c55-9b33-82127ef21943-c000.snappy.parquet
...

data rows:
68 3 foo-0
69 4 foo-1
70 0 foo-0
71 1 foo-1
72 2 foo-0
...

number rows: 100
data schema:
root
|-- c1: long (nullable = true)
|-- c2: long (nullable = true)
|-- c3: string (nullable = true)


===== version 1 snapshot =====
version: 1
number data files: 32
data files:
part-00012-3e601227-c42a-4aa2-8d4e-47b346e3768b-c000.snappy.parquet
part-00011-b6e0f1da-083f-43eb-9eda-ad7a77727844-c000.snappy.parquet
part-00003-72411df0-2f8f-4ff2-8679-31984d378454-c000.snappy.parquet
part-00003-d24b9cae-1bbd-4a3d-9c80-c835ee2839da-c000.snappy.parquet
part-00005-b0136635-773d-47ba-8e79-650831e5ba59-c000.snappy.parquet
...

data rows:
175 0 foo-1
176 1 foo-0
177 2 foo-1
178 3 foo-0
179 4 foo-1
...

number rows: 200
data schema:
root
|-- c1: long (nullable = true)
|-- c2: long (nullable = true)
|-- c3: string (nullable = true)


===== version 2 snapshot =====
version: 2
number data files: 48
data files:
part-00012-3e601227-c42a-4aa2-8d4e-47b346e3768b-c000.snappy.parquet
part-00001-e6bc1a0d-5165-40e4-bfea-04287348e239-c000.snappy.parquet
part-00011-b6e0f1da-083f-43eb-9eda-ad7a77727844-c000.snappy.parquet
part-00003-72411df0-2f8f-4ff2-8679-31984d378454-c000.snappy.parquet
part-00003-d24b9cae-1bbd-4a3d-9c80-c835ee2839da-c000.snappy.parquet
...

data rows:
175 0 foo-1
176 1 foo-0
177 2 foo-1
178 3 foo-0
179 4 foo-1
...

number rows: 300
data schema:
root
|-- c1: long (nullable = true)
|-- c2: long (nullable = true)
|-- c3: string (nullable = true)
```