@@ -18,7 +18,8 @@
 package org.apache.spark.kafka010
 
 import java.{util => ju}
-import java.text.SimpleDateFormat
+import java.time.{Instant, ZoneId}
+import java.time.format.DateTimeFormatter
 import java.util.regex.Pattern
 
 import scala.jdk.CollectionConverters._
@@ -46,6 +47,10 @@ import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
 private[spark] object KafkaTokenUtil extends Logging {
   val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
   private val TOKEN_SERVICE_PREFIX = "kafka.server.delegation.token"
+  private val DATE_TIME_FORMATTER =
+    DateTimeFormatter
+      .ofPattern("yyyy-MM-dd'T'HH:mm")
+      .withZone(ZoneId.systemDefault())
 
   private[kafka010] def getTokenService(identifier: String): Text =
     new Text(s"$TOKEN_SERVICE_PREFIX.$identifier")
@@ -220,7 +225,6 @@ private[spark] object KafkaTokenUtil extends Logging {
 
   private def printToken(token: DelegationToken): Unit = {
     if (log.isDebugEnabled) {
-      val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
       logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
         "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
       val tokenInfo = token.tokenInfo
@@ -229,9 +233,9 @@ private[spark] object KafkaTokenUtil extends Logging {
         REDACTION_REPLACEMENT_TEXT,
         tokenInfo.owner,
         tokenInfo.renewersAsString,
-        dateFormat.format(tokenInfo.issueTimestamp),
-        dateFormat.format(tokenInfo.expiryTimestamp),
-        dateFormat.format(tokenInfo.maxTimestamp)))
+        DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(tokenInfo.issueTimestamp)),
+        DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(tokenInfo.expiryTimestamp)),
+        DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(tokenInfo.maxTimestamp))))
     }
   }
 
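Reviewer note: the reason the formatter can be hoisted into a constant is that DateTimeFormatter, unlike SimpleDateFormat, is immutable and thread-safe. A minimal sketch of the new formatting path, with illustrative names not taken from the patch:

import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

object FormatterSketch {
  // Immutable and thread-safe, so one shared instance suffices;
  // SimpleDateFormat had to be created per call (or per thread).
  private val DATE_TIME_FORMATTER =
    DateTimeFormatter
      .ofPattern("yyyy-MM-dd'T'HH:mm")
      .withZone(ZoneId.systemDefault())

  def main(args: Array[String]): Unit = {
    // Token timestamps are epoch millis; Instant.ofEpochMilli replaces the
    // long-to-Date coercion that SimpleDateFormat.format used to perform.
    val issueTimestamp = System.currentTimeMillis()
    println(DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(issueTimestamp)))
  }
}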
19 changes: 12 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.deploy.master
 
-import java.text.SimpleDateFormat
+import java.time.ZoneId
+import java.time.format.DateTimeFormatter
 import java.util.{Date, Locale}
 import java.util.concurrent.{ScheduledFuture, TimeUnit}
 
@@ -56,10 +57,6 @@ private[deploy] class Master(
 
   private val driverIdPattern = conf.get(DRIVER_ID_PATTERN)
   private val appIdPattern = conf.get(APP_ID_PATTERN)
-
-  // For application IDs
-  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
-
   private val workerTimeoutMs = conf.get(WORKER_TIMEOUT) * 1000
   private val retainedApplications = conf.get(RETAINED_APPLICATIONS)
   private val retainedDrivers = conf.get(RETAINED_DRIVERS)
@@ -1224,7 +1221,8 @@
 
   /** Generate a new app ID given an app's submission date */
   private def newApplicationId(submitDate: Date): String = {
-    val appId = appIdPattern.format(createDateFormat.format(submitDate), nextAppNumber)
+    val appId = appIdPattern.format(
+      Master.DATE_TIME_FORMATTER.format(submitDate.toInstant), nextAppNumber)
     nextAppNumber += 1
     if (moduloAppNumber > 0) {
       nextAppNumber %= moduloAppNumber
@@ -1252,7 +1250,8 @@
   }
 
   private def newDriverId(submitDate: Date): String = {
-    val appId = driverIdPattern.format(createDateFormat.format(submitDate), nextDriverNumber)
+    val appId = driverIdPattern.format(
+      Master.DATE_TIME_FORMATTER.format(submitDate.toInstant), nextDriverNumber)
     nextDriverNumber += 1
     appId
   }
@@ -1299,6 +1298,12 @@ private[deploy] object Master extends Logging {
   val SYSTEM_NAME = "sparkMaster"
   val ENDPOINT_NAME = "Master"
 
+  // For application IDs
+  private val DATE_TIME_FORMATTER =
+    DateTimeFormatter
+      .ofPattern("yyyyMMddHHmmss", Locale.US)
+      .withZone(ZoneId.systemDefault())
+
   def main(argStrings: Array[String]): Unit = {
     Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
       exitOnUncaughtException = false))
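Reviewer note: the old createDateFormat was a def precisely because SimpleDateFormat is mutable; the immutable formatter can become a single val in the companion object. A rough sketch of the resulting ID generation, where the "app-%s-%04d" pattern is an assumed default and the real value comes from spark.deploy.appIdPattern:

import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.{Date, Locale}

object AppIdSketch {
  // Mirrors Master.DATE_TIME_FORMATTER in the patch.
  private val DATE_TIME_FORMATTER =
    DateTimeFormatter
      .ofPattern("yyyyMMddHHmmss", Locale.US)
      .withZone(ZoneId.systemDefault())

  def main(args: Array[String]): Unit = {
    val appIdPattern = "app-%s-%04d" // assumed default of APP_ID_PATTERN
    val submitDate = new Date()
    // Date.toInstant bridges the legacy java.util.Date API into java.time.
    val appId = appIdPattern.format(DATE_TIME_FORMATTER.format(submitDate.toInstant), 7)
    println(appId) // e.g. app-20240101123045-0007
  }
}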
15 changes: 10 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.deploy.worker
 
 import java.io.{File, IOException}
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale, UUID}
+import java.time.{Instant, ZoneId}
+import java.time.format.DateTimeFormatter
+import java.util.{Locale, UUID}
 import java.util.concurrent._
 import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
 import java.util.function.Supplier
@@ -90,8 +91,6 @@ private[deploy] class Worker(
   private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
 
-  // For worker and executor IDs
-  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
   private val HEARTBEAT_MILLIS = conf.get(WORKER_TIMEOUT) * 1000 / 4
 
@@ -821,7 +820,7 @@
   }
 
   private def generateWorkerId(): String = {
-    workerIdPattern.format(createDateFormat.format(new Date), host, port)
+    workerIdPattern.format(Worker.DATE_TIME_FORMATTER.format(Instant.now()), host, port)
   }
 
   override def onStop(): Unit = {
@@ -937,6 +936,12 @@ private[deploy] object Worker extends Logging {
   val ENDPOINT_NAME = "Worker"
   private val SSL_NODE_LOCAL_CONFIG_PATTERN = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r
 
+  // For worker and executor IDs
+  private val DATE_TIME_FORMATTER =
+    DateTimeFormatter
+      .ofPattern("yyyyMMddHHmmss", Locale.US)
+      .withZone(ZoneId.systemDefault())
+
   def main(argStrings: Array[String]): Unit = {
     Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
       exitOnUncaughtException = false))
@@ -17,7 +17,8 @@
 
 package org.apache.spark.internal.io
 
-import java.text.SimpleDateFormat
+import java.time.ZoneId
+import java.time.format.DateTimeFormatter
 import java.util.{Date, Locale}
 
 import scala.util.{DynamicVariable, Random}
@@ -39,6 +40,12 @@ object SparkHadoopWriterUtils {
   private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
   private val RAND = new Random()
 
+  // For job tracker IDs
+  private val DATE_TIME_FORMATTER =
+    DateTimeFormatter
+      .ofPattern("yyyyMMddHHmmss", Locale.US)
+      .withZone(ZoneId.systemDefault())
+
   /**
    * Create a job ID.
    *
@@ -71,7 +78,7 @@
    * @return a string for a job ID
    */
   def createJobTrackerID(time: Date): String = {
-    val base = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
+    val base = DATE_TIME_FORMATTER.format(time.toInstant)
     var l1 = RAND.nextLong()
     if (l1 < 0) {
       l1 = -l1
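Reviewer note: same migration as in Master/Worker, with one behavior-preserving detail worth calling out: the old code built a fresh SimpleDateFormat on every call, while the new code reuses the shared constant. A self-contained sketch of the helper; the trailing random suffix is an assumption inferred from the RAND.nextLong() lines visible in the hunk, since the method body continues past the diff:

import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.{Date, Locale}

import scala.util.Random

object JobTrackerIdSketch {
  private val DATE_TIME_FORMATTER =
    DateTimeFormatter
      .ofPattern("yyyyMMddHHmmss", Locale.US)
      .withZone(ZoneId.systemDefault())

  // Assumed shape: timestamp base plus a non-negative random suffix.
  def createJobTrackerID(time: Date): String = {
    val base = DATE_TIME_FORMATTER.format(time.toInstant)
    var l1 = Random.nextLong()
    if (l1 < 0) {
      l1 = -l1
    }
    base + l1
  }

  def main(args: Array[String]): Unit = {
    println(createJobTrackerID(new Date())) // e.g. 202401011230454604...
  }
}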
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -18,10 +18,10 @@
 package org.apache.spark.rdd
 
 import java.io.{FileNotFoundException, IOException}
-import java.text.SimpleDateFormat
+import java.time.ZoneId
+import java.time.format.DateTimeFormatter
 import java.util.{Date, Locale}
 
-import scala.collection.immutable.Map
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -302,7 +302,7 @@ class HadoopRDD[K, V](
       private var reader: RecordReader[K, V] = null
       private val inputFormat = getInputFormat(jobConf)
       HadoopRDD.addLocalConfiguration(
-        new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
+        HadoopRDD.DATE_TIME_FORMATTER.format(createTime.toInstant),
         context.stageId(), theSplit.index, context.attemptNumber(), jobConf)
 
       reader =
@@ -426,6 +426,11 @@ private[spark] object HadoopRDD extends Logging {
    */
   val CONFIGURATION_INSTANTIATION_LOCK = new Object()
 
+  private val DATE_TIME_FORMATTER =
+    DateTimeFormatter
+      .ofPattern("yyyyMMddHHmmss", Locale.US)
+      .withZone(ZoneId.systemDefault())
+
   /**
    * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
    * the local process.
12 changes: 8 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.rdd
 
 import java.io.{FileNotFoundException, IOException}
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale}
+import java.time.{Instant, ZoneId}
+import java.time.format.DateTimeFormatter
+import java.util.Locale
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
@@ -104,8 +105,11 @@ class NewHadoopRDD[K, V](
   // private val serializableConf = new SerializableWritable(_conf)
 
   private val jobTrackerId: String = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
-    formatter.format(new Date())
+    val dateTimeFormatter =
+      DateTimeFormatter
+        .ofPattern("yyyyMMddHHmmss", Locale.US)
+        .withZone(ZoneId.systemDefault())
+    dateTimeFormatter.format(Instant.now())
   }
 
   @transient protected val jobId = new JobID(jobTrackerId, id)
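Reviewer note: here the formatter is built inline rather than shared, which seems reasonable since jobTrackerId is computed once per RDD at construction time. A sketch of how the resulting string feeds Hadoop's JobID; this assumes hadoop-mapreduce-client-core on the classpath, and the literal 0 job number is illustrative:

import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter
import java.util.Locale

import org.apache.hadoop.mapreduce.JobID

object JobIdSketch {
  def main(args: Array[String]): Unit = {
    // Built once, so an inline formatter costs nothing compared to a constant.
    val dateTimeFormatter =
      DateTimeFormatter
        .ofPattern("yyyyMMddHHmmss", Locale.US)
        .withZone(ZoneId.systemDefault())
    val jobTrackerId = dateTimeFormatter.format(Instant.now())
    println(new JobID(jobTrackerId, 0)) // e.g. job_20240101123045_0000
  }
}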
@@ -17,15 +17,21 @@
 
 package org.apache.spark.mllib.pmml.`export`
 
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale}
+import java.time.{Instant, ZoneId}
+import java.time.format.DateTimeFormatter
+import java.util.Locale
 
 import scala.beans.BeanProperty
 
 import org.dmg.pmml.{Application, Header, PMML, Timestamp}
 
 private[mllib] trait PMMLModelExport {
 
+  private val DATE_TIME_FORMATTER =
+    DateTimeFormatter
+      .ofPattern("yyyy-MM-dd'T'HH:mm:ss", Locale.US)
+      .withZone(ZoneId.systemDefault())
+
   /**
    * Holder of the exported model in PMML format
    */
@@ -34,7 +40,7 @@ private[mllib] trait PMMLModelExport {
     val version = getClass.getPackage.getImplementationVersion
     val app = new Application("Apache Spark MLlib").setVersion(version)
     val timestamp = new Timestamp()
-      .addContent(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss", Locale.US).format(new Date()))
+      .addContent(DATE_TIME_FORMATTER.format(Instant.now()))
     val header = new Header()
       .setApplication(app)
       .setTimestamp(timestamp)
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.text.SimpleDateFormat
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
@@ -42,8 +43,10 @@ class MetricsReporter(
   registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
   registerGauge("latency", _.durationMs.getOrDefault("triggerExecution", 0L).longValue(), 0L)
 
-  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
-  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+  private val timestampFormat =
+    DateTimeFormatter
+      .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+      .withZone(DateTimeUtils.getZoneId("UTC"))
 
   registerGauge("eventTime-watermark",
     progress => convertStringDateToMillis(progress.eventTime.get("watermark")), 0L)
@@ -53,7 +56,8 @@
 
   private def convertStringDateToMillis(isoUtcDateStr: String) = {
     if (isoUtcDateStr != null) {
-      timestampFormat.parse(isoUtcDateStr).getTime
+      val zonedDateTime = ZonedDateTime.parse(isoUtcDateStr, timestampFormat)
+      zonedDateTime.toInstant.toEpochMilli
     } else {
       0L
     }
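Reviewer note: one subtlety worth flagging is that the pattern quotes the trailing 'Z' as a literal rather than using an offset field, so parsing only succeeds because .withZone(...) supplies the zone that ZonedDateTime.parse needs. A standalone round-trip sketch, using ZoneOffset.UTC directly since DateTimeUtils is Spark-internal:

import java.time.{Instant, ZoneOffset, ZonedDateTime}
import java.time.format.DateTimeFormatter

object IsoRoundTripSketch {
  // 'Z' is a quoted literal here, not an offset pattern letter; the zone
  // comes entirely from withZone, which is what makes parse() work.
  private val timestampFormat =
    DateTimeFormatter
      .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
      .withZone(ZoneOffset.UTC)

  def main(args: Array[String]): Unit = {
    val millis = 1700000000123L
    val formatted = timestampFormat.format(Instant.ofEpochMilli(millis))
    println(formatted) // 2023-11-14T22:13:20.123Z
    val parsedMillis = ZonedDateTime.parse(formatted, timestampFormat).toInstant.toEpochMilli
    assert(parsedMillis == millis) // lossless round trip at millisecond precision
  }
}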
@@ -17,8 +17,9 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.text.SimpleDateFormat
-import java.util.{Date, Optional, UUID}
+import java.time.Instant
+import java.time.format.DateTimeFormatter
+import java.util.{Optional, UUID}
 
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
@@ -91,8 +92,10 @@ trait ProgressReporter extends Logging {
   // The timestamp we report an event that has not executed anything
   private var lastNoExecutionProgressEventTime = Long.MinValue
 
-  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
-  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+  private val timestampFormat =
+    DateTimeFormatter
+      .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+      .withZone(DateTimeUtils.getZoneId("UTC"))
 
   @volatile
   protected var currentStatus: StreamingQueryStatus = {
@@ -435,7 +438,7 @@
   }
 
   protected def formatTimestamp(millis: Long): String = {
-    timestampFormat.format(new Date(millis))
+    timestampFormat.format(Instant.ofEpochMilli(millis))
   }
 
   /** Updates the message returned in `status`. */
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming.continuous
 import java.io.{BufferedReader, InputStreamReader, IOException}
 import java.net.Socket
 import java.sql.Timestamp
-import java.util.Calendar
+import java.time.Instant
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.ListBuffer
@@ -185,8 +184,7 @@
       TextSocketContinuousStream.this.synchronized {
         currentOffset += 1
         val newData = (line,
-          Timestamp.valueOf(
-            TextSocketReader.DATE_FORMAT.format(Calendar.getInstance().getTime()))
+          Timestamp.valueOf(TextSocketReader.DATE_TIME_FORMATTER.format(Instant.now()))
         )
         buckets(currentOffset % numPartitions) += toRow(newData)
           .copy().asInstanceOf[UnsafeRow]
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming.sources
 
-import java.text.SimpleDateFormat
+import java.time.ZoneId
+import java.time.format.DateTimeFormatter
 import java.util
 import java.util.Locale
 
@@ -108,5 +109,8 @@ object TextSocketReader {
   val SCHEMA_REGULAR = StructType(Array(StructField("value", StringType)))
   val SCHEMA_TIMESTAMP = StructType(Array(StructField("value", StringType),
     StructField("timestamp", TimestampType)))
-  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+  val DATE_TIME_FORMATTER =
+    DateTimeFormatter
+      .ofPattern("yyyy-MM-dd HH:mm:ss", Locale.US)
+      .withZone(ZoneId.systemDefault())
 }
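Reviewer note: the pattern deliberately stays "yyyy-MM-dd HH:mm:ss" because that is exactly the form java.sql.Timestamp.valueOf accepts, which is how TextSocketContinuousStream consumes this constant above. A standalone round trip demonstrating that contract:

import java.sql.Timestamp
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter
import java.util.Locale

object SocketTimestampSketch {
  // Same pattern as TextSocketReader.DATE_TIME_FORMATTER in the patch.
  val DATE_TIME_FORMATTER =
    DateTimeFormatter
      .ofPattern("yyyy-MM-dd HH:mm:ss", Locale.US)
      .withZone(ZoneId.systemDefault())

  def main(args: Array[String]): Unit = {
    // Timestamp.valueOf requires "yyyy-[m]m-[d]d hh:mm:ss[.f...]",
    // so the formatted string parses without further conversion.
    val ts = Timestamp.valueOf(DATE_TIME_FORMATTER.format(Instant.now()))
    println(ts) // e.g. 2024-01-01 12:30:45.0
  }
}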