Conversation

@niketanpansare (Contributor) commented Aug 5, 2016:

  1. Fixed bugs in the Scala LogisticRegression wrapper (handling of raw predictions and passing dfam).
  2. Extended the Java MLContext to accept MatrixBlock, and added a utility function in the Python file. (Also created https://issues.apache.org/jira/browse/SYSTEMML-846; it should be a good starter task for learning the Python MLContext.)
  3. Added the mllearn classes to allow scikit-learn and MLPipeline users to use SystemML.

Using SystemML's Logistic Regression (scikit-learn way):

from sklearn import datasets
import SystemML as sml
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
digits = datasets.load_digits()
X_digits = digits.data
y_digits = digits.target + 1
n_samples = len(X_digits)
n_train = int(.9 * n_samples)
X_train = X_digits[:n_train]
y_train = y_digits[:n_train]
X_test = X_digits[n_train:]
y_test = y_digits[n_train:]
import time
t1 = time.time()
logistic = sml.mllearn.LogisticRegression(sqlCtx)
print('LogisticRegression score: %f' % logistic.fit(X_train, y_train).score(X_test, y_test))
t2 = time.time()
# Convert to DataFrame for I/O: the current way to transfer data
logistic = sml.mllearn.LogisticRegression(sqlCtx, transferUsingDF=True)
print('LogisticRegression score: %f' % logistic.fit(X_train, y_train).score(X_test, y_test))
t3 = time.time()
print('Execution time: without DF: %f and with DF: %f' % (t2-t1, t3-t2))

Points to note:

  • The execution time without DF transfer is 1-2 seconds, whereas with DF transfer it is 18-20 seconds.
  • The current version only supports numeric labels/features.
  • The above interface is especially useful when the input/output data fits on a single node but the intermediate data does not.

Using SystemML's Logistic Regression (MLPipeline way):

from pyspark.ml import Pipeline
import SystemML as sml
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
training = sqlCtx.createDataFrame([
    (0L, "a b c d e spark", 1.0),
    (1L, "b d", 2.0),
    (2L, "spark f g h", 1.0),
    (3L, "hadoop mapreduce", 2.0),
    (4L, "b spark who", 1.0),
    (5L, "g d a y", 2.0),
    (6L, "spark fly", 1.0),
    (7L, "was mapreduce", 2.0),
    (8L, "e spark program", 1.0),
    (9L, "a e c l", 2.0),
    (10L, "spark compile", 1.0),
    (11L, "hadoop software", 2.0)
], ["id", "text", "label"])
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=20)
lr = sml.mllearn.LogisticRegression(sqlCtx)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
test = sqlCtx.createDataFrame([
    (12L, "spark i j k"),
    (13L, "l m n"),
    (14L, "mapreduce spark"),
    (15L, "apache hadoop")], ["id", "text"])
prediction = model.transform(test)
prediction.show()

Using SystemML's Linear Regression (scikit-learn way):

import numpy as np
from sklearn import datasets
import SystemML as sml
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
# Load the diabetes dataset
diabetes = datasets.load_diabetes()
# Use only one feature
diabetes_X = diabetes.data[:, np.newaxis, 2]
# Split the data into training/testing sets
diabetes_X_train = diabetes_X[:-20]
diabetes_X_test = diabetes_X[-20:]
# Split the targets into training/testing sets
diabetes_y_train = diabetes.target[:-20]
diabetes_y_test = diabetes.target[-20:]
# Create linear regression object
regr = sml.mllearn.LinearRegression(sqlCtx)
# Train the model using the training sets
regr.fit(diabetes_X_train, diabetes_y_train)
# The mean squared error
print("Mean squared error: %.2f" % np.mean((regr.predict(diabetes_X_test) - diabetes_y_test) ** 2))

Using SystemML's SVM (scikit-learn way):

from sklearn import datasets
import SystemML as sml
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
digits = datasets.load_digits()
X_digits = digits.data
y_digits = digits.target 
n_samples = len(X_digits)
n_train = int(.9 * n_samples)
X_train = X_digits[:n_train]
y_train = y_digits[:n_train]
X_test = X_digits[n_train:]
y_test = y_digits[n_train:]
svm = sml.mllearn.SVM(sqlCtx, is_multi_class=True)
print('SVM score: %f' % svm.fit(X_train, y_train).score(X_test, y_test))

Using SystemML's SVM (MLPipeline way):

from pyspark.ml import Pipeline
import SystemML as sml
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
training = sqlCtx.createDataFrame([
    (0L, "a b c d e spark", 1.0),
    (1L, "b d", 2.0),
    (2L, "spark f g h", 1.0),
    (3L, "hadoop mapreduce", 2.0),
    (4L, "b spark who", 1.0),
    (5L, "g d a y", 2.0),
    (6L, "spark fly", 1.0),
    (7L, "was mapreduce", 2.0),
    (8L, "e spark program", 1.0),
    (9L, "a e c l", 2.0),
    (10L, "spark compile", 1.0),
    (11L, "hadoop software", 2.0)
], ["id", "text", "label"])
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=20)
svm = sml.mllearn.SVM(sqlCtx, is_multi_class=True)
pipeline = Pipeline(stages=[tokenizer, hashingTF, svm])
model = pipeline.fit(training)
test = sqlCtx.createDataFrame([
    (12L, "spark i j k"),
    (13L, "l m n"),
    (14L, "mapreduce spark"),
    (15L, "apache hadoop")], ["id", "text"])
prediction = model.transform(test)
prediction.show()

Using SystemML's Naive Bayes (scikit-learn way):

from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
import SystemML as sml
from sklearn import metrics
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
categories = ['alt.atheism', 'talk.religion.misc', 'comp.graphics', 'sci.space']
newsgroups_train = fetch_20newsgroups(subset='train', categories=categories)
newsgroups_test = fetch_20newsgroups(subset='test', categories=categories)
vectorizer = TfidfVectorizer()
# Both vectors and vectors_test are SciPy CSR matrices
vectors = vectorizer.fit_transform(newsgroups_train.data)
vectors_test = vectorizer.transform(newsgroups_test.data)
nb = sml.mllearn.NaiveBayes(sqlCtx)
nb.fit(vectors, newsgroups_train.target)
pred = nb.predict(vectors_test)
metrics.f1_score(newsgroups_test.target, pred, average='weighted')

@MechCoder (Contributor):

Hey, the script looks great! But I may be biased as a sklearn user and developer.

What are you planning to do with the previous PR? In other words, what are the use cases for exposing SystemML data structures to the not-so-advanced user?

@niketanpansare (Contributor, Author):

@MechCoder Please see #197 for the answer to your question. Also, since you are a biased sklearn user/developer, you are the best person to critique the API of our library :) ... The high-level pitch we can make as the SystemML community is that we support a subset of the sklearn algorithms, and if your application uses these algorithms, you can replace the sk.* call with an sml.* call and everything should work as expected.
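
To make the pitch concrete, here is a minimal sketch of the intended drop-in swap, using only the constructor and methods shown in the examples above:

# sklearn version
from sklearn.linear_model import LogisticRegression
clf = LogisticRegression().fit(X_train, y_train)

# SystemML drop-in: same fit/predict/score surface, plus a SQLContext handle
import SystemML as sml
clf = sml.mllearn.LogisticRegression(sqlCtx).fit(X_train, y_train)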

# traceback.print_exc()

def getNumCols(numPyArr):
    if len(numPyArr.shape) == 1:
@MechCoder (Contributor) commented Aug 5, 2016:

np.ndim is the preferred way to do this.

(Also, in sklearn we deprecated the use of 1-D arrays, since it is ambiguous whether a 1-D array is a single sample with n_features or n_samples with a single feature. All data should be provided as a 2-D array.)
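
A minimal sketch of the suggested rewrite, reusing the function name from the diff above:

import numpy as np

def getNumCols(numPyArr):
    # np.ndim works uniformly on arrays, lists and scalars,
    # so there is no need to inspect .shape directly
    return 1 if np.ndim(numPyArr) == 1 else numPyArr.shape[1]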

@MechCoder (Contributor):

Would you be able to add some minor tests? Thanks!

@mboehm7 (Contributor) commented Aug 5, 2016:

Could we please batch these comments?

numArgs = len(args) + 1
if numArgs == 1:
    return self._fit(X)
elif numArgs == 2 and (isinstance(X, np.ndarray) or isinstance(X, pd.core.frame.DataFrame)):
Contributor:
Why not just change the signature to fit(X, y=None) and remove args?

@MechCoder (Contributor):

@mboehm7 Sorry, but what are batched comments?

@dusenberrymw (Contributor):

@mboehm7 Inline comments at specific lines of code in the PR are super useful for reviewing and discussing the code without any confusion. Unless you're "Watching" the entire repo, "Unsubscribing" from this particular PR should limit the inbox noise. :)

    pdfX = X
else:
    raise Exception('The input type not supported')
return pdfX
Contributor:
I would refactor this entire method this way in any case

# Let Pandas handle the conversion error internally and allow other array-like formats
if not isinstance(X, pd.DataFrame):
    return pd.DataFrame(X, columns=['C' + str(i) for i in range(numCols)])
return X

@mboehm7 (Contributor) commented Aug 5, 2016:

thanks @dusenberrymw.

@niketanpansare (Contributor, Author) commented Aug 6, 2016:

> Would you be able to add some minor tests? Thanks!

Do you have recommendations on how we can add Python tests alongside JUnit?

> Why update again?

I added that to test MLPipeline's CrossValidator but, for some reason, couldn't get it working. I'm not sure whether CrossValidator passes the parameters through the estimator object or through fit's params.
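
For reference, pyspark's CrossValidator passes the grid values to the estimator through fit's extra params map (est.fit(train, paramMap)) rather than by mutating the estimator object. A minimal sketch of the wiring, assuming the mllearn estimator exposed a pyspark Param such as regParam (hypothetical here):

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# hypothetical: assumes lr exposes a pyspark Param named regParam
grid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1]).build()
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid,
                    evaluator=MulticlassClassificationEvaluator(), numFolds=3)
cvModel = cv.fit(training)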

> Why not just change the signature to fit(X, y=None) and remove args?

Done.

> Let Pandas handle the conversion error internally and allow other array-like formats

Good point.

self.updateLog()
if y is None:
    return self._fit(X)
elif y is not None and (isinstance(X, np.ndarray) or isinstance(X, pd.core.frame.DataFrame)):
Contributor:

Seems like the check for X is done internally in convertToPandasDF
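
A minimal sketch of the suggested simplification (the two-argument form of _fit is assumed here):

def fit(self, X, y=None):
    if y is None:
        return self._fit(X)
    # convertToPandasDF already validates and coerces X,
    # so no separate isinstance check is needed here
    return self._fit(convertToPandasDF(X), y)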

@MechCoder (Contributor):

@niketanpansare Thanks for addressing my comments! I made a first pass. Hope to get back to it on Monday.

@niketanpansare (Contributor, Author):

Thanks @MechCoder for your help and suggestions 👍

@MechCoder (Contributor):

@niketanpansare Is it possible to keep just one ML model (LogisticRegression) in this PR and leave the rest for another PR after this has been merged, so that the review can focus on just the design?

@niketanpansare (Contributor, Author):

@MechCoder I would prefer to add the other ML models as well in this PR, for three reasons:

  1. All the other models (including LogisticRegression) inherit their implementation from BaseSystemMLEstimator, so the focus should be on the design of BaseSystemMLEstimator. If we are comfortable with that, it doesn't matter whether the implemented ML model is LogisticRegression, NaiveBayes, or any other.
  2. Due to overdesign of the initial implementation, very little progress has been made. As a side note, the Java LogisticRegression API has been there for almost a year and no model has been added since then (Java or Python).
  3. The hope for this API is to increase adoption, at least on the algorithm front. As a first step towards that, we need to update the algorithm documentation with interesting examples that motivate people to try SystemML. Adding just one algorithm (without a strong reason) seems to defeat that purpose.

Please note: the API is WIP, so you are welcome to modify the added ML models or add new ML models once this PR is in :)

elif isinstance(inputCols, list):
    return inputCols
else:
    raise Exception('inputCols should be of type pandas.indexes.base.Index or list')
Contributor:

This whole method is just list(inputCols)
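
A minimal sketch of that collapse (the method name is illustrative):

def convertColNamesToList(inputCols):
    # list() accepts both a pandas Index and a plain Python list
    return list(inputCols)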

Author:

Will do.

@MechCoder (Contributor) commented Aug 9, 2016:

Before I proceed any further:

  1. Can you run PEP8 on all the Python files?
  2. Can you add documentation to all of the methods? Please explicitly document the expected return types of all the methods (a sketch of what I mean follows this list).
  3. With regards to making it pip-installable, or even having a setup.py, I would postpone that to another PR. There is still some discussion (or rather non-discussion) about making pyspark pip-installable ([SPARK-1267][PYSPARK] Adds pip installer for pyspark, spark#8318). Right now I would just focus on organizing the project structure. We can just have it similar to pyspark's project structure.
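
For item 2, a sketch of the kind of per-method documentation being requested; the parameter and return types below are illustrative, based on the inputs accepted in the examples above:

def fit(self, X, y=None):
    """Invoke the appropriate SystemML training script on the given data.

    Parameters
    ----------
    X : numpy.ndarray, pandas.DataFrame, SciPy CSR matrix, or pyspark.sql.DataFrame
    y : numpy.ndarray or None
        Labels; None when X is a DataFrame that already contains the label column.

    Returns
    -------
    self : the fitted estimator, enabling fit(...).score(...) chaining
    """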

@niketanpansare (Contributor, Author):

Added documentation and created BaseSystemMLClassifier and BaseSystemMLRegressor classes in Python.

Let's address the remaining comments in the next PR. For now, I believe this API is in a reasonably stable state. Also, Spark 2.0 support depends on this.

@niketanpansare (Contributor, Author):

f02f7c0 closes this PR.
