
Commit 1390711

Author: Tyson Condie (committed)
Remove serialize/deserialize data methods in CompactibleFileStreamLog... everything is JSON serialized.
1 parent f1010ee commit 1390711

4 files changed: +11 −29 lines
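
The change standardizes the metadata-log entries on json4s JSON, which HDFSMetadataLog already depends on (its formats and manifest implicits appear in the last diff below). A minimal round-trip sketch of the calls the new code relies on; the Entry case class is only illustrative, not part of the commit:

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

// Illustrative stand-in for SinkFileStatus / FileEntry.
case class Entry(path: String, size: Long, action: String)

object RoundTrip {
  // Plain JSON with no embedded type hints -- the same Formats the log classes declare.
  private implicit val formats = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    val entry = Entry("/data/part-00000", 1024L, "add")
    val json = Serialization.write(entry)
    // json is e.g. {"path":"/data/part-00000","size":1024,"action":"add"}
    val back = Serialization.read[Entry](json)  // Manifest[Entry] is supplied by the compiler here
    assert(back == entry)
  }
}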

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala

Lines changed: 10 additions & 13 deletions
@@ -24,6 +24,8 @@ import scala.io.{Source => IOSource}
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.fs.{Path, PathFilter}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.SparkSession
 

@@ -37,14 +39,19 @@ import org.apache.spark.sql.SparkSession
  * compact log files every 10 batches by default into a big file. When
  * doing a compaction, it will read all old log files and merge them with the new batch.
  */
-abstract class CompactibleFileStreamLog[T: ClassTag](
+abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     metadataLogVersion: String,
     sparkSession: SparkSession,
     path: String)
   extends HDFSMetadataLog[Array[T]](sparkSession, path) {
 
   import CompactibleFileStreamLog._
 
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  /** Needed to serialize type T into JSON */
+  private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
+
   /**
    * If we delete the old files after compaction at once, there is a race condition in S3: other
    * processes may see the old files are deleted but still cannot see the compaction file using

@@ -58,16 +65,6 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
 
   protected def compactInterval: Int
 
-  /**
-   * Serialize the data into encoded string.
-   */
-  protected def serializeData(t: T): String
-
-  /**
-   * Deserialize the string into data object.
-   */
-  protected def deserializeData(encodedString: String): T
-
   /**
    * Filter out the obsolete logs.
    */

@@ -99,7 +96,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
     out.write(metadataLogVersion.getBytes(UTF_8))
     logData.foreach { data =>
       out.write('\n')
-      out.write(serializeData(data).getBytes(UTF_8))
+      out.write(Serialization.write(data).getBytes(UTF_8))
     }
   }
 

@@ -112,7 +109,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
     if (version != metadataLogVersion) {
       throw new IllegalStateException(s"Unknown log version: ${version}")
     }
-    lines.map(deserializeData).toArray
+    lines.map(Serialization.read[T]).toArray
   }
 
   override def add(batchId: Long, logs: Array[T]): Boolean = {
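
The two implicits added above are what let these generic calls compile: json4s's Serialization.read[T] requires a Manifest[T], so one is derived from the ClassTag's runtime class, and the type parameter is bounded by AnyRef, which Serialization.write requires of its argument. A stand-alone sketch of the same pattern, using hypothetical names (GenericJsonCodec, Record) that are not part of the commit:

import scala.reflect.ClassTag

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

// Hypothetical generic codec following the pattern CompactibleFileStreamLog now uses.
class GenericJsonCodec[T <: AnyRef : ClassTag] {
  private implicit val formats = Serialization.formats(NoTypeHints)
  // json4s reads need a Manifest; build one from the runtime class carried by the ClassTag.
  private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)

  def toJson(value: T): String = Serialization.write(value)
  def fromJson(line: String): T = Serialization.read[T](line)
}

case class Record(id: Long, name: String)  // illustrative payload

object CodecDemo {
  def main(args: Array[String]): Unit = {
    val codec = new GenericJsonCodec[Record]
    val json = codec.toJson(Record(1L, "a"))
    assert(codec.fromJson(json) == Record(1L, "a"))
  }
}

A plain ClassTag is not enough on its own: Serialization.read would not resolve without the Manifest, which is why both CompactibleFileStreamLog and HDFSMetadataLog declare it explicitly.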

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala

Lines changed: 0 additions & 8 deletions
@@ -93,14 +93,6 @@ class FileStreamSinkLog(
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
       "to a positive value.")
 
-  protected override def serializeData(data: SinkFileStatus): String = {
-    write(data)
-  }
-
-  protected override def deserializeData(encodedString: String): SinkFileStatus = {
-    read[SinkFileStatus](encodedString)
-  }
-
   override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
     val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
     if (deletedFiles.isEmpty) {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala

Lines changed: 0 additions & 8 deletions
@@ -60,14 +60,6 @@ class FileStreamSourceLog(
     }
   }
 
-  protected override def serializeData(data: FileEntry): String = {
-    Serialization.write(data)
-  }
-
-  protected override def deserializeData(encodedString: String): FileEntry = {
-    Serialization.read[FileEntry](encodedString)
-  }
-
   def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
     logs
   }
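
With serialization handled generically in the base class, the sink and source logs shed their overrides while the on-disk layout stays the same: the first line of a log file is the metadata log version, followed by one JSON object per entry. A minimal sketch of that layout using the same calls as the serialize/deserialize hunks above (Entry and the helper names are illustrative, not part of the commit):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8

import scala.io.{Source => IOSource}

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

// Illustrative entry; the real logs store SinkFileStatus / FileEntry objects.
case class Entry(path: String, batchId: Long)

object LogLayoutSketch {
  private implicit val formats = Serialization.formats(NoTypeHints)
  val version = "v1"

  // Mirrors serialize: version header, then one JSON object per line.
  def write(entries: Array[Entry]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    out.write(version.getBytes(UTF_8))
    entries.foreach { e =>
      out.write('\n')
      out.write(Serialization.write(e).getBytes(UTF_8))
    }
    out.toByteArray
  }

  // Mirrors deserialize: check the version line, then parse each remaining line as JSON.
  def read(bytes: Array[Byte]): Array[Entry] = {
    val lines = IOSource.fromInputStream(new ByteArrayInputStream(bytes), UTF_8.name()).getLines()
    if (!lines.hasNext) throw new IllegalStateException("Incomplete log file")
    if (lines.next() != version) throw new IllegalStateException("Unknown log version")
    lines.map(Serialization.read[Entry]).toArray
  }

  def main(args: Array[String]): Unit = {
    val entries = Array(Entry("/data/part-0", 0L), Entry("/data/part-1", 1L))
    assert(read(write(entries)).sameElements(entries))
  }
}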

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala

Lines changed: 1 addition & 0 deletions
@@ -50,6 +50,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
 
   private implicit val formats = Serialization.formats(NoTypeHints)
 
+  /** Needed to serialize type T into JSON */
   private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
 
   // Avoid serializing generic sequences, see SPARK-17372

0 commit comments