Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark backend #1832

Merged
merged 4 commits into from
Mar 6, 2018
Merged

Spark backend #1832

merged 4 commits into from
Mar 6, 2018

Conversation

johnynek
Copy link
Collaborator

@johnynek johnynek commented Mar 2, 2018

This is a very basic beginning to a spark backend.

It is not complete, but does support map-only operations.

There is one big question: can we really just lie to spark and say we have AnyRef everywhere? I think it may just make serialization worse (kryo writing the classnames), but maybe we can circumvent that later since scalding allows configs to have registered classes named, maybe we can pass that information to spark somehow.

cc @ianoc

@johnynek johnynek changed the title WIP: Spark backend Spark backend Mar 4, 2018
@johnynek
Copy link
Collaborator Author

johnynek commented Mar 4, 2018

@ianoc can you take a look?

This is not finished, but what we have is testable and in the interest of keeping the PRs small, I'd like to merge this and then follow up with more parts:

  1. (this) basic framework, map-only operation support.
  2. reduce operation support.
  3. join support
  4. full Mode/Execution support

@johnynek
Copy link
Collaborator Author

johnynek commented Mar 5, 2018

build fails because spark is not there for 2.12. Will remove spark 2.12 from the CI.

object SparkPlanner {

// TODO, this may be just inefficient, or it may be wrong
implicit private def fakeClassTag[A]: ClassTag[A] = ClassTag(classOf[AnyRef]).asInstanceOf[ClassTag[A]]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think they just register these to kryo so i imagine this should just hit the inefficient paths. For a normal execution app though this i guess does drop some sort of performance.

case (ForceToDisk(pipe), rec) =>
rec(pipe).persist(StorageLevel.DISK_ONLY)
case (Fork(pipe), rec) =>
rec(pipe).persist(StorageLevel.MEMORY_ONLY)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't need to bother updating this here, but i'd leave these to DISK_ONLY until we can figure out how/when we can upgrade the state. (Spark also has the output of shuffles persisted, so we might be able to get the planner to realize when we should have shuffle data and not do the persisting then).

???
case (slk @ SumByLocalKeys(_, _), rec) =>
def sum[K, V](sblk: SumByLocalKeys[K, V]): R[(K, V)] = {
// we can use Algebird's SummingCache https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/SummingCache.scala#L36
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since these partitions are ~usually ondisk or can fit in memory we might just want MapAlgebra.sumByKey possibly (or sort and fold i guess..).

@ianoc
Copy link
Collaborator

ianoc commented Mar 6, 2018

Some comments but they are more of an ongoing discussion than anything else. LGTM

@johnynek
Copy link
Collaborator Author

johnynek commented Mar 6, 2018

thanks for the comments. these seem like good points to keep in mind as we optimize. All of your comments are doable for sure. Will address in the follow ups.

@johnynek johnynek merged commit db64ad3 into develop Mar 6, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants