Data Preparation

DataFrame Metadata

Column metadata is one of the most useful and the least known features of the Spark Dataset. It is worth noting that all features described below, although not private, are part of the developer API and as such can be unstable or even removed in minor versions.

Metadata in ML pipelines

Although metadata is widely used by ML Pipelines to indicate variable types and levels, the whole process is usually transparent to, and at least partially hidden from, the end user. Let's look at a simple pipeline and see what happens behind the scenes.

We'll start with a simple dataset:

val df = Seq(
  (0.0, "x", 2.0),
  (1.0, "y", 3.0),
  (2.0, "x", -1.0)
).toDF("label", "x1", "x2")

and the following pipeline:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

val stages = Array(
  new StringIndexer().setInputCol("x1").setOutputCol("x1_"),
  new VectorAssembler().setInputCols(Array("x1_", "x2")).setOutputCol("features")
)

val model = new Pipeline().setStages(stages).fit(df)

Now we can extract the stages and transform the data step by step:

val dfs = model.stages.scanLeft(df)((df, stage) => stage.transform(df))

and see what is going on at each stage:

  1. Our initial dataset has no metadata:
dfs(0).schema.map(_.metadata)

// Seq[org.apache.spark.sql.types.Metadata] = List({}, {}, {})
  2. After transforming with StringIndexerModel we can see indexer-specific metadata:
dfs(1).schema.last.metadata

// org.apache.spark.sql.types.Metadata =
//   {"ml_attr":{"vals":["x","y"],"type":"nominal","name":"x1_"}}

It is important to note that this information is stored locally, as part of the schema, so keep that in mind if the number of unique values is large.

  3. Finally, metadata for the assembled feature vector:
dfs(2).schema.last.metadata

// org.apache.spark.sql.types.Metadata = {"ml_attr":{"attrs":{
//   "numeric":[{"idx":1,"name":"x2"}],
//   "nominal":[{"vals":["x","y"],"idx":0,"name":"x1_"}]
// },"num_attrs":2}}

Metadata from upstream stages is picked up by the assembler and used to describe vector indices.

Let's check if metadata is actually used in practice:

import org.apache.spark.ml.classification.DecisionTreeClassifier

new DecisionTreeClassifier().setLabelCol("label").fit(dfs.last).toDebugString

// String =
// "DecisionTreeClassificationModel (uid=dtc_72c1c370aa00) of depth 2 with 5 nodes
//   If (feature 0 in {0.0})
//    If (feature 1 <= -1.0)
//     Predict: 2.0
//    Else (feature 1 > -1.0)
//     Predict: 1.0
//   Else (feature 0 not in {0.0})
//    Predict: 0.0
// "

Note: Prior to Spark 2.0.0 the label column would require indexing.

As you can see, nominal and numerical features are recognized and handled differently, which is exactly what we would expect.

Setting ML attributes manually

Scala

So far so good, but what if you work with data which has already been preprocessed? For cases like this Spark provides a set of utilities designed to create ML-compliant metadata. Let's get familiar with the whole process by building metadata equivalent to that generated by the ML pipeline we used before.

We'll need a NominalAttribute:

import org.apache.spark.ml.attribute.NominalAttribute

val firstAttr = NominalAttribute.defaultAttr.withValues("x", "y").withName("x1_")

and a NumericAttribute:

import org.apache.spark.ml.attribute.NumericAttribute

val secondAttr = NumericAttribute.defaultAttr.withName("x2")

Numeric attributes also provide a number of methods which can be used to store basic descriptive statistics such as minimum, maximum, standard deviation, or sparsity.

Finally, we combine the attributes using AttributeGroup and convert the group to a Metadata object:

import org.apache.spark.ml.attribute.AttributeGroup

// toMetadata() wraps the group under the "ml_attr" key expected by ML stages
val featuresMetadata = new AttributeGroup("features", Array(firstAttr, secondAttr)).toMetadata()

All that is left is a quick sanity check:

featuresMetadata == dfs.last.schema.last.metadata

// Boolean = true

Generated metadata can be applied using the Column.as method:

/* Note: we use the local MLlib API only to show VectorUDT usage.
 */

import org.apache.spark.mllib.linalg.Vectors

val records = Seq(
  (0.0, Vectors.dense(Array(0.0, 2.0))),
  (1.0, Vectors.dense(Array(1.0, 3.0))),
  (2.0, Vectors.dense(Array(0.0, -1.0)))
)

records.toDF("label", "features")
  .withColumn("features", $"features".as("features", featuresMetadata))

or added to the schema when creating a DataFrame:

import org.apache.spark.sql.types._
import org.apache.spark.mllib.linalg.VectorUDT

val schema = StructType(Seq(
  StructField("label", DoubleType, false),
  StructField("features", new VectorUDT(), false, featuresMetadata)
))

Python

Unlike Scala, Python doesn't provide any helpers, and metadata is simply represented as a standard Python dict.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

df = sc.parallelize((
    (0.0, "x", 2.0),
    (1.0, "y", 3.0),
    (2.0, "x", -1.0)
)).toDF(["label", "x1", "x2"])


model = Pipeline(stages=[
    StringIndexer(inputCol="x1", outputCol="x1_"),
    VectorAssembler(inputCols=["x1_", "x2"], outputCol="features"),
]).fit(df)

model.transform(df).schema[-1].metadata

## {'ml_attr': {'attrs': {'nominal': [{'idx': 0,
##      'name': 'x1_',
##      'vals': ['x', 'y']}],
##    'numeric': [{'idx': 1, 'name': 'x2'}]},
##   'num_attrs': 2}}
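Since there are no helpers, it can be handy to write a tiny one yourself. The following is only a sketch: attr_group is a hypothetical function, not part of PySpark, and it simply mirrors the layout of the dict printed above, with the position of each attribute used as its idx.

```python
def attr_group(attrs):
    """Build vector-column metadata from (kind, attribute) pairs.

    kind is "nominal" or "numeric"; the position in the list becomes "idx".
    Hypothetical helper which only mimics the dict layout shown above.
    """
    grouped = {}
    for idx, (kind, attr) in enumerate(attrs):
        entry = dict(attr, idx=idx)  # copy the attribute and add its vector index
        grouped.setdefault(kind, []).append(entry)
    return {"ml_attr": {"attrs": grouped, "num_attrs": len(attrs)}}


features_meta = attr_group([
    ("nominal", {"name": "x1_", "vals": ["x", "y"]}),
    ("numeric", {"name": "x2"}),
])

print(features_meta)
```

The result compares equal to the metadata generated by the pipeline, so it could be attached to an already assembled vector column in the same way as any other dict.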

Before Spark 2.2, PySpark didn't support attaching metadata to a single column. It was possible, though, to use a workaround similar to this one:

import json

from pyspark import SparkContext
from pyspark.sql import Column
from pyspark.sql.functions import col

def withMeta(self, alias, meta):
    sc = SparkContext._active_spark_context
    jmeta = sc._gateway.jvm.org.apache.spark.sql.types.Metadata
    return Column(getattr(self._jc, "as")(alias, jmeta.fromJson(json.dumps(meta))))

Column.withMeta = withMeta


meta = {"ml_attr": {"name": "label_with_meta",
  "type": "nominal",
  "vals": ["0.0", "1.0", "2.0"]}}


df_with_meta = df.withColumn("label_with_meta", col("label").withMeta("", meta))
df_with_meta.schema[-1].metadata == meta

## True

Spark 2.2 added support for setting metadata with Column.alias:

df_with_meta = df.withColumn("label_with_meta", col("label").alias("label", metadata=meta))
df_with_meta.schema[-1].metadata == meta

## True

Setting custom column metadata

Arguably the true power of metadata shows itself when it is used outside the restricted ML environment. It is possible to attach an arbitrary JSON document to each column and use it for provenance tracking, storing diagnostic information, or performing different data enrichment tasks.

A Metadata object can be created from a JSON string:

import org.apache.spark.sql.types.Metadata


Metadata.fromJson("""{"foo": "bar"}""")

// org.apache.spark.sql.types.Metadata = {"foo":"bar"}

or constructed using MetadataBuilder:

import org.apache.spark.sql.types.MetadataBuilder

new MetadataBuilder().putString("foo", "bar").build

// org.apache.spark.sql.types.Metadata = {"foo":"bar"}

Moreover, it can be attached to Parquet files and loaded back later:

Seq((1L, "foo"), (2L, "bar"))
  .toDF("id", "txt")
  .withColumn("id", $"id".as("", Metadata.fromJson("""{"foo": "bar"}""")))
  .write.parquet("/tmp/foo")


spark.read.parquet("/tmp/foo").schema.headOption.map(_.metadata)

// Option[org.apache.spark.sql.types.Metadata] = Some({"foo":"bar"})
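The same idea carries over to Python, where the document is just a dict. Below is a minimal sketch of a provenance record; the provenance_meta function, the "provenance" key, its fields, and the date are all made up for illustration, and Spark doesn't interpret any of them.

```python
import json
from datetime import date


def provenance_meta(source, loaded_on):
    """Build a column metadata dict describing where the data came from.

    The "provenance" key and its fields are hypothetical names chosen for
    this sketch.
    """
    meta = {"provenance": {"source": source, "loaded_on": loaded_on.isoformat()}}
    # Metadata is serialized as JSON, so fail early on anything json cannot encode.
    json.dumps(meta)
    return meta


# The path and the date are placeholder values.
meta = provenance_meta("/tmp/foo", date(2017, 7, 1))
print(meta)

# With PySpark 2.2+ the dict can then be attached through Column.alias:
# df.withColumn("id", col("id").alias("id", metadata=meta))
```

Storing dates as ISO strings keeps the document within plain JSON types, which is all that column metadata can hold.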

Accessing Metadata Directly

Metadata can also be accessed directly using Parquet tools:

import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter}

import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration


val conf = spark.sparkContext.hadoopConfiguration

def getFooters(conf: Configuration, path: String) = {
  val fs = FileSystem.get(conf)
  val footers = ParquetFileReader.readAllFootersInParallel(conf, fs.getFileStatus(new Path(path)))
  footers
}

def getFileMetadata(conf: Configuration, path: String) = {
  getFooters(conf, path)
    .asScala.map(_.getParquetMetadata.getFileMetaData.getKeyValueMetaData.asScala)
}

getFileMetadata(conf, "/tmp/foo").headOption

// Option[scala.collection.mutable.Map[String,String]] =
//   Some(Map(org.apache.spark.sql.parquet.row.metadata ->
//     {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{"foo":"bar"}},
//     {"name":"txt","type":"string","nullable":true,"metadata":{}}]}))
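The value stored under this key is just the JSON-serialized schema, so once extracted it can be inspected without Spark at all. Here is a small sketch in plain Python, assuming a schema string of the shape shown above; column_metadata is a hypothetical helper and only looks at top-level fields.

```python
import json

# Schema JSON of the same shape as the footer value shown above.
row_metadata = (
    '{"type":"struct","fields":['
    '{"name":"id","type":"long","nullable":false,"metadata":{"foo":"bar"}},'
    '{"name":"txt","type":"string","nullable":true,"metadata":{}}]}'
)


def column_metadata(schema_json):
    """Map each top-level column name to its metadata dict."""
    schema = json.loads(schema_json)
    return {field["name"]: field.get("metadata", {}) for field in schema["fields"]}


print(column_metadata(row_metadata))
# {'id': {'foo': 'bar'}, 'txt': {}}
```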

We can also use the extracted footers to write a standalone metadata file when needed:

import org.apache.parquet.hadoop.ParquetFileWriter

def createMetadata(conf: Configuration, path: String) = {
  val footers = getFooters(conf, path)
  ParquetFileWriter.writeMetadataFile(conf, new Path(path), footers)
}