

Example of Using Shifu-plugin-Spark

Spark Logistic Regression Model

Initialize Spark model and PMML model with stats and transformations.
    protected void initModel() {
        pmml = PMMLUtils.loadPMML(initPmmlPath);
        // training
        JavaRDD<String> lines = new JavaSparkContext("local", "SparkLRAdapter").textFile(inputData);
        ParsePoint parseFunc = new ParsePoint(targetFieldID, activeFieldIDs, ",");
        RDD<LabeledPoint> data = lines.map(parseFunc).rdd();
        mlModel = LogisticRegressionWithSGD.train(data, iterations, stepSize).clearThreshold();
    }

Here is the implementation of ParsePoint, the map function that converts each input line into a LabeledPoint:

    public static class ParsePoint implements Function<String, LabeledPoint> {
        private Pattern splitPattern;
        private int target;
        private int[] activeField;

        public ParsePoint(int targetID, int[] activeField, String splitter) {
            target = targetID;
            this.activeField = activeField;
            splitPattern = Pattern.compile("\\" + splitter);
        }

        @Override
        public LabeledPoint call(String line) {
            String[] parts = splitPattern.split(line);
            // the target column becomes the label, the active columns become the feature vector
            double y = Double.parseDouble(parts[target]);
            int len = activeField.length;
            double[] x = new double[len];
            for (int i = 0; i < len; i++) {
                x[i] = Double.parseDouble(parts[activeField[i]]);
            }
            return new LabeledPoint(y, Vectors.dense(x));
        }
    }
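For illustration, this is how a single CSV line flows through ParsePoint. The column layout here (target in column 0, three features in columns 1 through 3) is a made-up example:

    // Hypothetical layout: target in column 0, three features in columns 1-3.
    ParsePoint parsePoint = new ParsePoint(0, new int[] {1, 2, 3}, ",");
    LabeledPoint point = parsePoint.call("1,0.5,2.3,0.7");
    // point.label() == 1.0, point.features() == [0.5, 2.3, 0.7]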
Adapt Spark model to PMML model
    protected void adaptToPMML() {
        Model pmmlLR = pmml.getModels().get(0);
        pmmlLR = new PMMLSparkLogisticRegressionModel().adaptMLModelToPMML(mlModel, (RegressionModel) pmmlLR);
        pmml.getModels().set(0, pmmlLR);
    }
Validate the score calculated by the PMML evaluator against the score calculated by Spark:
    private void evaluate(SparkTestDataGenerator evalInput) {
        for (Map<FieldName, String> map : evalInput.getEvaluatorInput()) {
            Vector vector = new DenseVector(evalInput.normalizeData(context));
            Assert.assertEquals(getPMMLEvaluatorResult(map), mlModel.predict(vector), DELTA);
        }
    }
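getPMMLEvaluatorResult is not shown on this page. A minimal sketch against the JPMML-Evaluator API might look like the following; the evaluator field, how it is built from the pmml object, and the single-output-field assumption are all assumptions here, and the exact factory API varies between JPMML versions:

    // Sketch only: `evaluator` is assumed to be an org.jpmml.evaluator.Evaluator
    // built elsewhere from the same PMML object; construction APIs vary by version.
    private double getPMMLEvaluatorResult(Map<FieldName, String> rawInput) {
        Map<FieldName, ?> result = evaluator.evaluate(rawInput);
        // Assumes the model exposes a single numeric output field.
        Object score = result.values().iterator().next();
        return Double.parseDouble(String.valueOf(score));
    }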
More Evaluation Metrics
  • Prepare the dataset and calculate scores

          JavaRDD<Vector> evalVectors = lines.map(new ParseVector()).cache();
          List<Double> evalList = lrModel.predict(evalVectors).collect();
    
  • Calculate Evaluation Metrics

      JavaRDD<LabeledPoint> data = inputLines.map(new ParsePoint(targetID, activeFields, ","));
      RDD<Tuple2<Object, Object>> matrixInput = data.map(new EvalMetricsCalculator(lrModel)).rdd();
      BinaryClassificationMetrics bcMetrics = new BinaryClassificationMetrics(matrixInput);
      RDD<Tuple2<Object, Object>> prCurve = bcMetrics.pr();
      bcMetrics.areaUnderPR();
      bcMetrics.areaUnderROC();
    

  • precision, recall, F-measure, and the precision-recall curve: pr(), precisionByThreshold(), recallByThreshold(), fMeasureByThreshold()

  • area under the precision-recall curve (AUC): areaUnderPR()

  • receiver operating characteristic (ROC): roc(), areaUnderROC()
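The per-threshold metrics come back as RDDs of (threshold, value) pairs. As a small illustration, reusing bcMetrics from the snippet above, the precision at each candidate threshold can be printed like this:

    // Each element is a (threshold, precision) pair; the Java view erases the types to Object.
    JavaRDD<Tuple2<Object, Object>> precisionByThreshold = bcMetrics.precisionByThreshold().toJavaRDD();
    for (Tuple2<Object, Object> pair : precisionByThreshold.collect()) {
        System.out.println("threshold = " + pair._1() + ", precision = " + pair._2());
    }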

More about Spark Logistic Regression Model

Clear threshold (mlModel.clearThreshold())

The threshold is 0.5 by default. Clearing it changes the model from a classifier into a scorer: instead of outputting 0 or 1, predict() returns the raw score. Refer to the source code of the LogisticRegressionModel class:

    private var threshold: Option[Double] = Some(0.5)

    override protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
        intercept: Double) = {
      val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
      val score = 1.0 / (1.0 + math.exp(-margin))
      threshold match {
        case Some(t) => if (score < t) 0.0 else 1.0
        case None => score
      }
    }
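To see the effect in code (a sketch, assuming a trained lrModel and a feature Vector named features):

    // Default threshold (0.5): predict() returns the class label, 0.0 or 1.0.
    double label = lrModel.predict(features);
    // After clearing the threshold: predict() returns the raw sigmoid score in (0, 1).
    lrModel.clearThreshold();
    double score = lrModel.predict(features);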
The method lrModel.weights() returns the intercept followed by the weight list.

Refer to the code snippet:

    val intercept = if (addIntercept) weightsWithIntercept(0) else 0.0
    val weights =
      if (addIntercept) {
        Vectors.dense(weightsWithIntercept.toArray.slice(1, weightsWithIntercept.size))
      } else {
        weightsWithIntercept
      }

PMML Conversion

Convert Spark Logistic Regression Model to PMML

    public class SparkLogisticRegressionToPMML implements
            ModelToPMML<org.dmg.pmml.RegressionModel, org.apache.spark.mllib.classification.LogisticRegressionModel> {

        public org.dmg.pmml.RegressionModel adaptMLModelToPMML(
                org.apache.spark.mllib.classification.LogisticRegressionModel lrModel,
                org.dmg.pmml.RegressionModel pmmlModel) {
            double[] weights = lrModel.weights().toArray();
            // the intercept is the first element of the weight array, not lrModel.intercept()
            double intercept = weights[0];
            return PMMLAdapterCommonUtil.getRegressionTable(weights, intercept, pmmlModel);
        }
    }
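After the adapter has populated the regression table, the resulting PMML object can be serialized; a sketch using JAXBUtil from jpmml-model (the output path is a made-up example):

    // Sketch: write the adapted PMML object to disk; "model.pmml" is a hypothetical path.
    try (OutputStream os = new FileOutputStream("model.pmml")) {
        JAXBUtil.marshalPMML(pmml, new StreamResult(os));
    }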

More about Spark

Spark Overview Presentation Slides

  • a fast and general-purpose cluster computing system
  • supports a set of higher-level tools including MLlib for machine learning
  • a resilient distributed dataset (RDD) is a collection of elements partitioned across cluster nodes; it can be operated on in parallel and provides fault tolerance
  • MLlib: LogisticRegression, linear SVM, Decision Tree

Dependencies in Shifu-plugin-spark

  1. spark-core_2.10, version 1.0.0
  2. spark-mllib_2.10, version 1.0.0
  3. Exclude akka 2.1.1, which is inherited from shifu-core. Note that Spark depends on akka 2.2.3 while Shifu-core depends on akka 2.1.1. I tried changing Shifu-core's akka version to 2.2.3 and got build errors. The root cause is still unclear to me; I suppose guagua is not compatible with akka 2.2.3. See the akka migration guide from 2.1 to 2.2.

Get started

  1. Initialize Spark

     SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
     JavaSparkContext sc = new JavaSparkContext(conf);
    
  2. Prepare Dataset

From Parallelized Collections

    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> distData = sc.parallelize(data);

From External Datasets

    JavaRDD<String> distFile = sc.textFile("data.txt");   

OR

     JavaRDD<String> distFile = sc.textFile("hdfs://data.txt");
  3. Passing Functions to Spark

     class ParseLabeledPoint implements Function<String, LabeledPoint> {
         public LabeledPoint call(String s) {
             ...
             for (int i = 0; i < len; i++) {
                 x[i] = Double.parseDouble(tokens[i]);
             }
             return new LabeledPoint(y, Vectors.dense(x));
         }
     }

     RDD<LabeledPoint> data = distFile.map(new ParseLabeledPoint()).cache().rdd();
    
  4. Train LogisticRegressionModel

        /*
         * @param input RDD of (label, array of features) pairs.
         * @param numIterations Number of iterations of gradient descent to run.
         * @param stepSize Step size to be used for each iteration of gradient descent.
         * @param miniBatchFraction Fraction of data to be used per iteration.
         */
        LogisticRegressionModel lrModel = LogisticRegressionWithSGD.train(data, iterations, stepSize, miniBatchFraction);
    
  • Note: only the last 10 training errors are printed in the log when Spark LogisticRegressionWithSGD trains the model.