
Conversation

@staple (Contributor) commented Sep 11, 2014

When running an iterative learning algorithm, it makes sense for the input RDD to be cached for better performance. Previously, when learning was applied to a Python RDD, the Python RDD was always cached; in Scala that cached RDD was then mapped to an uncached, deserialized RDD, and that uncached RDD was passed to the learning algorithm. Because the RDD holding the deserialized data was uncached, learning algorithms would implicitly deserialize the same data repeatedly, on every iteration.

This patch moves RDD caching to after deserialization for learning algorithms that should be called with a cached RDD; a rough sketch of the change follows the list below. For algorithms that implement their own caching internally, the input RDD is no longer cached. Below I’ve listed each learning routine accessible from Python, the location where caching was previously enabled, and the location (if any) where caching is now enabled by this patch.

LogisticRegressionWithSGD:
was: python (in _regression_train_wrapper/_get_unmangled_labeled_point_rdd)
now: jvm (trainRegressionModel)

SVMWithSGD:
was: python (in _regression_train_wrapper/_get_unmangled_labeled_point_rdd)
now: jvm (trainRegressionModel)

NaiveBayes:
was: python (in _get_unmangled_labeled_point_rdd)
now: none

KMeans:
was: python (in _get_unmangled_double_vector_rdd)
now: jvm (trainKMeansModel)

ALS:
was: python (in _get_unmangled_rdd)
now: none

LinearRegressionWithSGD:
was: python (in _regression_train_wrapper/_get_unmangled_labeled_point_rdd)
now: jvm (trainRegressionModel)

LassoWithSGD:
was: python (in _regression_train_wrapper/_get_unmangled_labeled_point_rdd)
now: jvm (trainRegressionModel)

RidgeRegressionWithSGD:
was: python (in _regression_train_wrapper/_get_unmangled_labeled_point_rdd)
now: jvm (trainRegressionModel)

DecisionTree:
was: python (in _get_unmangled_labeled_point_rdd)
now: none
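
To make the placement concrete, here is a rough PySpark-style sketch of the before/after caching (illustrative only; this is not the actual PythonMLLibAPI or Python wrapper code, and deserialize is just a stand-in for the bytes-to-LabeledPoint conversion):

from pyspark import SparkContext

sc = SparkContext(appName="caching-placement-sketch")

raw = sc.parallelize(range(1000)).map(str)   # plays the role of the serialized Python RDD
deserialize = int                            # stand-in for the deserialization step

# Before the patch (schematically): cache() was applied to the serialized RDD,
# and the deserialized RDD derived from it stayed uncached, so every pass
# re-ran the deserialization:
#     points = raw.cache().map(deserialize)

# After the patch: caching is applied after deserialization, so iterative
# algorithms reuse the deserialized records directly.
points = raw.map(deserialize).cache()

for _ in range(3):   # stands in for gradient-descent iterations over the points
    points.map(lambda x: x * 2).sum()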

@SparkQA commented Sep 11, 2014

Can one of the admins verify this patch?

@davies (Contributor) commented Sep 11, 2014

Can you do a benchmark to show the difference?

I doubt that caching the deserialized objects will be better than caching the serialized data; the latter can relieve GC pressure a lot. That is why we do it this way in Spark SQL: the columns are serialized (and maybe compressed) for caching.

Also, there are some cases where the cache is "none" after this patch; what does that mean?
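
As a rough, plain-Python illustration of this tradeoff (a toy sketch, not Spark code): keeping pickled bytes around is compact and GC-friendly but pays a deserialization cost on every pass, while keeping live objects avoids that cost at the price of a larger heap.

import pickle
import time

# Toy stand-in for a partition of LabeledPoint-like records.
records = [(float(i % 2), [i * 0.1] * 5) for i in range(200000)]

blob = pickle.dumps(records, pickle.HIGHEST_PROTOCOL)
print("pickled size: %d bytes" % len(blob))

start = time.time()
for _ in range(10):          # each pass mimics one training iteration
    pickle.loads(blob)       # "cached serialized": deserialize every time
print("10 passes, deserializing each time: %.2fs" % (time.time() - start))

start = time.time()
for _ in range(10):
    sum(label for label, _ in records)   # "cached deserialized": objects already live in memory
print("10 passes over live objects:       %.2fs" % (time.time() - start))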

@staple (Contributor, Author) commented Sep 11, 2014

Hi, I implemented this per the discussion here, #2347 (comment), assuming I understood the comment correctly. The context is that we are supposed to log a warning when an iterative learning algorithm is run on an uncached RDD. What originally led me to identify SPARK-3488 is that if the deserialized Python RDDs are always uncached, the warning will always be logged.

Obviously a meaningful performance difference would trump the implementation of this warning message, and I haven't measured performance; I've only discussed options in the above-referenced pull request. But by way of comparison, is there any significant difference in memory pressure between caching a LabeledPoint RDD deserialized from Python and caching a LabeledPoint RDD created natively in Scala (the typical use case with a Scala rather than Python client)?

If I should do some performance testing, are there any examples of tests and infrastructure you'd suggest as a starting point?

'none' means the RDD is not cached within the Python -> Scala MLlib interface, where previously it was cached. The learning algorithms for which RDDs are no longer cached implement their own caching internally (or are not iterative).

@davies (Contributor) commented Sep 11, 2014

I think you could pick whichever algorithm you expect to show the biggest difference.

As for the repeated warning, it's probably not hard to make it show only once.
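
For what it's worth, a minimal warn-once pattern on the Python side could look like the sketch below (purely illustrative; the warning discussed in #2347 is presumably logged on the JVM side, and the uncached check here is just one possible heuristic):

import warnings

def train_with_cache_warning(rdd, iterations=100):
    # Heuristic: an RDD persisted to neither memory nor disk counts as uncached.
    level = rdd.getStorageLevel()
    if not (level.useMemory or level.useDisk):
        warnings.warn("The input RDD is not cached, which may hurt performance "
                      "for iterative algorithms.")
    # ... run the iterative algorithm here ...

The default warnings filter reports a given message from a given code location only once per session, so repeated training calls would not spam the output.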

@staple (Contributor, Author) commented Sep 14, 2014

I ran a simple logistic regression performance test on my local machine (Ubuntu desktop with 8 GB RAM, SSD disk). I used two data sizes: 2m records, which was not memory constrained, and 10m records, which was memory constrained (generating log messages such as 'CacheManager: Not enough space to cache partition'). I tested without this patch, with this patch, and with a modified version of this patch that uses MEMORY_ONLY_SER to persist the deserialized objects. Here are the results (each reported runtime, in seconds, is the mean of 3 runs):

2m records:
master: 47.9099563758
w/ patch: 32.1143682798
w/ MEMORY_ONLY_SER: 79.4589416981

10m records:
master: 2130.3178509871
w/ patch: 3232.856136322
w/ MEMORY_ONLY_SER: 2772.3923886617

It looks like, running in memory, this patch provides a 33% speed improvement, while the MEMORY_ONLY_SER version is 66% slower than master. In the test case with insufficient memory to keep all of the cache()-ed training RDD partitions cached at once, this patch is 52% slower while MEMORY_ONLY_SER is 30% slower.
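
For reference, the percentages above can be reproduced directly from the reported means:

def rel_change(baseline, value):
    # Percentage change of value relative to baseline.
    return (value - baseline) / baseline * 100.0

print("2m,  patch vs master: %+.0f%%" % rel_change(47.9099563758, 32.1143682798))     # about -33%
print("2m,  SER vs master:   %+.0f%%" % rel_change(47.9099563758, 79.4589416981))     # about +66%
print("10m, patch vs master: %+.0f%%" % rel_change(2130.3178509871, 3232.856136322))  # about +52%
print("10m, SER vs master:   %+.0f%%" % rel_change(2130.3178509871, 2772.3923886617)) # about +30%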

I’m not that familiar with the typical MLlib memory profile. Do you think the in-memory result here would be similar to a real-world run?

Finally, here is the test script. Let me know if it seems reasonable. The data generation was roughly inspired by your MLlib perf test in spark-perf.

Data generation:

import random

from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint

class NormalGenerator:
    # Draws values from a normal distribution with randomly chosen mu and sigma.
    def __init__(self):
        self.mu = random.random()
        self.sigma = random.random()

    def __call__(self, rnd):
        return rnd.normalvariate(self.mu, self.sigma)

class PointGenerator:
    # Generates 5-feature LabeledPoints for two classes (labels 0 and 1),
    # each class with its own set of feature distributions.
    def __init__(self):
        self.generators = [[NormalGenerator() for _ in range(5)] for _ in range(2)]

    def __call__(self, rnd):
        label = rnd.choice([0, 1])
        return LabeledPoint(float(label), [g(rnd) for g in self.generators[label]])

pointGenerator = PointGenerator()
sc = SparkContext()

def generatePoints(n):
    # Generate n points across 10 partitions, seeding each partition's RNG
    # deterministically from its partition index.
    def generateData(index):
        rnd = random.Random(hash(str(index)))
        for _ in range(n / 10):  # Python 2 integer division
            yield pointGenerator(rnd)

    points = sc.parallelize(range(10), 10).flatMap(generateData)
    print points.count()
    points.saveAsPickleFile('logistic%.0e' % n)

generatePoints(int(2e6))
generatePoints(int(1e7))

Test:

import time
import sys

from pyspark import SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD

sc = SparkContext()
points = sc.pickleFile(sys.argv[1])  # path to the pickled LabeledPoint RDD
start = time.time()
model = LogisticRegressionWithSGD.train(points, 100)  # 100 iterations
print 'Runtime: ' + `(time.time() - start)`  # wall-clock seconds
print model.weights
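
(The scripts above are Python 2; presumably they are run with bin/spark-submit, with the pickle path written by the generator, e.g. logistic2e+06, passed as the test script's argument.)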

@davies (Contributor) commented Sep 15, 2014

The benchmark results sound reasonable, thanks for confirming them. Caching the RDD after serialization reduces memory usage and GC pressure, but has some CPU overhead. Also, caching the serialized data from Python is better than serializing it again in the JVM (master is always better than w/ MEMORY_ONLY_SER).

I think memory usage (sometimes out of memory, meaning stability) is more important than CPU right now, so I would like to hold off on this change and maybe revisit it in the future.

@staple (Contributor, Author) commented Sep 15, 2014

@davies understood, thanks for your feedback. It sounds like for now the preference is to keep caching the Python-serialized version, because the reduced memory footprint is currently worth the CPU cost of repeated deserialization.

Would it make sense to preserve the portions of this patch that drop caching for the NaiveBayes, ALS, and DecisionTree learners, which I do not believe require external caching to prevent repeated RDD re-evaluation during learning? NaiveBayes only evaluates its input RDD once, while ALS and DecisionTree internally persist transformations of their input RDDs.

@davies (Contributor) commented Sep 15, 2014

> Would it make sense to preserve the portions of this patch that drop caching for the NaiveBayes, ALS, and DecisionTree learners, which I do not believe require external caching to prevent repeated RDD re-evaluation during learning? NaiveBayes only evaluates its input RDD once, while ALS and DecisionTree internally persist transformations of their input RDDs.

These are helpful, you could do it in another PR.

@mengxr (Contributor) commented Sep 16, 2014

@staple How many iterations did you run? Did you generate the data on the fly or load it from disk/HDFS? Did you cache the Python RDD? When the dataset is not fully cached, I still expect similar performance, but your results show a big gap. Maybe it is rotating cached blocks.

> 10m records:
> master: 2130.3178509871
> w/ patch: 3232.856136322

@staple (Contributor, Author) commented Sep 16, 2014

@mengxr I ran 100 iterations and loaded the data from disk using Python's SparkContext.pickleFile() (the disk is an SSD). I did not do any manual caching. For more details, see the test script included in my description above.

I also saved the logs from my test runs if those would be helpful to see. During the 10m-record run I saw many log messages like 'CacheManager: Not enough space to cache partition', which I interpreted as indicating a lack of caching due to memory exhaustion. But I haven't diagnosed the slowdown beyond that.

@staple (Contributor, Author) commented Sep 16, 2014

@mengxr If I understand correctly, you are saying that you don't expect performance degradation with this patch when the RDDs can't all be cached. I'll look at the cache manager log messages and see if there is any evidence of partitions cycling through there.

@staple (Contributor, Author) commented Sep 16, 2014

With the PR code, it looks like on each training iteration there are messages about not being able to cache partitions rdd_4_5 - rdd_4_27.

With the master code, it looks like on each training iteration there are messages about not being able to cache partitions rdd_3_13 and rdd_3_15 - rdd_3_27.

It looks to me like a greater proportion of the data can be cached on master; I would guess the remainder needs to be pulled from disk. The set of cached partitions seems consistent across all training iterations for a given performance test run. But as a caveat, this is my first exposure to the caching algorithm in Spark.

@staple (Contributor, Author) commented Sep 16, 2014

@davies I created a separate PR for disabling automatic caching for some learners: #2412

@staple staple closed this Sep 25, 2014