This repository was archived by the owner on Feb 16, 2024. It is now read-only.
@@ -25,14 +25,15 @@ import org.apache.s2graph.s2jobs.udfs.UdfOption

case class JobDescription(
name:String,
udfs: Seq[UdfOption],
sources:Seq[Source],
processes:Seq[task.Process],
sinks:Seq[Sink]
sinks:Seq[Sink],
udfs: Seq[UdfOption] = Nil,
listener:Option[Map[String, String]] = None
)

object JobDescription extends Logger {
val dummy = JobDescription("dummy", Nil, Nil, Nil, Nil)
val dummy = JobDescription("dummy", Nil, Nil, Nil)

def apply(jsVal:JsValue):JobDescription = {
implicit val TaskConfReader = Json.reads[TaskConf]
@@ -41,12 +42,13 @@ object JobDescription extends Logger {
logger.debug(s"JobDescription: ${jsVal}")

val jobName = (jsVal \ "name").as[String]
val udfs = (jsVal \ "udfs").asOpt[Seq[UdfOption]].getOrElse(Nil)
val sources = (jsVal \ "source").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSource(conf))
val processes = (jsVal \ "process").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getProcess(conf))
val sinks = (jsVal \ "sink").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSink(jobName, conf))
val udfs = (jsVal \ "udfs").asOpt[Seq[UdfOption]].getOrElse(Nil)
val listenerOpt = (jsVal \ "listener").asOpt[Map[String, String]]

JobDescription(jobName, udfs, sources, processes, sinks)
JobDescription(jobName, sources, processes, sinks, udfs, listenerOpt)
}
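
As a quick illustration of the relaxed parsing, the sketch below feeds a minimal description in which "udfs" is omitted entirely and a "listener" block is present; the listener class name is made up for the example.

import play.api.libs.json.Json
import org.apache.s2graph.s2jobs.JobDescription

// "udfs" defaults to Nil; "listener" is picked up as an optional Map[String, String].
val jsVal = Json.parse(
  """{
    |  "name": "example_job",
    |  "source": [],
    |  "process": [],
    |  "sink": [],
    |  "listener": { "class": "com.example.JobListener" }
    |}""".stripMargin)

// Yields JobDescription("example_job", Nil, Nil, Nil, Nil, Some(Map("class" -> "com.example.JobListener")))
val desc = JobDescription(jsVal)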

def getSource(conf:TaskConf):Source = {
@@ -56,7 +58,24 @@
case "hive" => new HiveSource(conf)
case "jdbc" => new JdbcSource(conf)
case "s2graph" => new S2GraphSource(conf)
case _ => throw new IllegalArgumentException(s"unsupported source type : ${conf.`type`}")
case "custom" =>
val customClassOpt = conf.options.get("class")
customClassOpt match {
case Some(customClass:String) =>
logger.debug(s"custom class init.. $customClass")

Class.forName(customClass)
.getConstructor(classOf[TaskConf])
.newInstance(conf)
.asInstanceOf[task.Source]

case None => throw new IllegalArgumentException(s"custom class name is not exist.. ${conf}")
}
case _ =>
val newOptions = conf.options ++ Map("format" -> conf.`type`)
val newConf = conf.copy(options = newOptions)
new DefaultSource(newConf)
// throw new IllegalArgumentException(s"unsupported source type : ${conf.`type`}")
}
}
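
To see the new source resolution end to end: a type that is neither one of the explicit cases nor "custom" now falls through to DefaultSource with the type copied into the "format" option rather than throwing, while "custom" reflectively instantiates the class named in options("class") via its TaskConf constructor. A minimal sketch, assuming TaskConf exposes name/type/inputs/options as elsewhere in s2jobs; the path is illustrative.

import org.apache.s2graph.s2jobs.JobDescription
import org.apache.s2graph.s2jobs.task.TaskConf

// "parquet" is not an explicit case above, so it is treated as a Spark data source format.
val parquetConf = TaskConf(name = "in", `type` = "parquet", inputs = Nil,
  options = Map("path" -> "/tmp/example/input"))

val source = JobDescription.getSource(parquetConf) // a DefaultSource with options + ("format" -> "parquet")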

@@ -88,6 +107,7 @@ object JobDescription extends Logger {
case "es" => new ESSink(jobName, conf)
case "s2graph" => new S2GraphSink(jobName, conf)
case "jdbc" => new JdbcSink(jobName, conf)
case "hive" => new HiveSink(jobName, conf)
case "custom" =>
val customClassOpt = conf.options.get("class")
customClassOpt match {
23 changes: 17 additions & 6 deletions s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
@@ -19,17 +19,20 @@

package org.apache.s2graph.s2jobs

import com.typesafe.config.ConfigFactory
import org.apache.s2graph.s2jobs.udfs.Udf
import org.apache.s2graph.s2jobs.utils.{DBTransactionUtil, TransactionUtil}
import org.apache.spark.sql.SparkSession
import play.api.libs.json.{JsValue, Json}

import scala.io.Source

case class JobOption(
name:String = "S2BatchJob",
confType:String = "db",
jobId:Int = -1,
confFile:String = ""
name: String = "S2BatchJob",
confType: String = "db",
jobId: Int = -1,
confFile: String = "",
incremental: Boolean = false
)

object JobLauncher extends Logger {
@@ -90,7 +93,15 @@ object JobLauncher extends Logger {
udf.register(ss, udfOption.name, udfOption.params.getOrElse(Map.empty))
}

val job = new Job(ss, jobDescription)
job.run()
if (options.incremental) {
val config = ConfigFactory.load()
val txUtil = DBTransactionUtil(config)

TransactionUtil.withTx(ss, options.name, jobDescription, txUtil)(_.run())
} else {
val job = new Job(ss, jobDescription)

job.run()
}
}
}
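
Read side by side, the two run paths can be sketched as a small helper; this assumes a SparkSession and a parsed JobDescription are already in scope, and that the settings consumed by DBTransactionUtil come from the Typesafe config on the classpath.

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession
import org.apache.s2graph.s2jobs.{Job, JobDescription, JobOption}
import org.apache.s2graph.s2jobs.utils.{DBTransactionUtil, TransactionUtil}

def runJob(options: JobOption, ss: SparkSession, jobDescription: JobDescription): Unit =
  if (options.incremental) {
    // Incremental: hand the run to TransactionUtil.withTx together with a DB-backed helper.
    val txUtil = DBTransactionUtil(ConfigFactory.load())
    TransactionUtil.withTx(ss, options.name, jobDescription, txUtil)(_.run())
  } else {
    // Default: plain one-shot execution, as before.
    new Job(ss, jobDescription).run()
  }

// e.g. runJob(JobOption(name = "nightly_job", incremental = true), ss, jobDescription)
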
32 changes: 30 additions & 2 deletions s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -24,7 +24,8 @@ import org.apache.s2graph.s2jobs.Schema
import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
import org.apache.s2graph.s2jobs.serde.reader.S2GraphCellReader
import org.apache.s2graph.s2jobs.serde.writer.RowDataFrameWriter
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import play.api.libs.json.{JsObject, Json}


@@ -37,6 +38,21 @@ abstract class Source(override val conf:TaskConf) extends Task {
def toDF(ss:SparkSession):DataFrame
}

class DefaultSource(conf:TaskConf) extends Source(conf) {
override def mandatoryOptions: Set[String] = Set("format")

override def toDF(ss: SparkSession): DataFrame = {
val isStreamSource = conf.options.getOrElse("is_stream_source", "false").toBoolean
// "format" names the Spark data source to load (e.g. parquet, csv, json); all options are passed through.
val format = conf.options("format")

if (isStreamSource) {
ss.readStream.format(format).options(conf.options).load()
} else {
ss.read.format(format).options(conf.options).load()
}
}
}

class KafkaSource(conf:TaskConf) extends Source(conf) {
val DEFAULT_FORMAT = "raw"
override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "subscribe")
@@ -176,4 +192,16 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {

ss.sqlContext.createDataFrame(kvs, schema)
}
}
}

case class EmptySource(taskConf: TaskConf) extends Source(taskConf) {
val options = taskConf.options

override def toDF(ss: SparkSession): DataFrame = {
val schema = DataType.fromJson(options("schema")).asInstanceOf[StructType]

ss.createDataFrame(ss.sparkContext.parallelize(Seq.empty[Row]), schema)
}

override def mandatoryOptions: Set[String] = Set("schema")
}
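
A short usage sketch for the two new sources, again assuming TaskConf(name, type, inputs, options) and a local SparkSession; the CSV path is illustrative. DefaultSource defers to whatever Spark data source the "format" option names, and EmptySource materializes a zero-row DataFrame from a Spark DataType JSON string.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.s2graph.s2jobs.task.{DefaultSource, EmptySource, TaskConf}

val ss = SparkSession.builder().master("local[2]").appName("source-sketch").getOrCreate()

// DefaultSource: "format" selects the data source; the remaining options are handed to Spark as-is.
val csvConf = TaskConf(name = "in", `type` = "csv", inputs = Nil,
  options = Map("format" -> "csv", "header" -> "true", "path" -> "/tmp/example/in"))
val csvSource = new DefaultSource(csvConf) // csvSource.toDF(ss) reads the path as CSV

// EmptySource: the "schema" option carries StructType.json.
val schema = StructType(Seq(StructField("id", StringType)))
val emptyConf = TaskConf(name = "empty", `type` = "empty", inputs = Nil,
  options = Map("schema" -> schema.json))
val emptyDF = EmptySource(emptyConf).toDF(ss) // zero-row DataFrame with the given schema
emptyDF.printSchema()
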
10 changes: 10 additions & 0 deletions s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -43,10 +43,18 @@ object TaskConf {
taskConf.options.filterKeys(S2GraphConfigs.DBConfigs.DEFAULTS.keySet)
}

def parseMetaStoreConfigs(options: Map[String, String]): Map[String, Any] = {
options.filterKeys(S2GraphConfigs.DBConfigs.DEFAULTS.keySet)
}

def parseLocalCacheConfigs(taskConf: TaskConf): Map[String, Any] = {
taskConf.options.filterKeys(S2GraphConfigs.CacheConfigs.DEFAULTS.keySet).mapValues(_.toInt)
}

def parseLocalCacheConfigs(options: Map[String, String]): Map[String, Any] = {
options.filterKeys(S2GraphConfigs.CacheConfigs.DEFAULTS.keySet).mapValues(_.toInt)
}

def parseTransformers(taskConf: TaskConf): Seq[Transformer] = {
val classes = Json.parse(taskConf.options.getOrElse("transformClasses",
"""["org.apache.s2graph.s2jobs.wal.transformer.DefaultTransformer"]""")).as[Seq[String]]
@@ -69,6 +77,8 @@ trait Task extends Serializable with Logger {

def isValidate: Boolean = mandatoryOptions.subsetOf(conf.options.keySet)

def getName: String = conf.name

require(isValidate,
s"""${LOG_PREFIX} not exists mandatory options '${mandatoryOptions.mkString(",")}'
in task options (${conf.options.keySet.mkString(",")})
75 changes: 75 additions & 0 deletions s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/HDFSUtil.scala
@@ -0,0 +1,75 @@
package org.apache.s2graph.s2jobs.utils

import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.spark.sql.SparkSession


case class RemoteIteratorWrapper[A](remoteIterator: RemoteIterator[A])
extends scala.collection.AbstractIterator[A]
with scala.collection.Iterator[A] {

override def hasNext: Boolean = remoteIterator.hasNext

def next: A = remoteIterator.next
}

object HDFSUtil {
def getFilteredFiles(ss: SparkSession,
basePath: String,
recursive: Boolean)(filterFunc: FileStatus => Boolean): Iterator[FileStatus] = {
val fs = FileSystem.get(ss.sparkContext.hadoopConfiguration)
val iter = fs.listFiles(new Path(basePath), recursive)
RemoteIteratorWrapper(iter).filter(filterFunc)
}

def getFileStatus(ss: SparkSession, path: String): FileStatus = {
val fs = FileSystem.get(ss.sparkContext.hadoopConfiguration)
fs.getFileStatus(new Path(path))
}

// Returns the most recently modified file under baseDir, skipping meta files whose names start with "_".
def getLatestFile(ss: SparkSession,
baseDir: String,
recursive: Boolean): Option[Path] = {
var latestPath: LocatedFileStatus = null

val fs = FileSystem.get(ss.sparkContext.hadoopConfiguration)

val iter = fs.listFiles(new Path(baseDir), recursive)
while (iter.hasNext) {
val current = iter.next()
val skipThis = current.getPath.getName.startsWith("_")
if (!skipThis) {
val latestModifiedTime = if (latestPath == null) 0L else latestPath.getModificationTime
if (latestModifiedTime < current.getModificationTime) {
latestPath = current
}
}
}

Option(latestPath).map(_.getPath)
}

def isEmptyDirectory(fs: FileSystem, path: Path): Boolean = {
val ls = fs.listStatus(path)
ls.isEmpty || (ls.size == 1 && isMetaFile(ls.head.getPath))
}

def isMetaFile(path: Path): Boolean = path.getName.startsWith("_")

// Collapses a set of paths upward: whenever every non-meta child of a parent directory
// is present in the set, those children are replaced by the parent, repeating until a fixed point.
def reducePaths(fs: FileSystem, paths: Set[Path]): Seq[Path] = {
val reduced = paths.groupBy(path => path.getParent).foldLeft(Seq[Path]()){ case (ls, (parentPath, childPaths)) =>
val allPathsInParent = fs.listStatus(parentPath).map(_.getPath).filterNot(isMetaFile).toSet
if (allPathsInParent equals childPaths) ls :+ parentPath
else ls ++ childPaths
}

if (reduced.toSet equals paths) reduced
else reducePaths(fs, reduced.toSet)
}

def reducePaths(ss: SparkSession, paths: Set[FileStatus]): Seq[FileStatus] = {
val fs = FileSystem.get(ss.sparkContext.hadoopConfiguration)
reducePaths(fs, paths.map(_.getPath)).map(fs.getFileStatus)
}
}
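
A small usage sketch for HDFSUtil, with made-up paths: find the most recently modified data file under an output directory and inspect its parent.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.s2graph.s2jobs.utils.HDFSUtil

val ss = SparkSession.builder().master("local[2]").appName("hdfs-util-sketch").getOrCreate()
val fs = FileSystem.get(ss.sparkContext.hadoopConfiguration)

// Most recent non-meta file under an illustrative base directory, descending into sub-directories.
val latest: Option[Path] = HDFSUtil.getLatestFile(ss, "/tmp/example/output", recursive = true)

latest.foreach { path =>
  val status = HDFSUtil.getFileStatus(ss, path.toString)
  println(s"latest=$path modifiedAt=${status.getModificationTime}")
  println(s"parentOnlyMeta=${HDFSUtil.isEmptyDirectory(fs, path.getParent)}")
}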

23 changes: 23 additions & 0 deletions s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/HiveUtil.scala
@@ -0,0 +1,23 @@
package org.apache.s2graph.s2jobs.utils

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.StructType

object HiveUtil {

def getTableModificationTime(ss: SparkSession,
table: String,
dataBase: String): Option[Long] = {
val sessionCatalog = ss.sessionState.catalog
val catalogTable = sessionCatalog.getTableMetadata(TableIdentifier(table = table, database = Some(dataBase)))
catalogTable.properties.get("transient_lastDdlTime").map(_.toLong * 1000)
}

def getTableSchema(ss: SparkSession,
table: String,
dataBase: String): StructType = {
ss.sql(s"SELECT * FROM $dataBase.$table LIMIT 1").schema
}

}
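
A usage sketch for HiveUtil with illustrative database/table names; Hive support on the SparkSession is assumed since both helpers go through the session catalog.

import org.apache.spark.sql.SparkSession
import org.apache.s2graph.s2jobs.utils.HiveUtil

val ss = SparkSession.builder()
  .master("local[2]").appName("hive-util-sketch").enableHiveSupport().getOrCreate()

// Hive stores transient_lastDdlTime in seconds; the helper converts it to epoch milliseconds.
val lastModified: Option[Long] =
  HiveUtil.getTableModificationTime(ss, table = "example_out", dataBase = "default")

val schema = HiveUtil.getTableSchema(ss, table = "example_out", dataBase = "default")
println(s"lastModified=$lastModified")
schema.printTreeString()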