Merged
72 changes: 67 additions & 5 deletions notebooks/samples/302 - Pipeline Image Transformations.ipynb
@@ -64,10 -64,7 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"mml-deploy": "hdinsight",
"collapsed": true
},
"metadata": {},
"outputs": [],
"source": [
"IMAGE_PATH = \"/datasets/CIFAR10/test\""
@@ -97,6 +94,64 @@
"print(images.count())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can also alternatively stream the images with a similiar api.\n",
"Check the [Structured Streaming Programming Guide](",
"https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)\n",
"for more details on streaming."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"imageStream = spark.streamImages(IMAGE_PATH + \"/*\", sampleRatio = 0.1)\n",
"query = imageStream.select(\"image.height\").writeStream.format(\"memory\").queryName(\"heights\").start()\n",
"print(\"Streaming query activity: {}\".format(query.isActive))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Wait a few seconds and then try querying for the images below.\n",
"Note that when streaming a directory of images that already exists it will\n",
"consume all images in a single batch. If one were to move images into the\n",
"directory, the streaming engine would pick up on them and send them as\n",
"another batch."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"heights = spark.sql(\"select * from heights\")\n",
"print(\"Streamed {} heights\".format(heights.count()))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"After we have streamed the images we can stop the query:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"query.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
@@ -208,6 +263,13 @@
"print(type(vector))\n",
"len(vector.toArray())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
@@ -227,7 +289,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
"version": "3.6.1"
Contributor comment: How did this work before? Because we rewrite the notebook at build time?
}
},
"nbformat": 4,
7 changes: 4 additions & 3 deletions src/cntk-model/src/main/scala/CNTKModel.scala
@@ -13,6 +13,7 @@ import org.apache.spark.ml.linalg.{DenseVector, Vectors}
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

@@ -245,14 +246,14 @@ class CNTKModel(override val uid: String) extends Model[CNTKModel] with ComplexP

val inputType = df.schema($(inputCol)).dataType
val broadcastedModel = broadcastedModelOption.getOrElse(spark.sparkContext.broadcast(getModel))
val rdd = df.rdd.mapPartitions(
val encoder = RowEncoder(df.schema.add(StructField(getOutputCol, VectorType)))
val output = df.mapPartitions(
CNTKModelUtils.applyModel(selectedIndex,
broadcastedModel,
getMiniBatchSize,
getInputNode,
get(outputNodeName),
get(outputNodeIndex)))
val output = spark.createDataFrame(rdd, df.schema.add(StructField(getOutputCol, VectorType)))
get(outputNodeIndex)))(encoder)

coersionOptionUDF match {
case Some(_) => output.drop(coercedCol)
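The change above replaces the df.rdd.mapPartitions + spark.createDataFrame round trip with Dataset.mapPartitions, which needs an explicit RowEncoder because Spark cannot derive an encoder for Row. A minimal sketch of the same pattern (the column name and appended value are illustrative, not taken from this PR):

    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.types.{DoubleType, StructField}

    // Append a computed column partition-by-partition without dropping to the RDD API.
    def withScore(df: DataFrame): DataFrame = {
      val outSchema = df.schema.add(StructField("score", DoubleType))
      val encoder = RowEncoder(outSchema)            // tells Spark how to encode the widened rows
      df.mapPartitions { rows =>
        rows.map(r => Row.fromSeq(r.toSeq :+ 0.5))   // placeholder per-row computation
      }(encoder)
    }

Staying in the Dataset API keeps the whole computation inside a single query plan instead of detouring through an RDD and re-analyzing the schema on every call.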
29 changes: 12 additions & 17 deletions src/core/env/src/main/scala/StreamUtilities.scala
@@ -3,10 +3,12 @@

package com.microsoft.ml.spark

import java.io.ByteArrayOutputStream
import java.io.{ByteArrayOutputStream, InputStream}
import java.util.zip.ZipInputStream

import org.apache.commons.io.IOUtils
import org.apache.spark.input.PortableDataStream

import scala.util.Random

object StreamUtilities {
@@ -34,40 +36,33 @@ object StreamUtilities {

/** Iterate through the entries of a streamed .zip file, selecting only sampleRatio of them
*
* @param portableStream Stream of zip file
* @param zipfile File name is only used to construct the names of the entries
* @param sampleRatio What fraction of files is returned from zip
* @param stream Stream of zip file
* @param zipfile File name is only used to construct the names of the entries
* @param sampleRatio What fraction of files is returned from zip
*/
class ZipIterator(portableStream: PortableDataStream, zipfile: String, sampleRatio: Double = 1)
class ZipIterator(stream: InputStream, zipfile: String, random: Random, sampleRatio: Double = 1)
extends Iterator[(String, Array[Byte])] {

val stream = portableStream.open
private val zipstream = new ZipInputStream(stream)

val random = {
val rd = new Random()
rd.setSeed(0)
rd
}
private val zipStream = new ZipInputStream(stream)

private def getNext: Option[(String, Array[Byte])] = {
var entry = zipstream.getNextEntry
var entry = zipStream.getNextEntry
while (entry != null) {
if (!entry.isDirectory && random.nextDouble < sampleRatio) {

val filename = zipfile + java.io.File.separator + entry.getName()

//extracting all bytes of a given entry
val byteStream = new ByteArrayOutputStream
IOUtils.copy(zipstream, byteStream)
IOUtils.copy(zipStream, byteStream)
val bytes = byteStream.toByteArray

assert(bytes.length == entry.getSize,
"incorrect number of bytes is read from zipstream: " + bytes.length + " instead of " + entry.getSize)

return Some((filename, bytes))
}
entry = zipstream.getNextEntry
entry = zipStream.getNextEntry
}

stream.close()
@@ -76,7 +71,7 @@

private var nextValue = getNext

def hasNext: Boolean = !nextValue.isEmpty
def hasNext: Boolean = nextValue.isDefined

def next: (String, Array[Byte]) = {
val result = nextValue.get
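With this refactor, ZipIterator no longer opens the PortableDataStream or seeds its own Random; both are supplied by the caller. The updated call site is not shown in this diff, but a plausible sketch looks like this (the per-file seed is an assumption for illustration):

    import scala.util.Random
    import org.apache.spark.input.PortableDataStream
    import com.microsoft.ml.spark.StreamUtilities

    // The caller opens the stream and chooses the sampling seed; ZipIterator just consumes them.
    def readZipEntries(portable: PortableDataStream,
                       path: String,
                       sampleRatio: Double): Iterator[(String, Array[Byte])] = {
      val seed = path.hashCode.toLong   // assumed: any deterministic per-file seed would do
      new StreamUtilities.ZipIterator(portable.open(), path, new Random(seed), sampleRatio)
    }

Moving the Random out of the iterator also makes the sampling easier to test, since a test can pass in a Random with a fixed seed.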
111 changes: 2 additions & 109 deletions src/core/hadoop/src/main/scala/HadoopUtils.scala
@@ -3,17 +3,9 @@

package com.microsoft.ml.spark.hadoop

import java.nio.file.Paths

import org.apache.commons.io.FilenameUtils

import scala.sys.process._
import org.apache.hadoop.conf.{Configuration, Configured}
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.conf.Configuration
import scala.language.existentials
import scala.util.Random
import scala.sys.process._

class HadoopUtils(hadoopConf: Configuration) {
// Is there a better way? We need to deduce full Hadoop conf
@@ -73,102 +65,3 @@ class HadoopUtils(hadoopConf: Configuration) {
}

}

/** Filter that allows loading a fraction of HDFS files. */
class SamplePathFilter extends Configured with PathFilter {
val random = {
val rd = new Random()
rd.setSeed(0)
rd
}

// Ratio of files to be read from disk
var sampleRatio: Double = 1

// When inspectZip is enabled, zip files are treated as directories, and SamplePathFilter can't filter them out.
// Otherwise, zip files are treated as regular files and only sampleRatio of them is read.
var inspectZip: Boolean = true

override def setConf(conf: Configuration): Unit = {
if (conf != null) {
sampleRatio = conf.getDouble(SamplePathFilter.ratioParam, 1)
inspectZip = conf.getBoolean(SamplePathFilter.inspectZipParam, true)
}
}

override def accept(path: Path): Boolean = {
// Note: checking fileSystem.isDirectory is very slow here, so we use basic rules instead
!SamplePathFilter.isFile(path) ||
(SamplePathFilter.isZipFile(path) && inspectZip) ||
random.nextDouble() < sampleRatio
}
}

object SamplePathFilter {
val ratioParam = "sampleRatio"
val inspectZipParam = "inspectZip"

def isFile(path: Path): Boolean = FilenameUtils.getExtension(path.toString) != ""

def isZipFile(filename: String): Boolean = FilenameUtils.getExtension(filename) == "zip"

def isZipFile(path: Path): Boolean = isZipFile(path.toString)

/** Set/unset hdfs PathFilter
*
* @param value Filter class that is passed to HDFS
* @param sampleRatio Fraction of the files that the filter picks
* @param inspectZip Look into zip files, if true
* @param spark Existing Spark session
* @return
*/
def setPathFilter(value: Option[Class[_]], sampleRatio: Option[Double] = None,
inspectZip: Option[Boolean] = None, spark: SparkSession)
: Option[Class[_]] = {
val flagName = FileInputFormat.PATHFILTER_CLASS
val hadoopConf = spark.sparkContext.hadoopConfiguration
val old = Option(hadoopConf.getClass(flagName, null))
if (sampleRatio.isDefined) {
hadoopConf.setDouble(SamplePathFilter.ratioParam, sampleRatio.get)
} else {
hadoopConf.unset(SamplePathFilter.ratioParam)
None
}

if (inspectZip.isDefined) {
hadoopConf.setBoolean(SamplePathFilter.inspectZipParam, inspectZip.get)
} else {
hadoopConf.unset(SamplePathFilter.inspectZipParam)
None
}

value match {
case Some(v) => hadoopConf.setClass(flagName, v, classOf[PathFilter])
case None => hadoopConf.unset(flagName)
}
old
}
}

object RecursiveFlag {

/** Sets a value of spark recursive flag
*
* @param value value to set
* @param spark existing spark session
* @return previous value of this flag
*/
def setRecursiveFlag(value: Option[String], spark: SparkSession): Option[String] = {
val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
val hadoopConf = spark.sparkContext.hadoopConfiguration
val old = Option(hadoopConf.get(flagName))

value match {
case Some(v) => hadoopConf.set(flagName, v)
case None => hadoopConf.unset(flagName)
}

old
}

}
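The deleted SamplePathFilter and RecursiveFlag helpers only toggled entries on the session's Hadoop configuration. If equivalent behavior is ever needed, the settings they manipulated were roughly the following (reconstructed from the removed code above, not from this PR's replacement):

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
    import org.apache.spark.sql.SparkSession

    // Recursive input listing plus the sampleRatio/inspectZip knobs that the
    // removed SamplePathFilter read back from the configuration.
    def configureSampling(spark: SparkSession, sampleRatio: Double, inspectZip: Boolean): Unit = {
      val conf = spark.sparkContext.hadoopConfiguration
      conf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")  // what RecursiveFlag.setRecursiveFlag(Some("true"), spark) did
      conf.setDouble("sampleRatio", sampleRatio)
      conf.setBoolean("inspectZip", inspectZip)
    }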
9 changes: 6 additions & 3 deletions src/core/schema/src/main/scala/BinaryFileSchema.scala
@@ -4,16 +4,19 @@
package com.microsoft.ml.spark.schema

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{StructType, StructField, StringType, BinaryType}
import org.apache.spark.sql.types.{BinaryType, StringType, StructField, StructType}

object BinaryFileSchema {

/** Schema for the binary file column: Row(String, Array[Byte]) */
val columnSchema = StructType(Seq(
StructField("path", StringType, true),
StructField("bytes", BinaryType, true) //raw file bytes
StructField("path", StringType, true),
StructField("bytes", BinaryType, true) // raw file bytes
))

/** Schema for the binary file column: Row(String, Array[Byte]) */
val schema = StructType(StructField("value", columnSchema, true) :: Nil)

def getPath(row: Row): String = row.getString(0)
def getBytes(row: Row): Array[Byte] = row.getAs[Array[Byte]](1)

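A minimal usage sketch for the accessors above (the path and bytes are illustrative):

    import org.apache.spark.sql.Row
    import com.microsoft.ml.spark.schema.BinaryFileSchema

    // A Row shaped like columnSchema: (path, bytes).
    val entry: Row = Row("wasb:///datasets/example.bin", Array[Byte](1, 2, 3))
    BinaryFileSchema.getPath(entry)   // "wasb:///datasets/example.bin"
    BinaryFileSchema.getBytes(entry)  // Array(1, 2, 3)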
31 changes: 9 additions & 22 deletions src/core/schema/src/main/scala/ImageSchema.scala
@@ -3,21 +3,21 @@

package com.microsoft.ml.spark.schema

import com.microsoft.ml.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
import scala.reflect.ClassTag
import org.apache.spark.sql.{DataFrame, Row}

object ImageSchema {

/** Schema for the image column: Row(String, Int, Int, Int, Array[Byte]) */
val columnSchema = StructType(
StructField("path", StringType, true) ::
StructField("height", IntegerType, true) ::
StructField("width", IntegerType, true) ::
StructField("type", IntegerType, true) :: //OpenCV type: CV_8U in most cases
StructField("bytes", BinaryType, true) :: Nil) //OpenCV bytes: row-wise BGR in most cases
StructField("path", StringType, true) ::
StructField("height", IntegerType, true) ::
StructField("width", IntegerType, true) ::
StructField("type", IntegerType, true) :: //OpenCV type: CV_8U in most cases
StructField("bytes", BinaryType, true) :: Nil) //OpenCV bytes: row-wise BGR in most cases

// single column of images named "image"
val schema = StructType(StructField("image", columnSchema, true) :: Nil)

def getPath(row: Row): String = row.getString(0)
def getHeight(row: Row): Int = row.getInt(1)
@@ -34,17 +34,4 @@ object ImageSchema {
def isImage(df: DataFrame, column: String): Boolean =
df.schema(column).dataType == columnSchema

/** This object will load the openCV binaries when the object is referenced
* for the first time, subsequent references will not re-load the binaries.
* In spark, this loads one copy for each running jvm, instead of once per partition.
* This technique is similar to that used by the cntk_jni jar,
* but in the case where microsoft cannot edit the jar
*/
private[spark] object OpenCVLoader {
import org.opencv.core.Core
new NativeLoader("/nu/pattern/opencv").loadLibraryByName(Core.NATIVE_LIBRARY_NAME)
}

private[spark] def loadOpenCV[T:ClassTag](rdd: RDD[T]):RDD[T] =
rdd.mapPartitions({it => OpenCVLoader; it}, preservesPartitioning = true)
}
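The accessors above operate on the inner image struct. A short sketch of how they fit together, assuming an images DataFrame whose "image" column matches columnSchema (for example, the one read in the notebook earlier in this PR):

    import org.apache.spark.sql.DataFrame
    import com.microsoft.ml.spark.schema.ImageSchema

    // Check the column type first, then pull the struct out of the first row.
    def firstImageInfo(images: DataFrame): Option[String] = {
      if (ImageSchema.isImage(images, "image")) {
        val img = images.select("image").head.getStruct(0)
        Some(s"${ImageSchema.getPath(img)} height=${ImageSchema.getHeight(img)}")
      } else None
    }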