From 337ad489bb43ef93a651bdd4952bd7f0738698dc Mon Sep 17 00:00:00 2001 From: Jacek Laskowski Date: Sun, 3 Sep 2017 20:17:45 +0200 Subject: [PATCH 1/2] [SPARK-21901][SS] Define toString for StateOperatorProgress --- .../apache/spark/sql/streaming/progress.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 3000c4233cfb..273a134f3257 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -55,6 +55,8 @@ class StateOperatorProgress private[sql]( ("numRowsUpdated" -> JInt(numRowsUpdated)) ~ ("memoryUsedBytes" -> JInt(memoryUsedBytes)) } + + override def toString: String = prettyJson } /** @@ -177,11 +179,11 @@ class SourceProgress protected[sql]( } ("description" -> JString(description)) ~ - ("startOffset" -> tryParse(startOffset)) ~ - ("endOffset" -> tryParse(endOffset)) ~ - ("numInputRows" -> JInt(numInputRows)) ~ - ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ - ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) + ("startOffset" -> tryParse(startOffset)) ~ + ("endOffset" -> tryParse(endOffset)) ~ + ("numInputRows" -> JInt(numInputRows)) ~ + ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ + ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) } private def tryParse(json: String) = try { @@ -200,7 +202,7 @@ class SourceProgress protected[sql]( */ @InterfaceStability.Evolving class SinkProgress protected[sql]( - val description: String) extends Serializable { + val description: String) extends Serializable { /** The compact JSON representation of this progress. */ def json: String = compact(render(jsonValue)) @@ -211,6 +213,6 @@ class SinkProgress protected[sql]( override def toString: String = prettyJson private[sql] def jsonValue: JValue = { - ("description" -> JString(description)) + "description" -> JString(description) } } From 0e2b9c6006593b14b9bda652225252ed475ebbfe Mon Sep 17 00:00:00 2001 From: Jacek Laskowski Date: Tue, 5 Sep 2017 11:21:24 +0200 Subject: [PATCH 2/2] After review: reverting unrelated/stylistic changes --- .../org/apache/spark/sql/streaming/progress.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 273a134f3257..cedc1dce4a70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -179,11 +179,11 @@ class SourceProgress protected[sql]( } ("description" -> JString(description)) ~ - ("startOffset" -> tryParse(startOffset)) ~ - ("endOffset" -> tryParse(endOffset)) ~ - ("numInputRows" -> JInt(numInputRows)) ~ - ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ - ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) + ("startOffset" -> tryParse(startOffset)) ~ + ("endOffset" -> tryParse(endOffset)) ~ + ("numInputRows" -> JInt(numInputRows)) ~ + ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ + ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) } private def tryParse(json: String) = try { @@ -202,7 +202,7 @@ class SourceProgress protected[sql]( */ @InterfaceStability.Evolving class SinkProgress protected[sql]( - val description: String) extends Serializable { + val description: String) extends Serializable { /** The compact JSON representation of this progress. */ def json: String = compact(render(jsonValue)) @@ -213,6 +213,6 @@ class SinkProgress protected[sql]( override def toString: String = prettyJson private[sql] def jsonValue: JValue = { - "description" -> JString(description) + ("description" -> JString(description)) } }