Skip to content

Conversation

@tdas
Copy link
Contributor

@tdas tdas commented Jul 14, 2017

What changes were proposed in this pull request?

Currently, there is no tracking of memory usage of state stores. This JIRA is to expose that through SQL metrics and StreamingQueryProgress.

Additionally, added the ability to expose implementation-specific metrics through the StateStore APIs to the SQLMetrics.

How was this patch tested?

Added unit tests.

import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._

class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
implicit class EqualsIgnoreCRLF(source: String) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaces this with assertJson (see below) because this code made it harded to debug the differences between json strings. With assertJson, scala test would show nice diffs.

[info] - StreamingQueryProgress - prettyJson *** FAILED *** (137 milliseconds)
[info]   "..."numRowsUpdated" : 1[,]
[info]       "memoryUsedByte..." did not equal "..."numRowsUpdated" : 1[]
[info]       "memoryUsedByte..." (StreamingQueryStatusAndProgressSuite.scala:213)
[info]   org.scalatest.exceptions.TestFailedException:

@SparkQA
Copy link

SparkQA commented Jul 14, 2017

Test build #79595 has finished for PR 18629 at commit 667e903.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class StateStoreMetrics(val numKeys: Long, val memoryUsedBytes: Long)

@SparkQA
Copy link

SparkQA commented Jul 14, 2017

Test build #79610 has finished for PR 18629 at commit c8226b1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class StateStoreMetrics(
  • case class StateStoreCustomMetric(name: String, desc: String)


override def numKeys(): Long = mapToUpdate.size()
override def metrics: StateStoreMetrics = {
StateStoreMetrics(mapToUpdate.size(), SizeEstimator.estimate(mapToUpdate), Map.empty)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a flag for this? SizeEstimator.estimate will be very slow when there are a lot of states, because it scans all objects using reflection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me do some tests to understand how long it will take. For arrays it will just sample, so it should not take that long.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Takes < 0.5 ms with a state store with 5 million elements

@zsxwing
Copy link
Member

zsxwing commented Jul 17, 2017

LGTM. Merging to master.

@asfgit asfgit closed this in 9d8c831 Jul 17, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants