Add export to scala client (#1658)
itaiad200 authored Mar 22, 2021
1 parent d3bd55c commit 39323ff
Showing 4 changed files with 147 additions and 0 deletions.
1 change: 1 addition & 0 deletions clients/spark/.gitignore
@@ -1,2 +1,3 @@
target/
gen/
.bsp/
@@ -65,4 +65,19 @@ class ApiClient(apiUrl: String, accessKey: String, secretKey: String) {
    }
    URI.create(getStorageNamespace(repoName) + "/" + resp.header("Location").get).normalize().toString
  }

  def getBranchHEADCommit(repoName: String, branch: String): String = {
    val getBranchURI = URI.create("%s/repositories/%s/branches/%s".format(apiUrl, repoName, branch)).normalize()
    val resp = Http(getBranchURI.toString).header("Accept", "application/json").auth(accessKey, secretKey).asString
    if (resp.isError) {
      throw new RuntimeException(s"failed to get branch ${getBranchURI}: [${resp.code}] ${resp.body}")
    }
    val branchResp = parse(resp.body)

    branchResp \ "commit_id" match {
      case JString(commitID) => commitID
      case _ =>
        throw new RuntimeException(s"expected string commit_id in ${resp.body}")
    }
  }
}
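
For reference, the new getBranchHEADCommit resolves a branch name to its current HEAD commit ID through the lakeFS API. A minimal sketch of calling it directly (the endpoint, credentials, repository, and branch names below are placeholders, not values from this commit):

import io.treeverse.clients.ApiClient

// Hypothetical standalone caller -- not part of this commit.
object PrintBranchHead {
  def main(args: Array[String]): Unit = {
    // Placeholder lakeFS endpoint and credentials.
    val client = new ApiClient("http://localhost:8000/api/v1", "<ACCESS_KEY_ID>", "<SECRET_ACCESS_KEY>")
    // Resolve the HEAD commit of branch "main" in repository "example-repo".
    val headCommit = client.getBranchHEADCommit("example-repo", "main")
    println(s"HEAD of main: $headCommit")
  }
}
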
@@ -0,0 +1,90 @@
package io.treeverse.clients

import org.apache.spark.SerializableWritable
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

class Exporter(spark: SparkSession, apiClient: ApiClient, repoName: String, dstRoot: String) {
  def exportAllFromBranch(branch: String): Unit = {
    val commitID = apiClient.getBranchHEADCommit(repoName, branch)
    exportAllFromCommit(commitID)
  }

  def exportAllFromCommit(commitID: String): Unit = {
    val ns = apiClient.getStorageNamespace(repoName)
    val df = LakeFSContext.newDF(spark, repoName, commitID)
    df.createOrReplaceTempView("commit")

    // pin Exporter field to avoid serialization
    val dst = dstRoot
    val actionsDF = spark.sql("SELECT 'copy' as action, * FROM commit")

    export(ns, dst, actionsDF)
    spark.sparkContext.stop()
  }

  private def export(ns: String, rel: String, actionsDF: DataFrame): Unit = {
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    val serializedConf = new SerializableWritable(hadoopConf)

    actionsDF.foreach { row =>
      Exporter.handleRow(ns, rel, serializedConf, row)
    }
  }

  def exportFrom(branch: String, prevCommitID: String): Unit = {
    val commitID = apiClient.getBranchHEADCommit(repoName, branch)
    val ns = apiClient.getStorageNamespace(repoName)

    val newDF = LakeFSContext.newDF(spark, repoName, commitID)
    val prevDF = LakeFSContext.newDF(spark, repoName, prevCommitID)

    newDF.createOrReplaceTempView("new_commit")
    prevDF.createOrReplaceTempView("prev_commit")

    // pin Exporter field to avoid serialization
    val dst = dstRoot
    // Diff the two commits: objects whose key and etag do not appear in prev_commit are copied;
    // keys that exist only in prev_commit are deleted from the destination.
    val actionsDF = spark.sql("SELECT 'copy' as action, new_commit.* FROM new_commit " +
      "LEFT JOIN prev_commit " +
      "ON (new_commit.etag = prev_commit.etag AND new_commit.key = prev_commit.key) " +
      "WHERE prev_commit.key is NULL " +
      "UNION " +
      "SELECT 'delete' as action, prev_commit.* FROM prev_commit " +
      "LEFT OUTER JOIN new_commit " +
      "ON (new_commit.key = prev_commit.key) " +
      "WHERE new_commit.key is NULL")

    export(ns, dst, actionsDF)
    spark.sparkContext.stop()
  }
}

object Exporter {
  private def handleRow(ns: String, rel: String, serializedConf: SerializableWritable[org.apache.hadoop.conf.Configuration], row: Row): Unit = {
    val action = row(0)
    val key = row(1)
    val address = row(2).toString()
    val conf = serializedConf.value

    val srcPath = new org.apache.hadoop.fs.Path(if (address.contains("://")) address else ns + "/" + address)
    val dstPath = new org.apache.hadoop.fs.Path(rel + "/" + key)

    val dstFS = dstPath.getFileSystem(conf)

    action match {
      case "delete" => {
        dstFS.delete(dstPath, false): Unit
      }

      case "copy" => {
        org.apache.hadoop.fs.FileUtil.copy(
          srcPath.getFileSystem(conf),
          srcPath,
          dstFS,
          dstPath,
          false,
          conf
        ): Unit
      }
    }
  }
}
@@ -0,0 +1,41 @@
package io.treeverse.clients.examples

import io.treeverse.clients.{ApiClient, Exporter}
import org.apache.spark.sql.SparkSession

// This example Export program copies all files from a branch in a lakeFS repository
// to the specified S3 bucket. When the export ends, the file structure under the bucket
// matches the file structure of the branch.
// The example also supports continuous exports: given a <prev_commit_id>, it handles only the files
// that changed since that commit and avoids copying unnecessary data.
object Export extends App {
  override def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      Console.err.println("Usage: ... <repo_name> <branch_id> <prev_commit_id> s3://path/to/output/du")
      System.exit(1)
    }

    val endpoint = "http://<LAKEFS_ENDPOINT>/api/v1"
    val accessKey = "<LAKEFS_ACCESS_KEY_ID>"
    val secretKey = "<LAKEFS_SECRET_ACCESS_KEY>"

    val repo = args(0)
    val branch = args(1)
    val prevCommitID = args(2)
    val rootLocation = args(3)

    val spark = SparkSession.builder().appName("I can list").master("local").getOrCreate()

    val sc = spark.sparkContext
    sc.hadoopConfiguration.set("lakefs.api.url", endpoint)
    sc.hadoopConfiguration.set("lakefs.api.access_key", accessKey)
    sc.hadoopConfiguration.set("lakefs.api.secret_key", secretKey)

    val apiClient = new ApiClient(endpoint, accessKey, secretKey)
    val exporter = new Exporter(spark, apiClient, repo, rootLocation)

    exporter.exportAllFromBranch(branch)
    // exporter.exportAllFromCommit(prevCommitID)
    // exporter.exportFrom(branch, prevCommitID)
  }
}
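
The example above runs only the full export and leaves the incremental calls commented out. A minimal sketch of the continuous-export path described in the comment, using the same placeholder endpoint and credentials (the repository, branch, commit ID, and bucket names are illustrative):

import io.treeverse.clients.{ApiClient, Exporter}
import org.apache.spark.sql.SparkSession

// Hypothetical incremental-export driver -- not part of this commit.
object IncrementalExport {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("incremental-export").master("local").getOrCreate()
    val sc = spark.sparkContext
    // Placeholder lakeFS connection details, as in the example above.
    sc.hadoopConfiguration.set("lakefs.api.url", "http://<LAKEFS_ENDPOINT>/api/v1")
    sc.hadoopConfiguration.set("lakefs.api.access_key", "<LAKEFS_ACCESS_KEY_ID>")
    sc.hadoopConfiguration.set("lakefs.api.secret_key", "<LAKEFS_SECRET_ACCESS_KEY>")

    val apiClient = new ApiClient("http://<LAKEFS_ENDPOINT>/api/v1", "<LAKEFS_ACCESS_KEY_ID>", "<LAKEFS_SECRET_ACCESS_KEY>")
    val exporter = new Exporter(spark, apiClient, "example-repo", "s3://example-bucket/exported")

    // Copy only objects added or changed since <prev_commit_id>, and delete objects
    // that were removed, so the bucket catches up with the branch HEAD.
    exporter.exportFrom("main", "<prev_commit_id>")
  }
}

Note that exportFrom, like the other export methods added in this commit, stops the SparkContext when it finishes, so it should be the last Spark action in the job.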
