Changes from all commits (23 commits)

@@ -17,8 +17,6 @@

package org.apache.spark.sql.kafka010

import java.io.Writer

import scala.collection.mutable.HashMap
import scala.util.control.NonFatal

@@ -18,6 +18,8 @@
package org.apache.spark.sql.kafka010

import java.{util => ju}
import java.io._
import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._
import scala.util.control.NonFatal
@@ -111,7 +113,22 @@ private[kafka010] case class KafkaSource(
* `KafkaConsumer.poll` may hang forever (KAFKA-1894).
*/
private lazy val initialPartitionOffsets = {
val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
val metadataLog =
new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
out.write(bytes.length)
out.write(bytes)
}

override def deserialize(in: InputStream): KafkaSourceOffset = {
val length = in.read()
val bytes = new Array[Byte](length)
in.read(bytes)
KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8)))
}
}

metadataLog.get(0).getOrElse {
val offsets = startingOffsets match {
case EarliestOffsets => KafkaSourceOffset(fetchEarliestOffsets())
@@ -19,17 +19,16 @@ package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

import org.apache.spark.sql.execution.streaming.Offset
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}

/**
* An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
* their offsets.
*/
private[kafka010]
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {
override def toString(): String = {
partitionToOffsets.toSeq.sortBy(_._1.toString).mkString("[", ", ", "]")
}

override val json = JsonUtils.partitionOffsets(partitionToOffsets)
}

/** Companion object of the [[KafkaSourceOffset]] */
@@ -38,6 +37,7 @@ private[kafka010] object KafkaSourceOffset {
def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long] = {
offset match {
case o: KafkaSourceOffset => o.partitionToOffsets
case so: SerializedOffset => KafkaSourceOffset(so).partitionToOffsets
case _ =>
throw new IllegalArgumentException(
s"Invalid conversion from offset of ${offset.getClass} to KafkaSourceOffset")
@@ -51,4 +51,10 @@
def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset = {
KafkaSourceOffset(offsetTuples.map { case(t, p, o) => (new TopicPartition(t, p), o) }.toMap)
}

/**
* Returns [[KafkaSourceOffset]] from a JSON [[SerializedOffset]]
*/
def apply(offset: SerializedOffset): KafkaSourceOffset =
KafkaSourceOffset(JsonUtils.partitionOffsets(offset.json))
}
@@ -17,9 +17,13 @@

package org.apache.spark.sql.kafka010

import java.io.File

import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.streaming.OffsetSuite
import org.apache.spark.sql.test.SharedSQLContext

class KafkaSourceOffsetSuite extends OffsetSuite {
class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {

compare(
one = KafkaSourceOffset(("t", 0, 1L)),
@@ -36,4 +40,53 @@ class KafkaSourceOffsetSuite extends OffsetSuite {
compare(
one = KafkaSourceOffset(("t", 0, 1L)),
two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L)))


val kso1 = KafkaSourceOffset(("t", 0, 1L))
val kso2 = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 3L))
val kso3 = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 3L), ("t", 1, 4L))

compare(KafkaSourceOffset(SerializedOffset(kso1.json)),
KafkaSourceOffset(SerializedOffset(kso2.json)))

test("basic serialization - deserialization") {
assert(KafkaSourceOffset.getPartitionOffsets(kso1) ==
KafkaSourceOffset.getPartitionOffsets(SerializedOffset(kso1.json)))
}


testWithUninterruptibleThread("OffsetSeqLog serialization - deserialization") {
withTempDir { temp =>
// use a non-existent directory to test whether the log creates the dir
val dir = new File(temp, "dir")
val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
val batch0 = OffsetSeq.fill(kso1)
val batch1 = OffsetSeq.fill(kso2, kso3)

val batch0Serialized = OffsetSeq.fill(batch0.offsets.flatMap(_.map(o =>
SerializedOffset(o.json))): _*)

val batch1Serialized = OffsetSeq.fill(batch1.offsets.flatMap(_.map(o =>
SerializedOffset(o.json))): _*)

assert(metadataLog.add(0, batch0))
assert(metadataLog.getLatest() === Some(0 -> batch0Serialized))
assert(metadataLog.get(0) === Some(batch0Serialized))

assert(metadataLog.add(1, batch1))
assert(metadataLog.get(0) === Some(batch0Serialized))
assert(metadataLog.get(1) === Some(batch1Serialized))
assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
assert(metadataLog.get(None, Some(1)) ===
Array(0 -> batch0Serialized, 1 -> batch1Serialized))

// Adding the same batch does nothing
metadataLog.add(1, OffsetSeq.fill(LongOffset(3)))
assert(metadataLog.get(0) === Some(batch0Serialized))
assert(metadataLog.get(1) === Some(batch1Serialized))
assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
assert(metadataLog.get(None, Some(1)) ===
Array(0 -> batch0Serialized, 1 -> batch1Serialized))
}
}
}
12 changes: 6 additions & 6 deletions python/pyspark/sql/streaming.py
@@ -220,15 +220,15 @@ def __str__(self):
triggerId: 5
Source statuses [1 source]:
Source 1 - MySource1
Available offset: #0
Available offset: 0
Input rate: 15.5 rows/sec
Processing rate: 23.5 rows/sec
Trigger details:
numRows.input.source: 100
latency.getOffset.source: 10
latency.getBatch.source: 20
Sink status - MySink
Committed offsets: [#1, -]
Committed offsets: [1, -]
"""
return self._jsqs.toString()

@@ -366,7 +366,7 @@ def __str__(self):

>>> print(sqs.sourceStatuses[0])
Status of source MySource1
Available offset: #0
Available offset: 0
Input rate: 15.5 rows/sec
Processing rate: 23.5 rows/sec
Trigger details:
@@ -396,7 +396,7 @@ def offsetDesc(self):
Description of the current offset if known.

>>> sqs.sourceStatuses[0].offsetDesc
u'#0'
u'0'
"""
return self._jss.offsetDesc()

@@ -457,7 +457,7 @@ def __str__(self):

>>> print(sqs.sinkStatus)
Status of sink MySink
Committed offsets: [#1, -]
Committed offsets: [1, -]
"""
return self._jss.toString()

@@ -481,7 +481,7 @@ def offsetDesc(self):
Description of the current offsets up to which data has been written by the sink.

>>> sqs.sinkStatus.offsetDesc
u'[#1, -]'
u'[1, -]'
"""
return self._jss.offsetDesc()

@@ -24,6 +24,8 @@ import scala.io.{Source => IOSource}
import scala.reflect.ClassTag

import org.apache.hadoop.fs.{Path, PathFilter}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.sql.SparkSession

@@ -37,14 +39,19 @@ import org.apache.spark.sql.SparkSession
* compact log files every 10 batches by default into a big file. When
* doing a compaction, it will read all old log files and merge them with the new batch.
*/
abstract class CompactibleFileStreamLog[T: ClassTag](
abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
Contributor: Why is this AnyRef needed?

Author: It's required by json4s that T be an AnyRef.

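To make the constraint concrete, a minimal sketch, assuming json4s 3.x and a hypothetical Entry case class: Serialization.write is declared for A <: AnyRef, so the element type stored in the log has to be a reference type.

```scala
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

// Hypothetical log entry type, used only to illustrate the bound.
case class Entry(path: String, size: Long)

object Json4sBoundSketch {
  private implicit val formats = Serialization.formats(NoTypeHints)

  // write[A <: AnyRef] compiles for a case class; a primitive T such as Long
  // would not satisfy the AnyRef bound.
  def roundTrip(e: Entry): Entry =
    Serialization.read[Entry](Serialization.write(e))
}
```
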
metadataLogVersion: String,
sparkSession: SparkSession,
path: String)
extends HDFSMetadataLog[Array[T]](sparkSession, path) {

import CompactibleFileStreamLog._

private implicit val formats = Serialization.formats(NoTypeHints)

/** Needed to serialize type T into JSON when using Jackson */
private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)

/**
* If we delete the old files after compaction at once, there is a race condition in S3: other
* processes may see the old files are deleted but still cannot see the compaction file using
@@ -58,16 +65,6 @@

protected def compactInterval: Int

/**
* Serialize the data into encoded string.
*/
protected def serializeData(t: T): String

/**
* Deserialize the string into data object.
*/
protected def deserializeData(encodedString: String): T

/**
* Filter out the obsolete logs.
*/
@@ -99,7 +96,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
out.write(metadataLogVersion.getBytes(UTF_8))
logData.foreach { data =>
out.write('\n')
out.write(serializeData(data).getBytes(UTF_8))
out.write(Serialization.write(data).getBytes(UTF_8))
}
}

@@ -112,7 +109,7 @@
if (version != metadataLogVersion) {
throw new IllegalStateException(s"Unknown log version: ${version}")
}
lines.map(deserializeData).toArray
lines.map(Serialization.read[T]).toArray
}

override def add(batchId: Long, logs: Array[T]): Boolean = {
@@ -93,14 +93,6 @@ class FileStreamSinkLog(
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
"to a positive value.")

protected override def serializeData(data: SinkFileStatus): String = {
write(data)
}

protected override def deserializeData(encodedString: String): SinkFileStatus = {
read[SinkFileStatus](encodedString)
}

override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
if (deletedFiles.isEmpty) {
@@ -131,8 +131,8 @@ class FileStreamSource(
* Returns the data that is between the offsets (`start`, `end`].
*/
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
Contributor: Given this change, the start and end Offset passed to getBatch will not necessarily be the custom Offset class defined by the source, so I think it is important to document in Source.getBatch that implementations should not make that assumption.

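The concern above, made concrete: after a restart the offsets replayed from the offset log arrive as SerializedOffset (raw JSON) rather than the source's own offset class, so an unconditional asInstanceOf can fail. A minimal sketch of the defensive pattern, using a hypothetical MySourceOffset and mirroring the KafkaSourceOffset / LongOffset.convert changes in this PR:

```scala
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}

// Hypothetical source offset whose JSON form is just the position.
case class MySourceOffset(position: Long) extends Offset {
  override val json: String = position.toString
}

object MySourceOffset {
  // Accept either the native class or the serialized form read back from the
  // offset log; return None for anything else instead of casting blindly.
  def convert(offset: Offset): Option[MySourceOffset] = offset match {
    case o: MySourceOffset   => Some(o)
    case so: SerializedOffset => Some(MySourceOffset(so.json.trim.toLong))
    case _ => None
  }
}
```
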
val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L)
val endId = end.asInstanceOf[LongOffset].offset
val startId = start.flatMap(LongOffset.convert(_)).getOrElse(LongOffset(-1L)).offset
val endId = LongOffset.convert(end).getOrElse(LongOffset(0)).offset

assert(startId <= endId)
val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
@@ -60,14 +60,6 @@ class FileStreamSourceLog(
}
}

protected override def serializeData(data: FileEntry): String = {
Serialization.write(data)
}

protected override def deserializeData(encodedString: String): FileEntry = {
Serialization.read[FileEntry](encodedString)
}

def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
logs
}
@@ -17,7 +17,8 @@

package org.apache.spark.sql.execution.streaming

import java.io.{FileNotFoundException, InputStream, IOException, OutputStream}
import java.io._
import java.nio.charset.StandardCharsets
import java.util.{ConcurrentModificationException, EnumSet, UUID}

import scala.reflect.ClassTag
@@ -26,9 +27,10 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.internal.Logging
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.UninterruptibleThread

@@ -44,9 +46,14 @@ import org.apache.spark.util.UninterruptibleThread
* Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing
* files in a directory always shows the latest files.
*/
class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String)
extends MetadataLog[T] with Logging {

private implicit val formats = Serialization.formats(NoTypeHints)

/** Needed to serialize type T into JSON when using Jackson */
private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
Contributor: Why is this needed now? This wasn't needed earlier when we were using JSON serialization in FileStreamSinkLog, so why is the manifest needed now?

Contributor: I see now why this is needed.

Contributor: Can you add "Needed by Jackson to serialize/deserialize ..." to the comment?

Author: Done.

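Following up on the thread: Serialization.read[T] needs an implicit Manifest[T], and with T known only through a ClassTag the compiler cannot materialize one, so it has to be built from the runtime class. A minimal sketch, assuming json4s 3.x and a hypothetical Payload case class:

```scala
import scala.reflect.ClassTag

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

// Hypothetical element type; any case class will do.
case class Payload(id: Long, name: String)

class JsonLog[T <: AnyRef : ClassTag] {
  private implicit val formats = Serialization.formats(NoTypeHints)

  // Serialization.read[T] requires an implicit Manifest[T]; a ClassTag alone
  // is not enough, so construct one from the runtime class.
  private implicit val manifest: Manifest[T] =
    Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)

  def serialize(t: T): String = Serialization.write(t)

  def deserialize(json: String): T = Serialization.read[T](json)
}

// Usage: new JsonLog[Payload]().deserialize("""{"id":1,"name":"a"}""")
```
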

// Avoid serializing generic sequences, see SPARK-17372
require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
"Should not create a log with type Seq, use Arrays instead - see SPARK-17372")
@@ -67,8 +74,6 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
override def accept(path: Path): Boolean = isBatchFile(path)
}

private val serializer = new JavaSerializer(sparkSession.sparkContext.conf).newInstance()

protected def batchIdToPath(batchId: Long): Path = {
new Path(metadataPath, batchId.toString)
}
@@ -88,14 +93,13 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)

protected def serialize(metadata: T, out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
val outStream = serializer.serializeStream(out)
outStream.writeObject(metadata)
Serialization.write(metadata, out)
}

protected def deserialize(in: InputStream): T = {
// called inside a try-finally where the underlying stream is closed in the caller
val inStream = serializer.deserializeStream(in)
inStream.readObject[T]()
val reader = new InputStreamReader(in, StandardCharsets.UTF_8)
Serialization.read[T](reader)
}

/**