diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala index cfa547b5..e4981b77 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala @@ -25,14 +25,15 @@ import org.apache.s2graph.s2jobs.udfs.UdfOption case class JobDescription( name:String, - udfs: Seq[UdfOption], sources:Seq[Source], processes:Seq[task.Process], - sinks:Seq[Sink] + sinks:Seq[Sink], + udfs: Seq[UdfOption] = Nil, + listener:Option[Map[String, String]] = None ) object JobDescription extends Logger { - val dummy = JobDescription("dummy", Nil, Nil, Nil, Nil) + val dummy = JobDescription("dummy", Nil, Nil, Nil) def apply(jsVal:JsValue):JobDescription = { implicit val TaskConfReader = Json.reads[TaskConf] @@ -41,12 +42,13 @@ object JobDescription extends Logger { logger.debug(s"JobDescription: ${jsVal}") val jobName = (jsVal \ "name").as[String] - val udfs = (jsVal \ "udfs").asOpt[Seq[UdfOption]].getOrElse(Nil) val sources = (jsVal \ "source").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSource(conf)) val processes = (jsVal \ "process").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getProcess(conf)) val sinks = (jsVal \ "sink").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSink(jobName, conf)) + val udfs = (jsVal \ "udfs").asOpt[Seq[UdfOption]].getOrElse(Nil) + val listenerOpt = (jsVal \ "listener").asOpt[Map[String, String]] - JobDescription(jobName, udfs, sources, processes, sinks) + JobDescription(jobName, sources, processes, sinks, udfs, listenerOpt) } def getSource(conf:TaskConf):Source = { @@ -56,7 +58,24 @@ object JobDescription extends Logger { case "hive" => new HiveSource(conf) case "jdbc" => new JdbcSource(conf) case "s2graph" => new S2GraphSource(conf) - case _ => throw new IllegalArgumentException(s"unsupported source type : ${conf.`type`}") + case "custom" => + val customClassOpt = conf.options.get("class") + customClassOpt match { + case Some(customClass:String) => + logger.debug(s"custom class init.. $customClass") + + Class.forName(customClass) + .getConstructor(classOf[TaskConf]) + .newInstance(conf) + .asInstanceOf[task.Source] + + case None => throw new IllegalArgumentException(s"custom class name is not exist.. 
${conf}") + } + case _ => + val newOptions = conf.options ++ Map("format" -> conf.`type`) + val newConf = conf.copy(options = newOptions) + new DefaultSource(newConf) + // throw new IllegalArgumentException(s"unsupported source type : ${conf.`type`}") } } @@ -88,6 +107,7 @@ object JobDescription extends Logger { case "es" => new ESSink(jobName, conf) case "s2graph" => new S2GraphSink(jobName, conf) case "jdbc" => new JdbcSink(jobName, conf) + case "hive" => new HiveSink(jobName, conf) case "custom" => val customClassOpt = conf.options.get("class") customClassOpt match { diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala index 0a762744..a4e0cabd 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala @@ -19,17 +19,20 @@ package org.apache.s2graph.s2jobs +import com.typesafe.config.ConfigFactory import org.apache.s2graph.s2jobs.udfs.Udf +import org.apache.s2graph.s2jobs.utils.{DBTransactionUtil, TransactionUtil} import org.apache.spark.sql.SparkSession import play.api.libs.json.{JsValue, Json} import scala.io.Source case class JobOption( - name:String = "S2BatchJob", - confType:String = "db", - jobId:Int = -1, - confFile:String = "" + name: String = "S2BatchJob", + confType: String = "db", + jobId: Int = -1, + confFile: String = "", + incremental: Boolean = false ) object JobLauncher extends Logger { @@ -90,7 +93,15 @@ object JobLauncher extends Logger { udf.register(ss, udfOption.name, udfOption.params.getOrElse(Map.empty)) } - val job = new Job(ss, jobDescription) - job.run() + if (options.incremental) { + val config = ConfigFactory.load() + val txUtil = DBTransactionUtil(config) + + TransactionUtil.withTx(ss, options.name, jobDescription, txUtil)(_.run()) + } else { + val job = new Job(ss, jobDescription) + + job.run() + } } } diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala index d985edc1..bd933339 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala @@ -24,7 +24,8 @@ import org.apache.s2graph.s2jobs.Schema import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer} import org.apache.s2graph.s2jobs.serde.reader.S2GraphCellReader import org.apache.s2graph.s2jobs.serde.writer.RowDataFrameWriter -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} import play.api.libs.json.{JsObject, Json} @@ -37,6 +38,21 @@ abstract class Source(override val conf:TaskConf) extends Task { def toDF(ss:SparkSession):DataFrame } +class DefaultSource(conf:TaskConf) extends Source(conf) { + override def mandatoryOptions: Set[String] = Set("format") + + override def toDF(ss: SparkSession): DataFrame = { + val isStreamSource = conf.options.getOrElse("is_stream_source", "false").toBoolean + + if (isStreamSource) { + ss.readStream.options(conf.options).load() + } else { + ss.read.options(conf.options).load() + } + + } +} + class KafkaSource(conf:TaskConf) extends Source(conf) { val DEFAULT_FORMAT = "raw" override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "subscribe") @@ -176,4 +192,16 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) { 
ss.sqlContext.createDataFrame(kvs, schema) } -} \ No newline at end of file +} + +case class EmptySource(taskConf: TaskConf) extends Source(taskConf) { + val options = taskConf.options + + override def toDF(ss: SparkSession): DataFrame = { + val schema = DataType.fromJson(options("schema")).asInstanceOf[StructType] + + ss.createDataFrame(ss.sparkContext.parallelize(Seq.empty[Row]), schema) + } + + override def mandatoryOptions: Set[String] = Set("schema") +} diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala index 62081df9..b18ae903 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala @@ -43,10 +43,18 @@ object TaskConf { taskConf.options.filterKeys(S2GraphConfigs.DBConfigs.DEFAULTS.keySet) } + def parseMetaStoreConfigs(options: Map[String, String]): Map[String, Any] = { + options.filterKeys(S2GraphConfigs.DBConfigs.DEFAULTS.keySet) + } + def parseLocalCacheConfigs(taskConf: TaskConf): Map[String, Any] = { taskConf.options.filterKeys(S2GraphConfigs.CacheConfigs.DEFAULTS.keySet).mapValues(_.toInt) } + def parseLocalCacheConfigs(options: Map[String, String]): Map[String, Any] = { + options.filterKeys(S2GraphConfigs.CacheConfigs.DEFAULTS.keySet).mapValues(_.toInt) + } + def parseTransformers(taskConf: TaskConf): Seq[Transformer] = { val classes = Json.parse(taskConf.options.getOrElse("transformClasses", """["org.apache.s2graph.s2jobs.wal.transformer.DefaultTransformer"]""")).as[Seq[String]] @@ -69,6 +77,8 @@ trait Task extends Serializable with Logger { def isValidate: Boolean = mandatoryOptions.subsetOf(conf.options.keySet) + def getName: String = conf.name + require(isValidate, s"""${LOG_PREFIX} not exists mandatory options '${mandatoryOptions.mkString(",")}' in task options (${conf.options.keySet.mkString(",")}) diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/HDFSUtil.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/HDFSUtil.scala new file mode 100644 index 00000000..9985ce06 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/HDFSUtil.scala @@ -0,0 +1,75 @@ +package org.apache.s2graph.s2jobs.utils + +import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus, Path, RemoteIterator} +import org.apache.spark.sql.SparkSession + + +case class RemoteIteratorWrapper[A](remoteIterator: RemoteIterator[A]) + extends scala.collection.AbstractIterator[A] + with scala.collection.Iterator[A] { + + override def hasNext: Boolean = remoteIterator.hasNext + + def next: A = remoteIterator.next +} + +object HDFSUtil { + def getFilteredFiles(ss: SparkSession, + basePath: String, + recursive: Boolean)(filterFunc: FileStatus => Boolean): Iterator[FileStatus] = { + val fs = FileSystem.get(ss.sparkContext.hadoopConfiguration) + val iter = fs.listFiles(new Path(basePath), recursive) + RemoteIteratorWrapper(iter).filter(filterFunc) + } + + def getFileStatus(ss: SparkSession, path: String): FileStatus = { + val fs = FileSystem.get(ss.sparkContext.hadoopConfiguration) + fs.getFileStatus(new Path(path)) + } + + def getLatestFile(ss: SparkSession, + baseDir: String, + recursive: Boolean): Option[Path] = { + var latestPath: LocatedFileStatus = null + + val fs = FileSystem.get(ss.sparkContext.hadoopConfiguration) + + val iter = fs.listFiles(new Path(baseDir), recursive) + while (iter.hasNext) { + val current = iter.next() + val skipThis = 
current.getPath.getName.startsWith("_") + if (!skipThis) { + val latestModifiedTime = if (latestPath == null) 0L else latestPath.getModificationTime + if (latestModifiedTime < current.getModificationTime) { + latestPath = current + } + } + } + + Option(latestPath).map(_.getPath) + } + + def isEmptyDirectory(fs: FileSystem, path: Path): Boolean = { + val ls = fs.listStatus(path) + ls.isEmpty || (ls.size == 1 && isMetaFile(ls.head.getPath)) + } + + def isMetaFile(path: Path): Boolean = path.getName.startsWith("_") + + def reducePaths(fs: FileSystem, paths: Set[Path]): Seq[Path] = { + val reduced = paths.groupBy(path => path.getParent).foldLeft(Seq[Path]()){ case (ls, (parentPath, childPaths)) => + val allPathsInParent = fs.listStatus(parentPath).map(_.getPath).filterNot(isMetaFile).toSet + if (allPathsInParent equals childPaths) ls :+ parentPath + else ls ++ childPaths + } + + if (reduced.toSet equals paths) reduced + else reducePaths(fs, reduced.toSet) + } + + def reducePaths(ss: SparkSession, paths: Set[FileStatus]): Seq[FileStatus] = { + val fs = FileSystem.get(ss.sparkContext.hadoopConfiguration) + reducePaths(fs, paths.map(_.getPath)).map(fs.getFileStatus) + } +} + diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/HiveUtil.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/HiveUtil.scala new file mode 100644 index 00000000..a2a223d3 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/HiveUtil.scala @@ -0,0 +1,23 @@ +package org.apache.s2graph.s2jobs.utils + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.types.StructType + +object HiveUtil { + + def getTableModificationTime(ss: SparkSession, + table: String, + dataBase: String): Option[Long] = { + val sessionCatalog = ss.sessionState.catalog + val catalogTable = sessionCatalog.getTableMetadata(TableIdentifier(table = table, database = Some(dataBase))) + catalogTable.properties.get("transient_lastDdlTime").map(_.toLong * 1000) + } + + def getTableSchema(ss: SparkSession, + table: String, + dataBase: String): StructType = { + ss.sql(s"SELECT * FROM $dataBase.$table LIMIT 1").schema + } + +} diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/TransactionUtil.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/TransactionUtil.scala new file mode 100644 index 00000000..eb7b1abc --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/TransactionUtil.scala @@ -0,0 +1,581 @@ +package org.apache.s2graph.s2jobs.utils + +import java.text.SimpleDateFormat +import java.util.Date + +import com.typesafe.config.{Config, ConfigRenderOptions} +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} +import org.apache.s2graph.core.Management +import org.apache.s2graph.core.schema.Schema +import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.s2jobs._ +import org.apache.s2graph.s2jobs.task.{EmptySource, Source, TaskConf} +import org.apache.spark.sql.SparkSession +import scalikejdbc.WrappedResultSet +import scalikejdbc.interpolation.SQLSyntax + +object TxId { + def apply(rs: WrappedResultSet): TxId = { + TxId( + rs.string("job_name"), + rs.string("app_name"), + rs.string("source_name"), + rs.string("path") + ) + } +} + +case class TxId(jobName: String, + appName: String, + sourceName: String, + path: String) + +object TxLog { + def apply(rs: WrappedResultSet): TxLog = { + TxLog( + rs.long("id"), + TxId(rs), + CommitType(rs.string("commit_type")), + 
rs.string("commit_id"), + rs.long("last_modified_at"), + rs.booleanOpt("enabled") + ) + } +} + +case class TxLog(id: Long, + txId: TxId, + commitType: CommitType, + commitId: String, + lastModifiedAt: Long, + enabled: Option[Boolean]) + +object CommitType { + def apply(typeName: String): CommitType = typeName match { + case "time" => TimeType + case "path" => PathType + case "hive" => HiveType + } +} + +trait CanCommit[A] { + def toCommitId(a: A): String +} + +object CanCommit { + val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + val failed: Long = 0L + + implicit val longCommitId: CanCommit[Long] = new CanCommit[Long] { + override def toCommitId(time: Long): String = + if (time == failed) "Failed to fetch modification time" + else dateFormat.format(new Date(time)) + } + + implicit val fileStatusCommitId: CanCommit[FileStatus] = new CanCommit[FileStatus] { + override def toCommitId(fileStatus: FileStatus): String = fileStatus.getPath.toUri.getPath + } +} + +trait CommitType { + def commitId[A: CanCommit](value: A): String = implicitly[CanCommit[A]].toCommitId(value) + + def toColumnValue: String +} + +/** + * Compares numeric ordering of modification time of file sources. + * commit_id is datetime of latest read directory / file's modification time. + */ +case object TimeType extends CommitType { + override def toColumnValue: String = "time" +} + +/** + * Compares lexicographic ordering of paths + * commit_id is path of latest read directory / file. + */ +case object PathType extends CommitType { + override def toColumnValue: String = "path" +} + +/** + * Compares numeric ordering of modification time of hive sources. + * commit_id is datetime of hive sources modification time. + */ +case object HiveType extends CommitType { + override def toColumnValue: String = "hive" +} + + +/** + * TransactionUtil supports incremental processing of spark batch jobs by recording + * each latest path(or partition) processed of Filesource. + * + * Incremental processing of Hive tables are also supported by recording each tables modification time. + * HiveSource in incremental job will be processed only when its modification time has been updated excluding first time read.. 
+ * + */ +object TransactionUtil { + val BaseTxTableKey = "baseTxTable" + val typeKey = "type" + + def apply(config: Config, options: Map[String, String] = Map.empty): TransactionUtil = { + DBTransactionUtil(config, options) + } + + + /** Return filter function according to CommitType + * + * @param fs + * @param lastTx + * @return + */ + def toFileFilterFunc(fs: FileSystem, lastTx: Option[TxLog]): FileStatus => Boolean = (fileStatus: FileStatus) => { + val valid = lastTx match { + case None => true + case Some(tx) => + tx.commitType match { + case PathType => fileStatus.getPath.toUri.getPath > tx.commitId + case TimeType => fileStatus.getModificationTime > tx.lastModifiedAt + } + } + + valid && (!HDFSUtil.isEmptyDirectory(fs, fileStatus.getPath)) + } + + + /** list subdirectory within given partitions with was modified after last transaction log + * + * @param ss + * @param basePath + * @param partitionKeys + * @param lastTx + * @param readLeafFiles + */ + def listAllPath(ss: SparkSession, + basePath: String, + partitionKeys: Seq[String], + lastTx: Option[TxLog] = None, + readLeafFiles: Boolean = false): Seq[FileStatus] = { + val fs = FileSystem.get(ss.sparkContext.hadoopConfiguration) + val filterFunc = toFileFilterFunc(fs, lastTx) + + if (readLeafFiles) HDFSUtil.getFilteredFiles(ss, basePath, recursive = true)(filterFunc).toSeq + else { + val path = + if (partitionKeys.isEmpty) new Path(s"$basePath") + else { + val postfix: String = partitionKeys.map(k => s"$k=*").mkString("/") + new Path(s"$basePath/$postfix") + } + + val ls = fs.globStatus(path, new PathFilter { + override def accept(path: Path): Boolean = + !path.getName.startsWith("_") + }) + + ls.filter(filterFunc) + } + } + + def copySource(source: Source, paths: String): Source = { + val newTaskConf = source.conf.copy(options = source.conf.options ++ Map("paths" -> paths)) + JobDescription.getSource(newTaskConf) + } + + /** + * Map each FileSource and its each path with corresponding + * incremental sub-partitions(directories) after last TxLog + * + * parameters for FileSource in Incremental Jobs + * every parameters are optional + * + * - partition_keys: TxLog will record latest path within given partition. + * if not specified, TxLog will record just the given path + * only when the path has been modified after last TxLog + * + * - initial_offset: file or partition count to read at initial read of source + * + * - incomplete_partitions: if incomplete partition number K is specified, + * incremental job will skip latest K partitions of given path + * + * - max_paths: number of maximum paths count to be read in each interval + * + * - read_leaf_file: given "true", Incremental Job will record leaf file as TxLog + * [Warning] This option can cause too many tasks in spark job + * trying to merge multiple paths in to one data frame + * when reading the source. + * + * - skip_mapping: given "true", Incremental Job will skip to map sources each paths. 
+ * + * + * + * @param fileSources + * @param ss + * @param jobName + * @param jobDesc + * @param txUtil + */ + def toTargetsPerFileSourcePath(fileSources: Seq[Source], + ss: SparkSession, + jobName: String, + jobDesc: JobDescription, + txUtil: TransactionUtil): Map[Source, Map[String, Seq[FileStatus]]] = { + fileSources.map { source => + val sourceName = source.getName + val paths = source.conf.options("paths").split(",") + val partitionKeys = source.conf.options.get("partition_keys") + .map { s => s.split(",").map(_.trim).filter(_.nonEmpty).toSeq }.getOrElse(Nil) + + val initialOffset = source.conf.options.get("initial_offset").map(_.toInt) + val incompletePartitions = source.conf.options.get("incomplete_partitions").map(_.toInt) + val maxPaths = source.conf.options.get("max_paths").map(_.toInt) + + val readLeafFile = source.conf.options.getOrElse("read_leaf_file", "false") == "true" + val skipMapping = source.conf.options.getOrElse("skip_mapping", "false") == "true" + + val innerMap = paths.map { path => + if (skipMapping) path -> Seq(HDFSUtil.getFileStatus(ss, path)) + else { + val txId = TxId(jobName, jobDesc.name, sourceName, path) + val lastTx = txUtil.readTx(txId) + + val filteredTargets = + listAllPath(ss, path, partitionKeys, lastTx, readLeafFile) + .dropRight(incompletePartitions.getOrElse(0)) + .take(maxPaths.getOrElse(Int.MaxValue)) + + val targets = lastTx match { + case Some(_) => filteredTargets + case None => initialOffset.map(k => filteredTargets.takeRight(k)).getOrElse(filteredTargets) + } + + path -> targets + } + }.toMap + + source -> innerMap + }.toMap + } + + /** + * Replace each FileSources to read paths specified at targetsPerFileSourcePath. + * + * @param targetsPerFileSourcePath + * @param ss + * @param jobName + * @param jobDesc + * @param txUtil + */ + def toNewFileSources(targetsPerFileSourcePath: Map[Source, Map[String, Seq[FileStatus]]], + ss: SparkSession, + jobName: String, + jobDesc: JobDescription, + txUtil: TransactionUtil): Seq[Source] = { + targetsPerFileSourcePath.toSeq.map { case (source, innerMap) => + val isEmptySource = innerMap.values.forall(_.isEmpty) + val sourceName = source.getName + + if (isEmptySource) { + val (path, _) = innerMap.head + val txId = TxId(jobName, jobDesc.name, sourceName, path) + val lastTx = txUtil.readTx(txId) + + val lastPath = lastTx.map(_.commitId) + .orElse(HDFSUtil.getLatestFile(ss, path, recursive = true).map(_.toUri.getPath)) + .getOrElse(throw new IllegalStateException(s"last commitId or latest path on source's paths should return any file.")) + + val tmpSource = copySource(source, lastPath) + + val schemaInJson = tmpSource.toDF(ss).schema.json + val newTaskConf = source.conf.copy( + `type` = "custom", + options = source.conf.options ++ Map("schema" -> schemaInJson, "class" -> classOf[EmptySource].getName) + ) + + EmptySource(newTaskConf) + } else { + copySource(source, innerMap.values.flatten.map(_.getPath.toUri.getPath).mkString(",")) + } + } + } + + /** + * Write TxLog of FileSources + * + * @param targetsPerSourcePath + * @param jobName + * @param jobDesc + * @param txUtil + */ + def writeFileTxLog(targetsPerSourcePath: Map[Source, Map[String, Seq[FileStatus]]], + jobName: String, + jobDesc: JobDescription, + txUtil: TransactionUtil): Unit = { + import CanCommit._ + + targetsPerSourcePath.foreach { case (source, innerMap) => + val sourceName = source.getName + + + innerMap.foreach { case (path, targets) => + val txId = TxId(jobName, jobDesc.name, sourceName, path) + + targets.lastOption.foreach { target => + 
val lastModifiedAt = target.getModificationTime + val commitType = CommitType(source.conf.options.getOrElse("commit_type", "path")) + val commitId = commitType.commitId(target) + + val txLog = TxLog(0, txId, commitType = commitType, commitId = commitId, + lastModifiedAt = lastModifiedAt, enabled = None) + + txUtil.writeTx(txLog) + } + } + } + } + + /** + * Map each HiveSources with its modification time + * + * @param hiveSources + * @param ss + * @param jobName + * @param jobDesc + * @param txUtil + */ + def toHiveSourcesModTimeMap(hiveSources: Seq[Source], + ss: SparkSession, + jobName: String, + jobDesc: JobDescription, + txUtil: TransactionUtil): Map[Source, Long] = { + hiveSources.map { source => + val table = source.conf.options("table") + val database = source.conf.options("database") + val sourceName = source.getName + + val txId = TxId(jobName, jobDesc.name, sourceName, s"$database.$table") + val lastTx = txUtil.readTx(txId) + + val tableSchemaJson = HiveUtil.getTableSchema(ss, table, database).json + + val newTaskConf = source.conf.copy( + `type` = "custom", + options = source.conf.options ++ Map("schema" -> tableSchemaJson, "class" -> classOf[EmptySource].getName) + ) + + val emptySource = EmptySource(newTaskConf) + + lastTx match { + case Some(txLog: TxLog) => + val txLastModifiedAt = txLog.lastModifiedAt + val tableLastModifiedAt = HiveUtil.getTableModificationTime(ss, table, database) + + tableLastModifiedAt.map { time => + if (time > txLastModifiedAt) source -> time + else emptySource -> time + }.getOrElse(source -> CanCommit.failed) + + case None => source -> HiveUtil.getTableModificationTime(ss, table, database).getOrElse(0L) + } + }.toMap + } + + /** + * Write TxLog of HiveSources + * + * @param hiveSourcesMap + * @param jobName + * @param jobDesc + * @param txUtil + */ + def writeHiveTxLog(hiveSourcesMap: Map[Source, Long], + jobName: String, + jobDesc: JobDescription, + txUtil: TransactionUtil): Unit = { + import CanCommit._ + + hiveSourcesMap.foreach { case (source, time) => + val table = source.conf.options("table") + val database = source.conf.options("database") + val sourceName = source.getName + + val txId = TxId(jobName, jobDesc.name, sourceName, s"$database.$table") + val commitId = HiveType.commitId(time) + + val txLog = TxLog(0, txId, commitType = HiveType, commitId = commitId, + lastModifiedAt = time, enabled = None) + + txUtil.writeTx(txLog) + } + } + + + /** + * Main function of TransactionUtil + * + * @param ss + * @param jobName + * @param jobDesc + * @param txUtil + * @param func + */ + def withTx(ss: SparkSession, + jobName: String, + jobDesc: JobDescription, + txUtil: TransactionUtil)(func: Job => Unit): Unit = { + val (incrementalSources, skipSources) = jobDesc.sources.partition { source => + source.conf.options.getOrElse("skip_tx", "false") != "true" + } + + val (fileSources, __otherSources) = incrementalSources.partition { source => + source.conf.options.contains("paths") + } + + val (hiveSources, _otherSources) = __otherSources.partition { source => + source.conf.`type` == "hive" + } + + val otherSources = _otherSources ++ skipSources + + val targetsPerSourcePath: Map[Source, Map[String, Seq[FileStatus]]] = toTargetsPerFileSourcePath(fileSources, ss, jobName, jobDesc, txUtil) + val newFileSources: Seq[Source] = toNewFileSources(targetsPerSourcePath, ss, jobName, jobDesc, txUtil) + val hiveSourceMap = toHiveSourcesModTimeMap(hiveSources, ss, jobName, jobDesc, txUtil) + val newHiveSources = hiveSourceMap.keySet + + val newJobDesc = 
jobDesc.copy(sources = newFileSources ++ newHiveSources ++ otherSources) + val newJob = new Job(ss, newJobDesc) + + func(newJob) + + writeFileTxLog(targetsPerSourcePath, jobName, jobDesc, txUtil) + writeHiveTxLog(hiveSourceMap, jobName, jobDesc, txUtil) + } +} + +trait TransactionUtil { + + import TransactionUtil._ + + val config: Config + + val baseTxTableRaw = + if (config.hasPath(BaseTxTableKey)) config.getString(BaseTxTableKey) + else "tx_logs" + + val baseTxTable = SQLSyntax.createUnsafely(baseTxTableRaw) + + /** + * read last committed transaction using context provided from spark session. + * + * @param txId + * @return + */ + def readTx(txId: TxId): Option[TxLog] + + /** + * write latest target as last committed transaction + * + * @param txLog + */ + def writeTx(txLog: TxLog): Unit +} + +/** + * the underlying JDBC Driver is singleton, so be sure to initialize singleton instance of this. + * currently only expect FileSource. need to add HiveSource support. + * + * @param config + */ +case class DBTransactionUtil(config: Config, + options: Map[String, String] = Map.empty) extends TransactionUtil { + + import scalikejdbc._ + + var initialized = false + + init(config, options) + + private def init(config: Config, options: Map[String, String] = Map.empty): Boolean = { + val optionConf = Management.toConfig(TaskConf.parseMetaStoreConfigs(options) ++ TaskConf.parseLocalCacheConfigs(options)) + val mergedConfig = optionConf.withFallback(config) + + synchronized { + if (!initialized) { + logger.info(s"[SCHEMA]: initializing...") + logger.info(s"[SCHEMA]: $mergedConfig") + + logger.info(s"[SCHEMA]: ${config.root().render(ConfigRenderOptions.concise())}") + + Schema.apply(mergedConfig) + + initialized = true + } + + initialized + } + } + + /** + * read last committed transaction using context provided from spark session. + * + * @param txId + * @return + */ + override def readTx(txId: TxId): Option[TxLog] = { + DB readOnly { implicit session => + readTxInner(txId) + } + } + + private def readTxInner(txId: TxId)(implicit session: DBSession): Option[TxLog] = { + sql""" + SELECT * + FROM ${baseTxTable} + WHERE job_name = ${txId.jobName} + AND app_name = ${txId.appName} + AND source_name = ${txId.sourceName} + AND path = ${txId.path} + """.map { rs => + TxLog.apply(rs) + }.single().apply() + } + + private def writeTxInner(txLog: TxLog)(implicit session: DBSession): Unit = { + sql""" + INSERT INTO + ${baseTxTable}( + job_name, app_name, source_name, path, commit_type, commit_id, + last_modified_at, enabled, updated_at + ) + VALUES + ( + ${txLog.txId.jobName}, ${txLog.txId.appName}, ${txLog.txId.sourceName}, ${txLog.txId.path}, + ${txLog.commitType.toColumnValue}, ${txLog.commitId}, ${txLog.lastModifiedAt}, ${txLog.enabled}, now() + ) + ON DUPLICATE KEY UPDATE + commit_id = ${txLog.commitId}, + last_modified_at = ${txLog.lastModifiedAt}, + enabled = ${txLog.enabled}, + updated_at = now() + """.execute().apply() + } + + /** + * write latest target as last committed transaction + * + * @param txLog + */ + override def writeTx(txLog: TxLog): Unit = { + DB localTx { implicit session => + try { + writeTxInner(txLog) + } catch { + case ex: Exception => + logger.error(s"write tx failed. $txLog") + } + } + } +} +
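
For reference, the new "custom" source type only requires a class that extends task.Source and exposes a public (TaskConf) constructor; JobDescription.getSource loads it via Class.forName(...).getConstructor(classOf[TaskConf]).newInstance(conf). A minimal sketch of such a source (the package and class names below are hypothetical):

    package com.example.s2jobs

    import org.apache.s2graph.s2jobs.task.{Source, TaskConf}
    import org.apache.spark.sql.{DataFrame, SparkSession}

    // Hypothetical user-defined source: produces `count` sequential ids.
    // The only contract implied by the reflection call in JobDescription.getSource
    // is the (TaskConf) constructor and the Source interface.
    class RangeSource(conf: TaskConf) extends Source(conf) {
      override def mandatoryOptions: Set[String] = Set("count")

      override def toDF(ss: SparkSession): DataFrame =
        ss.range(conf.options("count").toLong).toDF("id")
    }

The corresponding task entry would declare "type": "custom" and put "class": "com.example.s2jobs.RangeSource" in its options. Any other unrecognized type now falls through to DefaultSource, which simply calls ss.read.options(conf.options).load() (or readStream when is_stream_source is "true") instead of throwing.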
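
The --incremental path in JobLauncher boils down to wrapping the usual Job in TransactionUtil.withTx. A minimal sketch of driving it programmatically, assuming the metastore settings are reachable through ConfigFactory.load() and the job JSON is supplied elsewhere:

    import com.typesafe.config.ConfigFactory
    import org.apache.s2graph.s2jobs.JobDescription
    import org.apache.s2graph.s2jobs.utils.{DBTransactionUtil, TransactionUtil}
    import org.apache.spark.sql.SparkSession
    import play.api.libs.json.Json

    object IncrementalRunSketch {
      def run(ss: SparkSession, jobName: String, jobJson: String): Unit = {
        // file sources in jobJson may carry the incremental options documented above:
        // partition_keys, initial_offset, incomplete_partitions, max_paths,
        // read_leaf_file, skip_mapping, commit_type, skip_tx
        val jobDesc = JobDescription(Json.parse(jobJson))

        // DBTransactionUtil sets up the metastore connection (Schema.apply) from the config
        val txUtil = DBTransactionUtil(ConfigFactory.load())

        // withTx narrows each file/hive source to what is newer than its last TxLog,
        // runs the job, then records the new high-water marks
        TransactionUtil.withTx(ss, jobName, jobDesc, txUtil)(_.run())
      }
    }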
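
When a source has nothing new to read, withTx substitutes an EmptySource that carries the previous schema, so downstream processes still resolve their columns against a zero-row DataFrame. A standalone sketch of what that substitution produces (the TaskConf values here are illustrative):

    import org.apache.s2graph.s2jobs.task.{EmptySource, TaskConf}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    object EmptySourceSketch {
      def emptyFrame(ss: SparkSession) = {
        val schema = StructType(Seq(
          StructField("id", LongType),
          StructField("name", StringType)))

        // "schema" is the only mandatory option; "class" mirrors what
        // toNewFileSources / toHiveSourcesModTimeMap write into the rewritten TaskConf
        val conf = TaskConf(name = "events", `type` = "custom",
          options = Map("schema" -> schema.json, "class" -> classOf[EmptySource].getName))

        EmptySource(conf).toDF(ss) // zero-row DataFrame with columns (id, name)
      }
    }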
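
The backing table is not created by this patch; its shape can only be inferred from readTxInner / writeTxInner (default name tx_logs, overridable via the baseTxTable config key, with ON DUPLICATE KEY UPDATE implying a MySQL-style metastore and a unique key over the TxId columns). A possible DDL, expressed through scalikejdbc for consistency with the rest of the code; column types, sizes, and the key name are assumptions:

    import scalikejdbc._

    object TxLogTableSketch {
      // assumes the scalikejdbc connection pool was already initialized
      // (DBTransactionUtil.init does this through Schema.apply)
      def createIfNotExists(): Unit = DB autoCommit { implicit session =>
        sql"""
          CREATE TABLE IF NOT EXISTS tx_logs (
            id               BIGINT        NOT NULL AUTO_INCREMENT PRIMARY KEY,
            job_name         VARCHAR(100)  NOT NULL,
            app_name         VARCHAR(100)  NOT NULL,
            source_name      VARCHAR(100)  NOT NULL,
            path             VARCHAR(400)  NOT NULL,
            commit_type      VARCHAR(16)   NOT NULL,
            commit_id        VARCHAR(1024) NOT NULL,
            last_modified_at BIGINT        NOT NULL,
            enabled          TINYINT(1)    NULL,
            updated_at       TIMESTAMP     NOT NULL DEFAULT CURRENT_TIMESTAMP,
            UNIQUE KEY ux_tx_logs_tx_id (job_name, app_name, source_name, path)
          )
        """.execute().apply()
      }
    }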