totalDigest add spark dataframe column / array #65

Closed · thiakx opened this issue Jun 11, 2016 · 5 comments
thiakx commented Jun 11, 2016

Currently, we load a Spark DataFrame column into totalDigest in a hackish way, using .take() and foreach, because t-digests keep throwing an "object not serializable" error without .take(). Is there a more native way to load a large array or Spark DataFrame column directly into totalDigest?

Sample code we're using:
val totalDigest = TDigest.createDigest(100)
val data = df.select("col").rdd.map(r => r.getDouble(0)).take(numberOfRows)
data.foreach(value => totalDigest.add(value))

Alternatively, using an array of bytes:
http://apache-spark-user-list.1001560.n3.nabble.com/Percentile-td19978.html#a20032
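
For reference, one way that byte-array approach can look is the following minimal, untested sketch (it assumes the t-digest asBytes/fromBytes and centroids() APIs): build one digest per partition, ship each partial digest back to the driver as bytes, and merge there.

// Minimal sketch (untested): one digest per partition, serialized to bytes for
// the trip back to the driver, then merged there by re-adding centroids.
import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import com.tdunning.math.stats.{TDigest, TreeDigest}

val partials: Array[Array[Byte]] = df.select("col").rdd
  .map(r => r.getDouble(0))
  .mapPartitions { values =>
    val d = TDigest.createDigest(100)
    values.foreach(v => d.add(v))
    val buf = ByteBuffer.allocate(d.byteSize)
    d.asBytes(buf)
    Iterator(buf.array())
  }
  .collect()

val totalDigest = TDigest.createDigest(100)
partials.foreach { bytes =>
  val part = TreeDigest.fromBytes(ByteBuffer.wrap(bytes))
  part.centroids().asScala.foreach(c => totalDigest.add(c.mean(), c.count()))
}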

thiakx changed the title from "totalDigest add spark dataframe column" to "totalDigest add spark dataframe column / array" on Jun 11, 2016
tdunning (Owner) commented Jun 11, 2016

Yes. This is a real problem.

There is a pull request that I have accepted and which should make this easier for you in the next release.

Check out https://gist.github.com/RobColeman/7a5ebcb7c155c94b0a62 in the meantime.


Geeber (Contributor) commented Jun 13, 2016

See my comment on #56 (comment) for a significant warning/caveat.

Also, note that the linked TreeDigestHelper Scala code will result in a deserialization/serialization for every single element in your RDD, and as such it is incredibly expensive in terms of CPU. Depending on your application's performance characteristics, maybe that's OK, but at large scale I can't imagine it being viable.

Here's a different approach that we're currently using in our Spark code:

import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

import com.tdunning.math.stats.TreeDigest

/** Wrap TreeDigest inside a class that properly handles serialization so that it can be used inside Spark.
  *
  * @param digest The TreeDigest to wrap.
  */
class TreeDigestWrapper(private var digest: TreeDigest) extends Serializable {
  def size(): Long = {
    digest.size()
  }

  def add(x: Double): Unit = {
    digest.add(x)
  }

  // ... expose other wrapper methods as necessary

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = {
    val bytes = new Array[Byte](digest.byteSize)
    out.writeInt(bytes.length)
    digest.asBytes(ByteBuffer.wrap(bytes))
    out.write(bytes)
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    val length = in.readInt()
    val bytes = new Array[Byte](length)
    in.readFully(bytes)
    digest = TreeDigest.fromBytes(ByteBuffer.wrap(bytes))
  }
}

Then you can perform operations directly on the wrapper, and you'll only pay serialization/deserialization costs when data is moved between JVMs (which Spark should be good at minimizing).
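
For example, here is a minimal, untested sketch of driving the wrapper from treeAggregate; the merge method used in the combine step is hypothetical, one of the "other wrapper methods" you would expose (e.g. by re-adding the other digest's centroids):

// Minimal sketch (untested). Assumes the wrapper also exposes a hypothetical
// merge(other: TreeDigestWrapper) method that folds the other digest's
// centroids into this one.
import com.tdunning.math.stats.TreeDigest

val values = df.select("col").rdd.map(r => r.getDouble(0))

val digest: TreeDigestWrapper = values.treeAggregate(new TreeDigestWrapper(new TreeDigest(100)))(
  (wrapper, x) => { wrapper.add(x); wrapper },  // build a digest per partition
  (a, b) => { a.merge(b); a }                   // hypothetical merge when partitions combine
)

println(digest.size())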

tdunning (Owner) commented Jun 13, 2016

Kevin,

Don't you think it would be better to do without the wrapper entirely?


Geeber (Contributor) commented Jun 13, 2016

Oh, yeah, just making things natively Serializable would be preferable, I think. But I figured I'd post this here as an alternative to those helpers in the meantime.

tdunning (Owner) commented:

The digests are all natively Serializable now. This will be improved in the future to allow cross serialization and use more economical formats, but for now it works.
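
A wrapper-free version of the same aggregation might then look like this minimal, untested sketch (it assumes a release in which the digests implement java.io.Serializable and that the add(TDigest) overload is available for merging):

// Minimal sketch (untested), assuming natively Serializable digests, so no
// wrapper is needed.
import com.tdunning.math.stats.TDigest

val digest = df.select("col").rdd
  .map(r => r.getDouble(0))
  .treeAggregate(TDigest.createDigest(100))(
    (d, x) => { d.add(x); d },   // build a digest per partition
    (a, b) => { a.add(b); a }    // assumes the add(TDigest) overload for merging
  )

println(digest.quantile(0.5))  // e.g. the median of the column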
