From fdce2404688fee1b22154258de5d85f0cee8aa4b Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Wed, 1 Mar 2017 15:47:53 -0800 Subject: [PATCH 01/10] fpm doc --- docs/_data/menu-ml.yaml | 2 + docs/ml-frequent-pattern-mining.md | 68 +++++++++++++++++ .../examples/ml/JavaFPGrowthExample.java | 73 +++++++++++++++++++ .../spark/examples/ml/FPGrowthExample.scala | 71 ++++++++++++++++++ .../org/apache/spark/ml/fpm/FPGrowth.scala | 11 +-- 5 files changed, 220 insertions(+), 5 deletions(-) create mode 100644 docs/ml-frequent-pattern-mining.md create mode 100644 examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java create mode 100644 examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala diff --git a/docs/_data/menu-ml.yaml b/docs/_data/menu-ml.yaml index 0c6b9b20a6e4..047423f75aec 100644 --- a/docs/_data/menu-ml.yaml +++ b/docs/_data/menu-ml.yaml @@ -8,6 +8,8 @@ url: ml-clustering.html - text: Collaborative filtering url: ml-collaborative-filtering.html +- text: Frequent Pattern Mining + url: ml-frequent-pattern-mining.html - text: Model selection and tuning url: ml-tuning.html - text: Advanced topics diff --git a/docs/ml-frequent-pattern-mining.md b/docs/ml-frequent-pattern-mining.md new file mode 100644 index 000000000000..dba12f18d700 --- /dev/null +++ b/docs/ml-frequent-pattern-mining.md @@ -0,0 +1,68 @@ +--- +layout: global +title: Frequent Pattern Mining +displayTitle: Frequent Pattern Mining +--- + +Mining frequent items, itemsets, subsequences, or other substructures is usually among the +first steps to analyze a large-scale dataset, which has been an active research topic in +data mining for years. +We refer users to Wikipedia's [association rule learning](http://en.wikipedia.org/wiki/Association_rule_learning) +for more information. + +**Table of Contents** + +* This will become a table of contents (this text will be scraped). +{:toc} + +## FP-Growth + +The FP-growth algorithm is described in the paper +[Han et al., Mining frequent patterns without candidate generation](http://dx.doi.org/10.1145/335191.335372), +where "FP" stands for frequent pattern. +Given a dataset of transactions, the first step of FP-growth is to calculate item frequencies and identify frequent items. +Different from [Apriori-like](http://en.wikipedia.org/wiki/Apriori_algorithm) algorithms designed for the same purpose, +the second step of FP-growth uses a suffix tree (FP-tree) structure to encode transactions without generating candidate sets +explicitly, which are usually expensive to generate. +After the second step, the frequent itemsets can be extracted from the FP-tree. +In `spark.mllib`, we implemented a parallel version of FP-growth called PFP, +as described in [Li et al., PFP: Parallel FP-growth for query recommendation](http://dx.doi.org/10.1145/1454008.1454027). +PFP distributes the work of growing FP-trees based on the suffices of transactions, +and hence more scalable than a single-machine implementation. +We refer users to the papers for more details. + +`spark.ml`'s FP-growth implementation takes the following (hyper-)parameters: + +* `minSupport`: the minimum support for an itemset to be identified as frequent. + For example, if an item appears 3 out of 5 transactions, it has a support of 3/5=0.6. +* `minConfidence`: minimum confidence for generating Association Rule. The parameter has no effect during `fit`, but specify + the minimum confidence for generating association rules from frequent itemsets. 
+* `numPartitions`: the number of partitions used to distribute the work.
+
+The `FPGrowthModel` provides:
+
+* `freqItemsets`: frequent itemsets in the format of DataFrame("items"[Seq], "freq"[Long])
+* `associationRules`: association rules generated with confidence above `minConfidence`, in the format of
+  DataFrame("antecedent"[Seq], "consequent"[Seq], "confidence"[Double]).
+* `transform`: The transform method examines the input items against all the association rules and
+  summarize the consequents as prediction. The prediction column has the same data type as the
+  input column and does not contain existing items in the input column.
+
+
+**Examples**
+
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+Refer to the [Scala API docs](api/scala/index.html#org.apache.spark.ml.fpm.FPGrowth) for more details.
+
+{% include_example scala/org/apache/spark/examples/ml/FPGrowthExample.scala %}
+</div>
+
+<div data-lang="java" markdown="1">
+Refer to the [Java API docs](api/java/org/apache/spark/ml/fpm/FPGrowth.html) for more details.
+
+{% include_example java/org/apache/spark/examples/ml/JavaFPGrowthExample.java %}
+</div>
+
+</div>
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java new file mode 100644 index 000000000000..7c9ef4bd857f --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.ml; + +// $example on$ +import java.util.Arrays; +import java.util.List; + +import org.apache.spark.ml.fpm.FPGrowth; +import org.apache.spark.ml.fpm.FPGrowthModel; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.*; +// $example off$ + +public class JavaFPGrowthExample { + public static void main(String[] args) { + SparkSession spark = SparkSession + .builder() + .appName("JavaFPGrowthExample") + .getOrCreate(); + + // $example on$ + List data = Arrays.asList( + RowFactory.create(Arrays.asList("1 2 5".split(" "))), + RowFactory.create(Arrays.asList("1 2 3 5".split(" "))), + RowFactory.create(Arrays.asList("1 2".split(" "))) + ); + StructType schema = new StructType(new StructField[]{ new StructField( + "features", new ArrayType(DataTypes.StringType, true), false, Metadata.empty()) + }); + Dataset itemsDF = spark.createDataFrame(data, schema); + + // Learn a mapping from words to Vectors. + FPGrowth fpgrowth = new FPGrowth() + .setMinSupport(0.5) + .setMinConfidence(0.6); + + FPGrowthModel model = fpgrowth.fit(itemsDF); + + // get frequent itemsets. + model.freqItemsets().show(); + + // get generated association rules. + model.associationRules().show(); + + // transform examines the input items against all the association rules and summarize the + // consequents as prediction + Dataset result = model.transform(itemsDF); + + result.show(); + // $example off$ + + spark.stop(); + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala new file mode 100644 index 000000000000..94f0b3dca49c --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.ml + +// scalastyle:off println + +// $example on$ +import org.apache.spark.ml.fpm.FPGrowth +// $example off$ +import org.apache.spark.sql.SparkSession + +/** + * An example demonstrating FP-Growth. + * Run with + * {{{ + * bin/run-example ml.FPGrowthExample + * }}} + */ +object FPGrowthExample { + + def main(args: Array[String]): Unit = { + + val spark = SparkSession + .builder + .appName(s"${this.getClass.getSimpleName}") + .getOrCreate() + import spark.implicits._ + + // $example on$ + // Loads data. + val dataset = spark.createDataset(Seq( + "1 2 5", + "1 2 3 5", + "1 2") + ).map(t => t.split(" ")).toDF("features") + + // Trains a FPGrowth model. + val fpgrowth = new FPGrowth().setMinSupport(0.5).setMinConfidence(0.6) + val model = fpgrowth.fit(dataset) + + // get frequent itemsets. + model.freqItemsets.show() + + // get generated association rules. + model.associationRules.show() + + // transform examines the input items against all the association rules and summarize the + // consequents as prediction + model.transform(dataset).show() + + // $example off$ + + spark.stop() + } +} +// scalastyle:on println diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index 417968d9b817..62fbc2564291 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -56,8 +56,8 @@ private[fpm] trait FPGrowthParams extends Params with HasFeaturesCol with HasPre def getMinSupport: Double = $(minSupport) /** - * Number of partitions (>=1) used by parallel FP-growth. By default the param is not set, and - * partition number of the input dataset is used. + * Number of partitions (positive) used by parallel FP-growth. By default the param is not set, + * and partition number of the input dataset is used. 
* @group expertParam */ @Since("2.2.0") @@ -240,12 +240,13 @@ class FPGrowthModel private[ml] ( val predictUDF = udf((items: Seq[_]) => { if (items != null) { val itemset = items.toSet - brRules.value.flatMap(rule => - if (items != null && rule._1.forall(item => itemset.contains(item))) { + brRules.value.flatMap { rule => + if (rule._1.forall(item => itemset.contains(item))) { rule._2.filter(item => !itemset.contains(item)) } else { Seq.empty - }) + } + } } else { Seq.empty }.distinct }, dt) From ca12877c7f7e224268e145c3e8c4c37413596d66 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Thu, 2 Mar 2017 14:02:29 -0800 Subject: [PATCH 02/10] change transform to filter --- .../org/apache/spark/examples/ml/FPGrowthExample.scala | 7 ++----- .../main/scala/org/apache/spark/ml/fpm/FPGrowth.scala | 9 ++------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala index 94f0b3dca49c..7acc8e5c75b5 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala @@ -34,7 +34,6 @@ import org.apache.spark.sql.SparkSession object FPGrowthExample { def main(args: Array[String]): Unit = { - val spark = SparkSession .builder .appName(s"${this.getClass.getSimpleName}") @@ -42,7 +41,6 @@ object FPGrowthExample { import spark.implicits._ // $example on$ - // Loads data. val dataset = spark.createDataset(Seq( "1 2 5", "1 2 3 5", @@ -53,16 +51,15 @@ object FPGrowthExample { val fpgrowth = new FPGrowth().setMinSupport(0.5).setMinConfidence(0.6) val model = fpgrowth.fit(dataset) - // get frequent itemsets. + // Display frequent itemsets. model.freqItemsets.show() - // get generated association rules. + // Display generated association rules. 
model.associationRules.show() // transform examines the input items against all the association rules and summarize the // consequents as prediction model.transform(dataset).show() - // $example off$ spark.stop() diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index 62fbc2564291..eaa63c394c61 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -240,13 +240,8 @@ class FPGrowthModel private[ml] ( val predictUDF = udf((items: Seq[_]) => { if (items != null) { val itemset = items.toSet - brRules.value.flatMap { rule => - if (rule._1.forall(item => itemset.contains(item))) { - rule._2.filter(item => !itemset.contains(item)) - } else { - Seq.empty - } - } + brRules.value.filter(_._1.forall(itemset.contains)) + .flatMap(_._2.filter(!itemset.contains(_))) } else { Seq.empty }.distinct }, dt) From 9ce00930ea18c7bb8fe0cc59b98f6ece34d20311 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Thu, 9 Mar 2017 23:47:49 -0800 Subject: [PATCH 03/10] merge and move --- mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index eaa63c394c61..8cf0cd8fd250 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -241,10 +241,10 @@ class FPGrowthModel private[ml] ( if (items != null) { val itemset = items.toSet brRules.value.filter(_._1.forall(itemset.contains)) - .flatMap(_._2.filter(!itemset.contains(_))) + .flatMap(_._2.filter(!itemset.contains(_))).distinct } else { Seq.empty - }.distinct }, dt) + }}, dt) dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) } From de1bfc8eb48015ea629dea5bdc72ba913b76d234 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Tue, 14 Mar 2017 18:43:49 -0700 Subject: [PATCH 04/10] comments and doc refine --- docs/ml-frequent-pattern-mining.md | 11 +++++----- .../examples/ml/JavaFPGrowthExample.java | 16 +++++--------- .../spark/examples/ml/FPGrowthExample.scala | 1 - .../org/apache/spark/ml/fpm/FPGrowth.scala | 22 +++++++++---------- .../apache/spark/ml/fpm/FPGrowthSuite.scala | 4 +++- 5 files changed, 25 insertions(+), 29 deletions(-) diff --git a/docs/ml-frequent-pattern-mining.md b/docs/ml-frequent-pattern-mining.md index dba12f18d700..726df8dc3962 100644 --- a/docs/ml-frequent-pattern-mining.md +++ b/docs/ml-frequent-pattern-mining.md @@ -35,15 +35,16 @@ We refer users to the papers for more details. * `minSupport`: the minimum support for an itemset to be identified as frequent. For example, if an item appears 3 out of 5 transactions, it has a support of 3/5=0.6. -* `minConfidence`: minimum confidence for generating Association Rule. The parameter has no effect during `fit`, but specify - the minimum confidence for generating association rules from frequent itemsets. -* `numPartitions`: the number of partitions used to distribute the work. +* `minConfidence`: minimum confidence for generating Association Rule. The parameter will not affect the mining + for frequent itemsets,, but specify the minimum confidence for generating association rules from frequent itemsets. +* `numPartitions`: the number of partitions used to distribute the work. 
By default the param is not set, and + partition number of the input dataset is used. The `FPGrowthModel` provides: -* `freqItemsets`: frequent itemsets in the format of DataFrame("items"[Seq], "freq"[Long]) +* `freqItemsets`: frequent itemsets in the format of DataFrame("items"[Array], "freq"[Long]) * `associationRules`: association rules generated with confidence above `minConfidence`, in the format of - DataFrame("antecedent"[Seq], "consequent"[Seq], "confidence"[Double]). + DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double]). * `transform`: The transform method examines the input items against all the association rules and summarize the consequents as prediction. The prediction column has the same data type as the input column and does not contain existing items in the input column. diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java index 7c9ef4bd857f..4351bfbf12fe 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java @@ -48,24 +48,20 @@ public static void main(String[] args) { }); Dataset itemsDF = spark.createDataFrame(data, schema); - // Learn a mapping from words to Vectors. - FPGrowth fpgrowth = new FPGrowth() + FPGrowthModel model = new FPGrowth() .setMinSupport(0.5) - .setMinConfidence(0.6); + .setMinConfidence(0.6) + .fit(itemsDF); - FPGrowthModel model = fpgrowth.fit(itemsDF); - - // get frequent itemsets. + // Display frequent itemsets. model.freqItemsets().show(); - // get generated association rules. + // Display generated association rules. model.associationRules().show(); // transform examines the input items against all the association rules and summarize the // consequents as prediction - Dataset result = model.transform(itemsDF); - - result.show(); + model.transform(itemsDF).show(); // $example off$ spark.stop(); diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala index 7acc8e5c75b5..b0e9c6253bfd 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala @@ -47,7 +47,6 @@ object FPGrowthExample { "1 2") ).map(t => t.split(" ")).toDF("features") - // Trains a FPGrowth model. val fpgrowth = new FPGrowth().setMinSupport(0.5).setMinConfidence(0.6) val model = fpgrowth.fit(dataset) diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index cbba59eec932..bb2b3dc9f846 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -17,7 +17,6 @@ package org.apache.spark.ml.fpm -import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import org.apache.hadoop.fs.Path @@ -41,7 +40,7 @@ private[fpm] trait FPGrowthParams extends Params with HasFeaturesCol with HasPre /** * Minimal support level of the frequent pattern. [0.0, 1.0]. Any pattern that appears - * more than (minSupport * size-of-the-dataset) times will be output + * more than (minSupport * size-of-the-dataset) times will be output in the frequent itemsets. 
* Default: 0.3 * @group param */ @@ -69,8 +68,8 @@ private[fpm] trait FPGrowthParams extends Params with HasFeaturesCol with HasPre def getNumPartitions: Int = $(numPartitions) /** - * Minimal confidence for generating Association Rule. - * Note that minConfidence has no effect during fitting. + * Minimal confidence for generating Association Rule. MinConfidence will not affect the mining + * for frequent itemsets, but will affect the association rules generation. * Default: 0.8 * @group param */ @@ -154,7 +153,6 @@ class FPGrowth @Since("2.2.0") ( } val parentModel = mllibFP.run(items) val rows = parentModel.freqItemsets.map(f => Row(f.items, f.freq)) - val schema = StructType(Seq( StructField("items", dataset.schema($(featuresCol)).dataType, nullable = false), StructField("freq", LongType, nullable = false))) @@ -183,7 +181,7 @@ object FPGrowth extends DefaultParamsReadable[FPGrowth] { * :: Experimental :: * Model fitted by FPGrowth. * - * @param freqItemsets frequent items in the format of DataFrame("items"[Seq], "freq"[Long]) + * @param freqItemsets frequent itemsets in the format of DataFrame("items"[Array], "freq"[Long]) */ @Since("2.2.0") @Experimental @@ -303,13 +301,13 @@ private[fpm] object AssociationRules { /** * Computes the association rules with confidence above minConfidence. - * @param dataset DataFrame("items", "freq") containing frequent itemset obtained from - * algorithms like [[FPGrowth]]. + * @param dataset DataFrame("items"[Array], "freq"[Long]) containing frequent itemsets obtained + * from algorithms like [[FPGrowth]]. * @param itemsCol column name for frequent itemsets - * @param freqCol column name for frequent itemsets count - * @param minConfidence minimum confidence for the result association rules - * @return a DataFrame("antecedent", "consequent", "confidence") containing the association - * rules. + * @param freqCol column name for appearance count of the frequent itemsets + * @param minConfidence minimum confidence for generating the association rules + * @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double]) + * containing the association rules. */ def getAssociationRulesFromFP[T: ClassTag]( dataset: Dataset[_], diff --git a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala index 910d4b07d130..0cae970d2b40 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.fpm import org.apache.spark.SparkFunSuite import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ @@ -91,6 +91,8 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul .setMinConfidence(0.5678) assert(fpGrowth.getMinSupport === 0.4567) assert(model.getMinConfidence === 0.5678) + // numPartitions should not have default value. 
+ assert(fpGrowth.isDefined(fpGrowth.numPartitions) === false) } test("read/write") { From 9fef280751378dbeaa843c673fd962192320a5b1 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Tue, 21 Mar 2017 11:32:23 -0700 Subject: [PATCH 05/10] adapt to itemsCol --- .../org/apache/spark/examples/ml/JavaFPGrowthExample.java | 3 ++- .../scala/org/apache/spark/examples/ml/FPGrowthExample.scala | 4 ++-- mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java index 4351bfbf12fe..f8d9123bc2a9 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java @@ -44,11 +44,12 @@ public static void main(String[] args) { RowFactory.create(Arrays.asList("1 2".split(" "))) ); StructType schema = new StructType(new StructField[]{ new StructField( - "features", new ArrayType(DataTypes.StringType, true), false, Metadata.empty()) + "items", new ArrayType(DataTypes.StringType, true), false, Metadata.empty()) }); Dataset itemsDF = spark.createDataFrame(data, schema); FPGrowthModel model = new FPGrowth() + .setItemsCol("items") .setMinSupport(0.5) .setMinConfidence(0.6) .fit(itemsDF); diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala index b0e9c6253bfd..59110d70de55 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala @@ -45,9 +45,9 @@ object FPGrowthExample { "1 2 5", "1 2 3 5", "1 2") - ).map(t => t.split(" ")).toDF("features") + ).map(t => t.split(" ")).toDF("items") - val fpgrowth = new FPGrowth().setMinSupport(0.5).setMinConfidence(0.6) + val fpgrowth = new FPGrowth().setItemsCol("items").setMinSupport(0.5).setMinConfidence(0.6) val model = fpgrowth.fit(dataset) // Display frequent itemsets. diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index f61297e5cf1f..cf6b4018a120 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -117,7 +117,7 @@ private[fpm] trait FPGrowthParams extends Params with HasPredictionCol { * Recommendation. PFP distributes computation in such a way that each worker executes an * independent group of mining tasks. The FP-Growth algorithm is described in * Han et al., Mining frequent patterns without - * candidate generation. Note null values in the feature column are ignored during fit(). + * candidate generation. Note null values in the itemsCol column are ignored during fit(). * * @see * Association rule learning (Wikipedia) @@ -230,7 +230,7 @@ class FPGrowthModel private[ml] ( * Then for each association rule, it will examine the input items against antecedents and * summarize the consequents as prediction. The prediction column has the same data type as the * input column(Array[T]) and will not contain existing items in the input column. The null - * values in the feature columns are treated as empty sets. + * values in the itemsCol columns are treated as empty sets. 
* WARNING: internally it collects association rules to the driver and uses broadcast for * efficiency. This may bring pressure to driver memory for large set of association rules. */ From c957ba5821d2bb6004c41a2e55c651d8c156de09 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Mon, 27 Mar 2017 11:55:34 -0700 Subject: [PATCH 06/10] add python example --- docs/ml-frequent-pattern-mining.md | 10 +++- .../examples/ml/JavaFPGrowthExample.java | 7 +++ .../src/main/python/ml/fpgrowth_example.py | 48 +++++++++++++++++++ 3 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 examples/src/main/python/ml/fpgrowth_example.py diff --git a/docs/ml-frequent-pattern-mining.md b/docs/ml-frequent-pattern-mining.md index 726df8dc3962..154d11098609 100644 --- a/docs/ml-frequent-pattern-mining.md +++ b/docs/ml-frequent-pattern-mining.md @@ -45,9 +45,9 @@ The `FPGrowthModel` provides: * `freqItemsets`: frequent itemsets in the format of DataFrame("items"[Array], "freq"[Long]) * `associationRules`: association rules generated with confidence above `minConfidence`, in the format of DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double]). -* `transform`: The transform method examines the input items against all the association rules and +* `transform`: The transform method examines the input items in `itemsCol` against all the association rules and summarize the consequents as prediction. The prediction column has the same data type as the - input column and does not contain existing items in the input column. + `itemsCol` and does not contain existing items in the `itemsCol`. **Examples** @@ -66,4 +66,10 @@ Refer to the [Java API docs](api/java/org/apache/spark/ml/fpm/FPGrowth.html) for {% include_example java/org/apache/spark/examples/ml/JavaFPGrowthExample.java %} +
+<div data-lang="python" markdown="1">
+Refer to the [Python API docs](api/python/pyspark.ml.html#pyspark.ml.fpm.FPGrowth) for more details.
+
+{% include_example python/ml/fpgrowth_example.py %}
+</div>
+ diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java index f8d9123bc2a9..717ec21c8b20 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java @@ -30,6 +30,13 @@ import org.apache.spark.sql.types.*; // $example off$ +/** + * An example demonstrating FPGrowth. + * Run with + *
+ * <pre>
+ * bin/run-example ml.JavaFPGrowthExample
+ * </pre>
+ */ public class JavaFPGrowthExample { public static void main(String[] args) { SparkSession spark = SparkSession diff --git a/examples/src/main/python/ml/fpgrowth_example.py b/examples/src/main/python/ml/fpgrowth_example.py new file mode 100644 index 000000000000..41d59d903869 --- /dev/null +++ b/examples/src/main/python/ml/fpgrowth_example.py @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# $example on$ +from pyspark.ml.fpm import FPGrowth +# $example off$ +from pyspark.sql import SparkSession + +""" +An example demonstrating FPGrowth. +Run with: + bin/spark-submit examples/src/main/python/ml/fpgrowth_example.py +""" + +if __name__ == "__main__": + spark = SparkSession\ + .builder\ + .appName("FPGrowthExample")\ + .getOrCreate() + + # $example on$ + df = spark.createDataFrame([ + (0, [1, 2, 5]), + (1, [1, 2, 3, 5]), + (2, [1, 2]) + ], ["id", "items"]) + + fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6) + fpGrowthModel = fpGrowth.fit(df) + + fpGrowthModel.transform(df).show() + # $example off$ + + spark.stop() From e9b090ac07291de9e09c81a0bb371fffb3384a4f Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Thu, 30 Mar 2017 17:09:40 -0700 Subject: [PATCH 07/10] docs update --- docs/ml-frequent-pattern-mining.md | 21 ++++++++++++------- docs/mllib-frequent-pattern-mining.md | 2 +- .../org/apache/spark/ml/fpm/FPGrowth.scala | 9 +++++--- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/docs/ml-frequent-pattern-mining.md b/docs/ml-frequent-pattern-mining.md index 154d11098609..94c5507020a0 100644 --- a/docs/ml-frequent-pattern-mining.md +++ b/docs/ml-frequent-pattern-mining.md @@ -27,27 +27,32 @@ explicitly, which are usually expensive to generate. After the second step, the frequent itemsets can be extracted from the FP-tree. In `spark.mllib`, we implemented a parallel version of FP-growth called PFP, as described in [Li et al., PFP: Parallel FP-growth for query recommendation](http://dx.doi.org/10.1145/1454008.1454027). -PFP distributes the work of growing FP-trees based on the suffices of transactions, -and hence more scalable than a single-machine implementation. +PFP distributes the work of growing FP-trees based on the suffixes of transactions, +and hence is more scalable than a single-machine implementation. We refer users to the papers for more details. `spark.ml`'s FP-growth implementation takes the following (hyper-)parameters: * `minSupport`: the minimum support for an itemset to be identified as frequent. For example, if an item appears 3 out of 5 transactions, it has a support of 3/5=0.6. -* `minConfidence`: minimum confidence for generating Association Rule. 
The parameter will not affect the mining - for frequent itemsets,, but specify the minimum confidence for generating association rules from frequent itemsets. +* `minConfidence`: minimum confidence for generating Association Rule. Confidence is an indication of how often an + association rule has been found to be true. For example, if in the transactions itemset `X` appears 4 times, `X` + and `Y` co-occur only 2 times, the confidence for the rule `X => Y` is then 2/4 = 0.5. The parameter will not + affect the mining for frequent itemsets, but specify the minimum confidence for generating association rules + from frequent itemsets. * `numPartitions`: the number of partitions used to distribute the work. By default the param is not set, and - partition number of the input dataset is used. + number of partitions of the input dataset is used. The `FPGrowthModel` provides: * `freqItemsets`: frequent itemsets in the format of DataFrame("items"[Array], "freq"[Long]) * `associationRules`: association rules generated with confidence above `minConfidence`, in the format of DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double]). -* `transform`: The transform method examines the input items in `itemsCol` against all the association rules and - summarize the consequents as prediction. The prediction column has the same data type as the - `itemsCol` and does not contain existing items in the `itemsCol`. +* `transform`: For each transaction in itemsCol, the `transform` method will compare its items against the antecedents + of each association rule. If the record contains all the antecedents of a specific association rule, the rule + will be considered as applicable and its consequents will be added to the prediction result. The transform + method will summarize the consequents from all the applicable rules as prediction. The prediction column has + the same data type as `itemsCol` and does not contain existing items in the `itemsCol`. **Examples** diff --git a/docs/mllib-frequent-pattern-mining.md b/docs/mllib-frequent-pattern-mining.md index 93e3f0b2d226..c9cd7cc85e75 100644 --- a/docs/mllib-frequent-pattern-mining.md +++ b/docs/mllib-frequent-pattern-mining.md @@ -24,7 +24,7 @@ explicitly, which are usually expensive to generate. After the second step, the frequent itemsets can be extracted from the FP-tree. In `spark.mllib`, we implemented a parallel version of FP-growth called PFP, as described in [Li et al., PFP: Parallel FP-growth for query recommendation](http://dx.doi.org/10.1145/1454008.1454027). -PFP distributes the work of growing FP-trees based on the suffices of transactions, +PFP distributes the work of growing FP-trees based on the suffixes of transactions, and hence more scalable than a single-machine implementation. We refer users to the papers for more details. diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index d088bfae3a39..678c8d9e21e7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -227,9 +227,12 @@ class FPGrowthModel private[ml] ( /** * The transform method first generates the association rules according to the frequent itemsets. - * Then for each association rule, it will examine the input items against antecedents and - * summarize the consequents as prediction. 
The prediction column has the same data type as the - * input column(Array[T]) and will not contain existing items in the input column. The null + * Then for each transaction in itemsCol, the transform method will compare its items against the + * antecedents of each association rule. If the record contains all the antecedents of a + * specific association rule, the rule will be considered as applicable and its consequents + * will be added to the prediction result. The transform method will summarize the consequents + * from all the applicable rules as prediction. The prediction column has the same data type as + * the input column(Array[T]) and will not contain existing items in the input column. The null * values in the itemsCol columns are treated as empty sets. * WARNING: internally it collects association rules to the driver and uses broadcast for * efficiency. This may bring pressure to driver memory for large set of association rules. From 0fb5a8757f351649a1d6648e74f126a720b37b21 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Tue, 11 Apr 2017 12:57:54 -0700 Subject: [PATCH 08/10] refine python example --- examples/src/main/python/ml/fpgrowth_example.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/examples/src/main/python/ml/fpgrowth_example.py b/examples/src/main/python/ml/fpgrowth_example.py index 41d59d903869..c92c3c27abb2 100644 --- a/examples/src/main/python/ml/fpgrowth_example.py +++ b/examples/src/main/python/ml/fpgrowth_example.py @@ -40,9 +40,17 @@ ], ["id", "items"]) fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6) - fpGrowthModel = fpGrowth.fit(df) + model = fpGrowth.fit(df) - fpGrowthModel.transform(df).show() + # Display frequent itemsets. + model.freqItemsets.show() + + # Display generated association rules. + model.associationRules.show() + + # transform examines the input items against all the association rules and summarize the + # consequents as prediction + model.transform(df).show() # $example off$ spark.stop() From 2b1efb34493272abf46ec5142aa26117d83ed63b Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Wed, 19 Apr 2017 11:13:23 -0700 Subject: [PATCH 09/10] add R example --- docs/ml-frequent-pattern-mining.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/ml-frequent-pattern-mining.md b/docs/ml-frequent-pattern-mining.md index 94c5507020a0..124c3a520e75 100644 --- a/docs/ml-frequent-pattern-mining.md +++ b/docs/ml-frequent-pattern-mining.md @@ -77,4 +77,11 @@ Refer to the [Python API docs](api/python/pyspark.ml.html#pyspark.ml.fpm.FPGrowt {% include_example python/ml/fpgrowth_example.py %} +
+<div data-lang="r" markdown="1">
+
+Refer to the [R API docs](api/R/spark.fpGrowth.html) for more details.
+
+{% include_example r/ml/fpm.R %}
+</div>
+
 </div>

From ea3b9733329b7f50985d99001c179b040f802875 Mon Sep 17 00:00:00 2001
From: Yuhao Yang
Date: Fri, 28 Apr 2017 16:02:07 -0700
Subject: [PATCH 10/10] remove code change per comments

---
 docs/ml-frequent-pattern-mining.md                          |  2 +-
 mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala | 10 +++++++---
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/docs/ml-frequent-pattern-mining.md b/docs/ml-frequent-pattern-mining.md
index 124c3a520e75..81634de8aade 100644
--- a/docs/ml-frequent-pattern-mining.md
+++ b/docs/ml-frequent-pattern-mining.md
@@ -48,7 +48,7 @@ The `FPGrowthModel` provides:
 * `freqItemsets`: frequent itemsets in the format of DataFrame("items"[Array], "freq"[Long])
 * `associationRules`: association rules generated with confidence above `minConfidence`, in the format of
   DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double]).
-* `transform`: For each transaction in itemsCol, the `transform` method will compare its items against the antecedents
+* `transform`: For each transaction in `itemsCol`, the `transform` method will compare its items against the antecedents
   of each association rule. If the record contains all the antecedents of a specific association rule, the rule
   will be considered as applicable and its consequents will be added to the prediction result. The transform
   method will summarize the consequents from all the applicable rules as prediction. The prediction column has
diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
index 22a6deb313d5..8f00daa59f1a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
@@ -81,7 +81,7 @@ private[fpm] trait FPGrowthParams extends Params with HasPredictionCol {
   def getNumPartitions: Int = $(numPartitions)
 
   /**
-   * Minimal confidence for generating Association Rule. MinConfidence will not affect the mining
+   * Minimal confidence for generating Association Rule. minConfidence will not affect the mining
    * for frequent itemsets, but will affect the association rules generation.
    * Default: 0.8
    * @group param
@@ -269,8 +269,12 @@ class FPGrowthModel private[ml] (
     val predictUDF = udf((items: Seq[_]) => {
       if (items != null) {
        val itemset = items.toSet
-        brRules.value.filter(_._1.forall(itemset.contains))
-          .flatMap(_._2.filter(!itemset.contains(_))).distinct
+        brRules.value.flatMap(rule =>
+          if (items != null && rule._1.forall(item => itemset.contains(item))) {
+            rule._2.filter(item => !itemset.contains(item))
+          } else {
+            Seq.empty
+          }).distinct
       } else {
         Seq.empty
       }}, dt)
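[Editor's note: for readers tracing the final `predictUDF` revert above, the following is a standalone sketch of the same prediction logic, lifted out of the UDF so it can be exercised without a `SparkSession`. The `predict` helper and its rule representation are illustrative, mirroring the broadcast `(antecedent, consequent)` pairs rather than any Spark API; the inner `items != null` check in the reverted code is redundant given the outer guard and is dropped here.]

```scala
// Mirrors the body of predictUDF: for a (possibly null) input itemset, apply every
// association rule whose antecedent is fully contained in the input, and collect
// the consequent items that the input does not already contain.
def predict[T](items: Seq[T], rules: Seq[(Seq[T], Seq[T])]): Seq[T] = {
  if (items == null) {
    Seq.empty
  } else {
    val itemset = items.toSet
    rules.flatMap { case (antecedent, consequent) =>
      if (antecedent.forall(item => itemset.contains(item))) {
        consequent.filter(item => !itemset.contains(item))
      } else {
        Seq.empty
      }
    }.distinct
  }
}

// With the rule [1, 2] => [5] and input [1, 2], the prediction is [5]:
// predict(Seq("1", "2"), Seq((Seq("1", "2"), Seq("5")))) == Seq("5")
```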