Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
Donghan Zhang committed Feb 21, 2024
1 parent e2873ce commit 766e04d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 25 deletions.
32 changes: 14 additions & 18 deletions spark/src/main/scala/ai/chronon/spark/Driver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -249,26 +249,22 @@ object Driver {
)
val df = join.computeJoin(args.stepDays.toOption, args.startPartitionOverride.toOption)

df match {
case None => {
logger.info("Selected join parts are populated. No final join is required.")
return
}
case Some(df) => {
if (args.shouldExport()) {
args.exportTableToLocal(args.joinConf.metaData.outputTable, tableUtils)
}

if (args.shouldPerformValidate()) {
val keys = CompareJob.getJoinKeys(args.joinConf, tableUtils)
args.validateResult(df, keys, tableUtils)
}
if (args.selectedJoinParts.isDefined) {
logger.info("Selected join parts are populated successfully. No final join is required. Exiting.")
return
}
if (args.shouldExport()) {
args.exportTableToLocal(args.joinConf.metaData.outputTable, tableUtils)
}

df.show(numRows = 3, truncate = 0, vertical = true)
logger.info(
s"\nShowing three rows of output above.\nQuery table `${args.joinConf.metaData.outputTable}` for more.\n")
}
if (args.shouldPerformValidate()) {
val keys = CompareJob.getJoinKeys(args.joinConf, tableUtils)
args.validateResult(df, keys, tableUtils)
}

df.show(numRows = 3, truncate = 0, vertical = true)
logger.info(
s"\nShowing three rows of output above.\nQuery table `${args.joinConf.metaData.outputTable}` for more.\n")
}
}

Expand Down
12 changes: 5 additions & 7 deletions spark/src/main/scala/ai/chronon/spark/JoinBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ abstract class JoinBase(joinConf: api.Join,

def computeRange(leftDf: DataFrame, leftRange: PartitionRange, bootstrapInfo: BootstrapInfo): Option[DataFrame]

def computeJoin(stepDays: Option[Int] = None, overrideStartPartition: Option[String] = None): Option[DataFrame] = {
def computeJoin(stepDays: Option[Int] = None, overrideStartPartition: Option[String] = None): DataFrame = {

assert(Option(joinConf.metaData.team).nonEmpty,
s"join.metaData.team needs to be set for join ${joinConf.metaData.name}")
Expand Down Expand Up @@ -338,7 +338,7 @@ abstract class JoinBase(joinConf: api.Join,
def finalResult: DataFrame = tableUtils.sql(rangeToFill.genScanQuery(null, outputTable))
if (unfilledRanges.isEmpty) {
logger.info(s"\nThere is no data to compute based on end partition of ${rangeToFill.end}.\n\n Exiting..")
return Some(finalResult)
return finalResult
}

stepDays.foreach(metrics.gauge("step_days", _))
Expand All @@ -362,9 +362,9 @@ abstract class JoinBase(joinConf: api.Join,
val finalDf = computeRange(leftDfInRange, range, bootstrapInfo)
if (selectedJoinParts.isDefined) {
assert(finalDf.isEmpty, "finalDf should be empty")
logger.info(s"Skipping final join for range: ${range.toString} $progress")
logger.info(s"Skipping writing to the output table for range: ${range.toString} $progress")
} else {
finalDf.save(outputTable, tableProps, autoExpand = true)
finalDf.get.save(outputTable, tableProps, autoExpand = true)
val elapsedMins = (System.currentTimeMillis() - startMillis) / (60 * 1000)
metrics.gauge(Metrics.Name.LatencyMinutes, elapsedMins)
metrics.gauge(Metrics.Name.PartitionCount, range.partitions.length)
Expand All @@ -375,11 +375,9 @@ abstract class JoinBase(joinConf: api.Join,
}
if (selectedJoinParts.isDefined) {
logger.info(s"Completed join parts: ${selectedJoinParts.get.mkString(", ")}")
logger.info(s"Skipping final join...")
None
} else {
logger.info(s"Wrote to table $outputTable, into partitions: $unfilledRanges")
Some(finalResult)
}
finalResult
}
}

0 comments on commit 766e04d

Please sign in to comment.