@@ -21,8 +21,8 @@ import java.nio.file.FileAlreadyExistsException
import java.util.UUID
import java.util.regex.Pattern
import org.projectnessie.client.{NessieClient, NessieConfigConstants}
import org.projectnessie.error.NessieNotFoundException
import org.projectnessie.model.{CommitMeta, ContentsKey, DeltaLakeTable, ImmutableDeltaLakeTable, ImmutableOperations, Operations, Reference}
import org.projectnessie.error.{NessieConflictException, NessieNotFoundException}
import org.projectnessie.model.{CommitMeta, ContentsKey, DeltaLakeTable, ImmutableDeltaLakeTable, ImmutableOperations, Reference}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -35,6 +35,7 @@ import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.projectnessie.model.Operation.Put

import java.util
import scala.collection.JavaConverters._
import scala.util.Try

@@ -51,11 +52,17 @@ class NessieLogStore(sparkConf: SparkConf, hadoopConf: Configuration)
NessieClient.builder().fromConfig(c => hadoopConf.get(c)).build()
}

private def getOrCreate(): Reference = {
/**
* Keeps a mapping of reference name to current hash.
*/
private val referenceMap: util.Map[String, Reference] = {
Contributor: why a java map and not a scala map?

Member Author: Just need a mutable map here - and using a Java HashMap felt to be the easiest way to go.
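A Scala-native variant of the same idea, as the reviewer hints at, could look like the following minimal sketch (illustrative only, not the PR's code; it assumes a thread-safe TrieMap and a generic Ref type instead of the Nessie Reference):

// Sketch of the Scala-map alternative the reviewer asks about (the PR keeps java.util.HashMap).
import scala.collection.concurrent.TrieMap

class ReferenceCache[Ref](resolve: String => Ref) {
  private val referenceMap = TrieMap.empty[String, Ref]

  // Look up a reference by name, resolving and caching it on a miss.
  def byName(refName: String): Ref =
    referenceMap.getOrElseUpdate(refName, resolve(refName))

  // Replace the cached entry after a successful commit.
  def update(refName: String, ref: Ref): Unit = referenceMap.put(refName, ref)

  // Drop a stale entry so the next lookup re-resolves it.
  def refresh(refName: String): Unit = referenceMap.remove(refName)
}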

val requestedRef = hadoopConf.get(NessieConfigConstants.CONF_NESSIE_REF)

try {
Option(requestedRef).map(client.getTreeApi.getReferenceByName(_)).getOrElse(client.getTreeApi.getDefaultBranch)
val ref = Option(requestedRef).map(client.getTreeApi.getReferenceByName(_)).getOrElse(client.getTreeApi.getDefaultBranch)
val map: util.Map[String, Reference] = new util.HashMap[String, Reference]
map.put(requestedRef, ref)
map
} catch {
case ex: NessieNotFoundException =>
if (requestedRef != null) throw new IllegalArgumentException(s"Nessie ref $requestedRef provided " +
@@ -65,7 +72,25 @@ class NessieLogStore(sparkConf: SparkConf, hadoopConf: Configuration)
}
}

private var reference: Reference = getOrCreate()
private def configuredRef(): Reference = {
val refName = hadoopConf.get(NessieConfigConstants.CONF_NESSIE_REF)
referenceByName(refName)
}

private def referenceByName(refName: String): Reference = {
var ref = referenceMap.get(refName)
if (ref == null) {
ref = client.getTreeApi.getReferenceByName(refName)
referenceMap.put(refName, ref)
}
ref
}

private def updateReference(ref: Reference): Unit = {
referenceMap.put(ref.getName, ref)
}

private def refreshReference(refName: String): Unit = referenceMap.remove(refName)

override def listFrom(path: Path): Iterator[FileStatus] = {
throw new UnsupportedOperationException("listFrom from Nessie does not work.")
@@ -89,7 +114,7 @@ class NessieLogStore(sparkConf: SparkConf, hadoopConf: Configuration)

override def write(path: Path, actions: Iterator[String], overwrite: Boolean = false): Unit = {
if (path.getName.equals("_last_checkpoint")) {
commit(path, reference.getName, reference.getHash, lastCheckpoint = actions.mkString)
commit(path, configuredRef().getName, configuredRef().getHash, lastCheckpoint = actions.mkString)
return
}
val parent = path.getParent
@@ -148,18 +173,32 @@ }
}

private def commit(path: Path, ref: String, hash: String, message: String = "delta commit", lastCheckpoint: String = null): Boolean = {
val targetRef = if (ref == null) reference.getName else ref
val targetHash = if (hash == null) reference.getHash else hash
val table = updateDeltaTable(path, targetRef, lastCheckpoint)
val put = Put.of(pathToKey(path.getParent), table)
val meta = CommitMeta.builder()
.message(message)
.putProperties("spark.app.id", sparkConf.get("spark.app.id"))
.putProperties("application.type", "delta")
.build()
val op = ImmutableOperations.builder().addOperations(put).commitMeta(meta).build()
client.getTreeApi.commitMultipleOperations(targetRef, targetHash, op)
reference = client.getTreeApi.getReferenceByName(reference.getName)
// if no expected-hash is given and the commit runs into a conflict, let the operation retry once
var retries = if (hash == null) 1 else 0
while (true) {
val targetRef = if (ref == null) configuredRef().getName else ref
val targetHash = if (hash == null) referenceByName(targetRef).getHash else hash
val table = updateDeltaTable(path, targetRef, lastCheckpoint)
val put = Put.of(pathToKey(path.getParent), table)
val meta = CommitMeta.builder()
.message(message)
.putProperties("spark.app.id", sparkConf.get("spark.app.id"))
.putProperties("application.type", "delta")
.build()
val op = ImmutableOperations.builder().addOperations(put).commitMeta(meta).build()
try {
val updated : Reference = client.getTreeApi.commitMultipleOperations(targetRef, targetHash, op)
updateReference(if (updated != null) updated else client.getTreeApi.getReferenceByName(targetRef))
return true
} catch {
case ex: NessieConflictException =>
refreshReference(targetRef)
if (retries <= 0) {
throw ex
}
retries = retries - 1
}
}
true
}
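The retry policy above reads as: an explicit expected hash pins a commit, so a conflict surfaces immediately; a null hash allows one refresh-and-retry against the current branch head. A minimal generic sketch of that shape (not the PR's code; the exception type and the callbacks are stand-ins for NessieConflictException, referenceMap invalidation, and commitMultipleOperations):

// Generic retry-once sketch, illustrative only.
def commitWithRetry[T](retries: Int)(refresh: () => Unit)(attempt: () => T): T =
  try attempt()
  catch {
    case ex: RuntimeException =>  // stands in for NessieConflictException
      refresh()                   // stands in for dropping the stale cached reference
      if (retries <= 0) throw ex
      else commitWithRetry(retries - 1)(refresh)(attempt)
  }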

@@ -252,7 +291,7 @@ class NessieLogStore(sparkConf: SparkConf, hadoopConf: Configuration)
} else if (ref != null) {
name = ref
} else {
name = reference.getName
name = configuredRef().getName
}

val currentTable = getTable(new Path(tableName).getParent, name)
@@ -270,7 +309,7 @@ class NessieLogStore(sparkConf: SparkConf, hadoopConf: Configuration)
.sortBy(_.version)
val maxExpected: Option[Long] = if (currentPath.nonEmpty) Some(currentPath.map(extractVersion).max) else None
val maxFound: Option[Long] = if (filteredFiles.nonEmpty) Some(filteredFiles.map(_.version).max) else None
require(maxFound.getOrElse(0L) == maxExpected.getOrElse(0L))
require(maxFound.getOrElse(0L) == maxExpected.getOrElse(0L), s"maxFound(${maxFound.getOrElse(0L)}) != maxExpected(${maxExpected.getOrElse(0L)})")
if (filteredFiles.map(_.fileType).count(_ == DeltaFileType.CHECKPOINT) == filteredFiles.length) {
(filteredFiles ++ Seq(emptyCheckpoint(requestedVersion, filteredFiles.head))).iterator
} else filteredFiles.iterator
@@ -294,7 +333,7 @@ class NessieLogStore(sparkConf: SparkConf, hadoopConf: Configuration)

override def read(path: Path): Seq[String] = {
if (path.getName.equals("_last_checkpoint")) {
val table = getTable(path.getParent, reference.getName)
val table = getTable(path.getParent, configuredRef().getName)
val data = table.map(_.getLastCheckpoint).getOrElse(throw new FileNotFoundException())
if (data == null) throw new FileNotFoundException()
Seq(data)
@@ -15,6 +15,8 @@
*/
package org.projectnessie.deltalake;

import static org.projectnessie.client.NessieConfigConstants.CONF_NESSIE_REF;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
@@ -32,8 +34,12 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.io.TempDir;
import org.projectnessie.client.tests.AbstractSparkTest;
import org.projectnessie.model.Branch;
import org.projectnessie.model.ImmutableMerge;
import org.projectnessie.model.Reference;

import io.delta.tables.DeltaTable;
import scala.Tuple2;
@@ -51,6 +57,91 @@ protected static void createDelta() {
.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog");
}

@Test
// Delta < 0.8 w/ Spark 2.x doesn't support multiple branches well (warnings when changing the configuration)
Contributor: for my benefit can you explain a bit more? If I recall correctly 0.6.0 is Spark2 only and >0.7.0 is Spark3 only.

Member Author: that's delta/spark2 - it complains with a warning that the configuration has changed and "might not have been applied completely".

@DisabledIfSystemProperty(named = "skip-multi-branch-tests", matches = "true")
void testMultipleBranches() throws Exception {
String csvSalaries1 = ITDeltaLog.class.getResource("/salaries1.csv").getPath();
String csvSalaries2 = ITDeltaLog.class.getResource("/salaries2.csv").getPath();
String csvSalaries3 = ITDeltaLog.class.getResource("/salaries3.csv").getPath();
String pathSalaries = new File(tempPath, "salaries").getAbsolutePath();

spark.sql(String.format("CREATE TABLE IF NOT EXISTS test_multiple_branches (Season STRING, Team STRING, Salary STRING, "
+ "Player STRING) USING delta LOCATION '%s'", pathSalaries));
Dataset<Row> salariesDf1 = spark.read().option("header", true).csv(csvSalaries1);
salariesDf1.write().format("delta").mode("overwrite").save(pathSalaries);

Dataset<Row> count1 = spark.sql("SELECT COUNT(*) FROM test_multiple_branches");
Assertions.assertEquals(15L, count1.collectAsList().get(0).getLong(0));

Reference mainBranch = nessieClient.getTreeApi().getReferenceByName("main");

Reference devBranch = nessieClient.getTreeApi().createReference(Branch.of("testMultipleBranches", mainBranch.getHash()));

spark.sparkContext().getConf().set("spark.hadoop." + CONF_NESSIE_REF, devBranch.getName());
spark.sparkContext().hadoopConfiguration().set(CONF_NESSIE_REF, devBranch.getName());

Dataset<Row> salariesDf2 = spark.read().option("header", true).csv(csvSalaries2);
salariesDf2.write().format("delta").mode("append").save(pathSalaries);

Dataset<Row> count2 = spark.sql("SELECT COUNT(*) FROM test_multiple_branches");
Assertions.assertEquals(30L, count2.collectAsList().get(0).getLong(0));

spark.sparkContext().getConf().set("spark.hadoop.nessie.ref", "main");
spark.sparkContext().hadoopConfiguration().set("nessie.ref", "main");

Dataset<Row> salariesDf3 = spark.read().option("header", true).csv(csvSalaries3);
salariesDf3.write().format("delta").mode("append").save(pathSalaries);

Dataset<Row> count3 = spark.sql("SELECT COUNT(*) FROM test_multiple_branches");
Assertions.assertEquals(35L, count3.collectAsList().get(0).getLong(0));
}
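For readers following the assertions: salaries1.csv, salaries2.csv and salaries3.csv (added further down) carry 15, 15 and 20 data rows, so the expected counts work out as in this short reading aid (not part of the test):

// testMultipleBranches expected counts (reading aid only)
val onMain      = 15        // salaries1 written on main
val onDevBranch = 15 + 15   // salaries2 appended on the testMultipleBranches branch -> 30
val backOnMain  = 15 + 20   // salaries3 appended on main; the branch append stays invisible -> 35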

@Test
// Delta < 0.8 w/ Spark 2.x doesn't support multiple branches well (warnings when changing the configuration)
@DisabledIfSystemProperty(named = "skip-multi-branch-tests", matches = "true")
void testCommitRetry() throws Exception {
String csvSalaries1 = ITDeltaLog.class.getResource("/salaries1.csv").getPath();
Contributor: Any reason to use csvs when everywhere else in spark tests we create simple few row in memory datasets?

Member Author: A mixture of "laziness and reproduction of the original issue". I wanted this test to be close to what the demo in nessie-demos looks like.
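A minimal sketch of the reviewer's suggestion, a few in-memory rows instead of CSV fixtures, written in Scala for brevity although the test itself is Java (the helper name and rows are illustrative):

// Illustrative only: build a tiny salaries Dataset in memory with the same
// columns as the CSV fixtures, instead of reading from a resource file.
import org.apache.spark.sql.{DataFrame, SparkSession}

def salariesFixture(spark: SparkSession): DataFrame = {
  import spark.implicits._
  Seq(
    ("2003-04", "Cleveland Cavaliers", "$4018920", "Lebron James"),
    ("2004-05", "Cleveland Cavaliers", "$4320360", "Lebron James")
  ).toDF("Season", "Team", "Salary", "Player")
}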

String csvSalaries2 = ITDeltaLog.class.getResource("/salaries2.csv").getPath();
String csvSalaries3 = ITDeltaLog.class.getResource("/salaries3.csv").getPath();
String pathSalaries = new File(tempPath, "salaries").getAbsolutePath();

spark.sql(String.format("CREATE TABLE IF NOT EXISTS test_commit_retry (Season STRING, Team STRING, Salary STRING, "
+ "Player STRING) USING delta LOCATION '%s'", pathSalaries));
Dataset<Row> salariesDf1 = spark.read().option("header", true).csv(csvSalaries1);
salariesDf1.write().format("delta").mode("overwrite").save(pathSalaries);

Dataset<Row> count1 = spark.sql("SELECT COUNT(*) FROM test_commit_retry");
Assertions.assertEquals(15L, count1.collectAsList().get(0).getLong(0));

Reference mainBranch = nessieClient.getTreeApi().getReferenceByName("main");

Reference devBranch = nessieClient.getTreeApi().createReference(Branch.of("testCommitRetry", mainBranch.getHash()));

spark.sparkContext().getConf().set("spark.hadoop." + CONF_NESSIE_REF, devBranch.getName());
spark.sparkContext().hadoopConfiguration().set(CONF_NESSIE_REF, devBranch.getName());

Dataset<Row> salariesDf2 = spark.read().option("header", true).csv(csvSalaries2);
salariesDf2.write().format("delta").mode("append").save(pathSalaries);

Dataset<Row> count2 = spark.sql("SELECT COUNT(*) FROM test_commit_retry");
Assertions.assertEquals(30L, count2.collectAsList().get(0).getLong(0));

String toHash = nessieClient.getTreeApi().getReferenceByName("main").getHash();
String fromHash = nessieClient.getTreeApi().getReferenceByName("testCommitRetry").getHash();

nessieClient.getTreeApi().mergeRefIntoBranch("main", toHash, ImmutableMerge.builder().fromHash(fromHash).build());

spark.sparkContext().getConf().set("spark.hadoop.nessie.ref", "main");
spark.sparkContext().hadoopConfiguration().set("nessie.ref", "main");

Dataset<Row> salariesDf3 = spark.read().option("header", true).csv(csvSalaries3);
salariesDf3.write().format("delta").mode("append").save(pathSalaries);

Dataset<Row> count3 = spark.sql("SELECT COUNT(*) FROM test_commit_retry");
Assertions.assertEquals(50L, count3.collectAsList().get(0).getLong(0));
}
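The expected counts in this test additionally reflect the merge of the dev branch back into main before the final append (again a reading aid, not test code):

// testCommitRetry expected counts (reading aid only)
val onMain      = 15        // salaries1 written on main
val onDevBranch = 15 + 15   // salaries2 appended on the testCommitRetry branch -> 30
val mergedMain  = 30        // testCommitRetry merged into main
val finalCount  = 30 + 20   // salaries3 appended on main -> 50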

@Test
void testWithoutCondition() {
Dataset<Row> targetTable = createKVDataSet(Arrays.asList(tuple2(1, 10), tuple2(2, 20), tuple2(3, 30), tuple2(4, 40)), "key", "value");
1 change: 1 addition & 0 deletions clients/deltalake/core/src/test/resources/logback-test.xml
@@ -23,6 +23,7 @@
<pattern>%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="org.apache.spark.sql.delta" level="${test.log.level:-INFO}"/>
<root>
<level value="${test.log.level:-INFO}"/>
<appender-ref ref="console"/>
16 changes: 16 additions & 0 deletions clients/deltalake/core/src/test/resources/salaries1.csv
@@ -0,0 +1,16 @@
Season,Team,Salary,Player
2003-04,Cleveland Cavaliers,$4018920,Lebron James
2004-05,Cleveland Cavaliers,$4320360,Lebron James
2005-06,Cleveland Cavaliers,$4621800,Lebron James
2006-07,Cleveland Cavaliers,$5828090,Lebron James
2007-08,Cleveland Cavaliers,$13041250,Lebron James
2008-09,Cleveland Cavaliers,$14410581,Lebron James
2009-10,Cleveland Cavaliers,$15779912,Lebron James
2010-11,Miami Heat,$14500000,Lebron James
2011-12,Miami Heat,$16022500,Lebron James
2012-13,Miami Heat,$17545000,Lebron James
2013-14,Miami Heat,$19067500,Lebron James
2014-15,Cleveland Cavaliers,$20644400,Lebron James
2015-16,Cleveland Cavaliers,$22971000,Lebron James
2016-17,Cleveland Cavaliers,$30963450,Lebron James
2017-18,Cleveland Cavaliers,$33285709,Lebron James
16 changes: 16 additions & 0 deletions clients/deltalake/core/src/test/resources/salaries2.csv
@@ -0,0 +1,16 @@
Season,Team,Salary,Player
1984-85,Chicago Bulls,$550000,Michael Jordan
1985-86,Chicago Bulls,$630000,Michael Jordan
1987-88,Chicago Bulls,$845000,Michael Jordan
1988-89,Chicago Bulls,$2000000,Michael Jordan
1989-90,Chicago Bulls,$2500000,Michael Jordan
1990-91,Chicago Bulls,$2500000,Michael Jordan
1991-92,Chicago Bulls,$3250000,Michael Jordan
1992-93,Chicago Bulls,$4000000,Michael Jordan
1993-94,Chicago Bulls,$4000000,Michael Jordan
1994-95,Chicago Bulls,$3850000,Michael Jordan
1995-96,Chicago Bulls,$3850000,Michael Jordan
1996-97,Chicago Bulls,$30140000,Michael Jordan
1997-98,Chicago Bulls,$33140000,Michael Jordan
2001-02,Washington Wizards,$1000000,Michael Jordan
2002-03,Washington Wizards,$1030000,Michael Jordan
21 changes: 21 additions & 0 deletions clients/deltalake/core/src/test/resources/salaries3.csv
@@ -0,0 +1,21 @@
Season,Team,Salary,Player
1996-97,Los Angeles Lakers,$1015000,Kobe Bryant
1997-98,Los Angeles Lakers,$1167240,Kobe Bryant
1998-99,Los Angeles Lakers,$1319000,Kobe Bryant
1999-00,Los Angeles Lakers,$9000000,Kobe Bryant
2000-01,Los Angeles Lakers,$10130000,Kobe Bryant
2001-02,Los Angeles Lakers,$11250000,Kobe Bryant
2002-03,Los Angeles Lakers,$12375000,Kobe Bryant
2003-04,Los Angeles Lakers,$13500000,Kobe Bryant
2004-05,Los Angeles Lakers,$14175000,Kobe Bryant
2005-06,Los Angeles Lakers,$15946875,Kobe Bryant
2006-07,Los Angeles Lakers,$17718750,Kobe Bryant
2007-08,Los Angeles Lakers,$19490625,Kobe Bryant
2008-09,Los Angeles Lakers,$21262500,Kobe Bryant
2009-10,Los Angeles Lakers,$23034375,Kobe Bryant
2010-11,Los Angeles Lakers,$24806250,Kobe Bryant
2011-12,Los Angeles Lakers,$25244493,Kobe Bryant
2012-13,Los Angeles Lakers,$27849149,Kobe Bryant
2013-14,Los Angeles Lakers,$30453805,Kobe Bryant
2014-15,Los Angeles Lakers,$23500000,Kobe Bryant
2015-16,Los Angeles Lakers,$25000000,Kobe Bryant
4 changes: 4 additions & 0 deletions clients/deltalake/spark2/pom.xml
@@ -281,7 +281,11 @@
<configuration>
<systemPropertyVariables>
<quarkus.http.test-port>${quarkus.http.test-port}</quarkus.http.test-port>
<skip-multi-branch-tests>true</skip-multi-branch-tests>
</systemPropertyVariables>
<additionalClasspathElements>
${project.build.directory}/test-sources
</additionalClasspathElements>
<failIfNoTests>true</failIfNoTests>
</configuration>
<executions>
31 changes: 0 additions & 31 deletions clients/deltalake/spark2/src/test/resources/logback-test.xml

This file was deleted.

3 changes: 3 additions & 0 deletions clients/deltalake/spark3/pom.xml
@@ -272,6 +272,9 @@
<systemPropertyVariables>
<quarkus.http.test-port>${quarkus.http.test-port}</quarkus.http.test-port>
</systemPropertyVariables>
<additionalClasspathElements>
${project.build.directory}/test-sources
</additionalClasspathElements>
<failIfNoTests>true</failIfNoTests>
</configuration>
<executions>