diff --git a/clients/spark/.gitignore b/clients/spark/.gitignore
index 2f2c74da8da..53463fdec7c 100644
--- a/clients/spark/.gitignore
+++ b/clients/spark/.gitignore
@@ -1,2 +1,3 @@
 target/
 gen/
+.bsp/
diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/ApiClient.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/ApiClient.scala
index e87289f3e79..7ceb1fdf334 100644
--- a/clients/spark/core/src/main/scala/io/treeverse/clients/ApiClient.scala
+++ b/clients/spark/core/src/main/scala/io/treeverse/clients/ApiClient.scala
@@ -65,4 +65,19 @@ class ApiClient(apiUrl: String, accessKey: String, secretKey: String) {
     }
     URI.create(getStorageNamespace(repoName) + "/" + resp.header("Location").get).normalize().toString
   }
+
+  def getBranchHEADCommit(repoName: String, branch: String): String = {
+    val getBranchURI = URI.create("%s/repositories/%s/branches/%s".format(apiUrl, repoName, branch)).normalize()
+    val resp = Http(getBranchURI.toString).header("Accept", "application/json").auth(accessKey, secretKey).asString
+    if (resp.isError) {
+      throw new RuntimeException(s"failed to get branch ${getBranchURI}: [${resp.code}] ${resp.body}")
+    }
+    val branchResp = parse(resp.body)
+
+    branchResp \ "commit_id" match {
+      case JString(commitID) => commitID
+      case _ =>
+        throw new RuntimeException(s"expected string commit_id in ${resp.body}")
+    }
+  }
 }
diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/Exporter.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/Exporter.scala
new file mode 100644
index 00000000000..2fbe488acde
--- /dev/null
+++ b/clients/spark/core/src/main/scala/io/treeverse/clients/Exporter.scala
@@ -0,0 +1,90 @@
+package io.treeverse.clients
+
+import org.apache.spark.SerializableWritable
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+
+class Exporter(spark: SparkSession, apiClient: ApiClient, repoName: String, dstRoot: String) {
+  def exportAllFromBranch(branch: String): Unit = {
+    val commitID = apiClient.getBranchHEADCommit(repoName, branch)
+    exportAllFromCommit(commitID)
+  }
+
+  def exportAllFromCommit(commitID: String): Unit = {
+    val ns = apiClient.getStorageNamespace(repoName)
+    val df = LakeFSContext.newDF(spark, repoName, commitID)
+    df.createOrReplaceTempView("commit")
+
+    // pin Exporter field to avoid serializing the whole Exporter
+    val dst = dstRoot
+    val actionsDF = spark.sql("SELECT 'copy' as action, * FROM commit")
+
+    export(ns, dst, actionsDF)
+    spark.sparkContext.stop()
+  }
+
+  private def export(ns: String, rel: String, actionsDF: DataFrame) = {
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    val serializedConf = new SerializableWritable(hadoopConf)
+
+    actionsDF.foreach { row =>
+      Exporter.handleRow(ns, rel, serializedConf, row)
+    }
+  }
+
+  def exportFrom(branch: String, prevCommitID: String): Unit = {
+    val commitID = apiClient.getBranchHEADCommit(repoName, branch)
+    val ns = apiClient.getStorageNamespace(repoName)
+
+    val newDF = LakeFSContext.newDF(spark, repoName, commitID)
+    val prevDF = LakeFSContext.newDF(spark, repoName, prevCommitID)
+
+    newDF.createOrReplaceTempView("new_commit")
+    prevDF.createOrReplaceTempView("prev_commit")
+
+    // pin Exporter field to avoid serializing the whole Exporter
+    val dst = dstRoot
+    val actionsDF = spark.sql("SELECT 'copy' as action, new_commit.* FROM new_commit " +
+      "LEFT JOIN prev_commit " +
+      "ON (new_commit.etag = prev_commit.etag AND new_commit.key = prev_commit.key) " +
+      "WHERE prev_commit.key is NULL " +
+      "UNION " +
+      "SELECT 'delete' as action, prev_commit.* FROM prev_commit " +
+      "LEFT OUTER JOIN new_commit " +
+      "ON (new_commit.key = prev_commit.key) " +
+      "WHERE new_commit.key is NULL")
+
+    export(ns, dst, actionsDF)
+    spark.sparkContext.stop()
+  }
+}
+
+object Exporter {
+  private def handleRow(ns: String, rel: String, serializedConf: SerializableWritable[org.apache.hadoop.conf.Configuration], row: Row): Unit = {
+    val action = row(0)
+    val key = row(1)
+    val address = row(2).toString()
+    val conf = serializedConf.value
+
+    val srcPath = new org.apache.hadoop.fs.Path(if (address.contains("://")) address else ns + "/" + address)
+    val dstPath = new org.apache.hadoop.fs.Path(rel + "/" + key)
+
+    val dstFS = dstPath.getFileSystem(conf)
+
+    action match {
+      case "delete" => {
+        dstFS.delete(dstPath, false): Unit
+      }
+
+      case "copy" => {
+        org.apache.hadoop.fs.FileUtil.copy(
+          srcPath.getFileSystem(conf),
+          srcPath,
+          dstFS,
+          dstPath,
+          false,
+          conf
+        ): Unit
+      }
+    }
+  }
+}
diff --git a/clients/spark/examples/src/main/scala/io/treeverse/clients/examples/Export.scala b/clients/spark/examples/src/main/scala/io/treeverse/clients/examples/Export.scala
new file mode 100644
index 00000000000..09b0f33527d
--- /dev/null
+++ b/clients/spark/examples/src/main/scala/io/treeverse/clients/examples/Export.scala
@@ -0,0 +1,41 @@
+package io.treeverse.clients.examples
+
+import io.treeverse.clients.{ApiClient, Exporter}
+import org.apache.spark.sql.SparkSession
+
+// This example Export program copies all files from a lakeFS branch in a lakeFS repository
+// to the specified S3 bucket. When the export ends, the file structure under the bucket will
+// match the one in the branch.
+// This example supports continuous exports: provided with a previous commit ID, it handles only
+// the files that changed since that commit and avoids copying unnecessary data.
+object Export extends App {
+  override def main(args: Array[String]) {
+    if (args.length != 4) {
+      Console.err.println("Usage: ... <repo> <branch> <prev_commit_id> s3://path/to/output/du")
+      System.exit(1)
+    }
+
+    val endpoint = "http://<lakefs-host>/api/v1"
+    val accessKey = "<access-key>"
+    val secretKey = "<secret-key>"
+
+    val repo = args(0)
+    val branch = args(1)
+    val prevCommitID = args(2)
+    val rootLocation = args(3)
+
+    val spark = SparkSession.builder().appName("I can list").master("local").getOrCreate()
+
+    val sc = spark.sparkContext
+    sc.hadoopConfiguration.set("lakefs.api.url", endpoint)
+    sc.hadoopConfiguration.set("lakefs.api.access_key", accessKey)
+    sc.hadoopConfiguration.set("lakefs.api.secret_key", secretKey)
+
+    val apiClient = new ApiClient(endpoint, accessKey, secretKey)
+    val exporter = new Exporter(spark, apiClient, repo, rootLocation)
+
+    exporter.exportAllFromBranch(branch)
+    // exporter.exportAllFromCommit(prevCommitID)
+    // exporter.exportFrom(branch, prevCommitID)
+  }
+}