Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 0 additions & 72 deletions mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala
Original file line number Diff line number Diff line change
Expand Up @@ -191,76 +191,4 @@ object ImageSchema {
Some(Row(Row(origin, height, width, nChannels, mode, decoded)))
}
}

/**
* Read the directory of images from the local or remote source
*
* @note If multiple jobs are run in parallel with different sampleRatio or recursive flag,
* there may be a race condition where one job overwrites the hadoop configs of another.
* @note If sample ratio is less than 1, sampling uses a PathFilter that is efficient but
* potentially non-deterministic.
*
* @param path Path to the image directory
* @return DataFrame with a single column "image" of images;
* see ImageSchema for the details
*/
@deprecated("use `spark.read.format(\"image\").load(path)` and this `readImages` will be " +
"removed in 3.0.0.", "2.4.0")
def readImages(path: String): DataFrame = readImages(path, null, false, -1, false, 1.0, 0)

/**
* Read the directory of images from the local or remote source
*
* @note If multiple jobs are run in parallel with different sampleRatio or recursive flag,
* there may be a race condition where one job overwrites the hadoop configs of another.
* @note If sample ratio is less than 1, sampling uses a PathFilter that is efficient but
* potentially non-deterministic.
*
* @param path Path to the image directory
* @param sparkSession Spark Session, if omitted gets or creates the session
* @param recursive Recursive path search flag
* @param numPartitions Number of the DataFrame partitions,
* if omitted uses defaultParallelism instead
* @param dropImageFailures Drop the files that are not valid images from the result
* @param sampleRatio Fraction of the files loaded
* @return DataFrame with a single column "image" of images;
* see ImageSchema for the details
*/
@deprecated("use `spark.read.format(\"image\").load(path)` and this `readImages` will be " +
"removed in 3.0.0.", "2.4.0")
def readImages(
path: String,
sparkSession: SparkSession,
recursive: Boolean,
numPartitions: Int,
dropImageFailures: Boolean,
sampleRatio: Double,
seed: Long): DataFrame = {
require(sampleRatio <= 1.0 && sampleRatio >= 0, "sampleRatio should be between 0 and 1")

val session = if (sparkSession != null) sparkSession else SparkSession.builder().getOrCreate
val partitions =
if (numPartitions > 0) {
numPartitions
} else {
session.sparkContext.defaultParallelism
}

RecursiveFlag.withRecursiveFlag(recursive, session) {
SamplePathFilter.withPathFilter(sampleRatio, session, seed) {
val binResult = session.sparkContext.binaryFiles(path, partitions)
val streams = if (numPartitions == -1) binResult else binResult.repartition(partitions)
val convert = (origin: String, bytes: PortableDataStream) =>
decode(origin, bytes.toArray())
val images = if (dropImageFailures) {
streams.flatMap { case (origin, bytes) => convert(origin, bytes) }
} else {
streams.map { case (origin, bytes) =>
convert(origin, bytes).getOrElse(invalidImageRow(origin))
}
}
session.createDataFrame(images, imageSchema)
}
}
}
}
171 changes: 0 additions & 171 deletions mllib/src/test/scala/org/apache/spark/ml/image/ImageSchemaSuite.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,24 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext {
private lazy val imagePath = "../data/mllib/images/partitioned"
private lazy val recursiveImagePath = "../data/mllib/images"

test("Smoke test: create basic ImageSchema dataframe") {
val origin = "path"
val width = 1
val height = 1
val nChannels = 3
val data = Array[Byte](0, 0, 0)
val mode = ocvTypes("CV_8UC3")

// Internal Row corresponds to image StructType
val rows = Seq(Row(Row(origin, height, width, nChannels, mode, data)),
Row(Row(null, height, width, nChannels, mode, data)))
val rdd = sc.makeRDD(rows)
val df = spark.createDataFrame(rdd, imageSchema)

assert(df.count === 2, "incorrect image count")
assert(df.schema("image").dataType == columnSchema, "data do not fit ImageSchema")
}

test("image datasource count test") {
val df1 = spark.read.format("image").load(imagePath)
assert(df1.count === 9)
Expand Down
6 changes: 5 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,11 @@ object MimaExcludes {

// [SPARK-28556][SQL] QueryExecutionListener should also notify Error
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"),

// [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.image.ImageSchema.readImages"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.image.ImageSchema.readImages")
)

// Exclude rules for 2.4.x
Expand Down
38 changes: 0 additions & 38 deletions python/pyspark/ml/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,44 +203,6 @@ def toImage(self, array, origin=""):
return _create_row(self.imageFields,
[origin, height, width, nChannels, mode, data])

def readImages(self, path, recursive=False, numPartitions=-1,
dropImageFailures=False, sampleRatio=1.0, seed=0):
"""
Reads the directory of images from the local or remote source.

.. note:: If multiple jobs are run in parallel with different sampleRatio or recursive flag,
there may be a race condition where one job overwrites the hadoop configs of another.

.. note:: If sample ratio is less than 1, sampling uses a PathFilter that is efficient but
potentially non-deterministic.

.. note:: Deprecated in 2.4.0. Use `spark.read.format("image").load(path)` instead and
this `readImages` will be removed in 3.0.0.

:param str path: Path to the image directory.
:param bool recursive: Recursive search flag.
:param int numPartitions: Number of DataFrame partitions.
:param bool dropImageFailures: Drop the files that are not valid images.
:param float sampleRatio: Fraction of the images loaded.
:param int seed: Random number seed.
:return: a :class:`DataFrame` with a single column of "images",
see ImageSchema for details.

>>> df = ImageSchema.readImages('data/mllib/images/origin/kittens', recursive=True)
>>> df.count()
5

.. versionadded:: 2.3.0
"""
warnings.warn("`ImageSchema.readImage` is deprecated. " +
"Use `spark.read.format(\"image\").load(path)` instead.", DeprecationWarning)
spark = SparkSession.builder.getOrCreate()
image_schema = spark._jvm.org.apache.spark.ml.image.ImageSchema
jsession = spark._jsparkSession
jresult = image_schema.readImages(path, jsession, recursive, numPartitions,
dropImageFailures, float(sampleRatio), seed)
return DataFrame(jresult, spark._wrapped)


ImageSchema = _ImageSchema()

Expand Down
29 changes: 19 additions & 10 deletions python/pyspark/ml/tests/test_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,24 @@
from pyspark.testing.utils import QuietTest


class ImageReaderTest(SparkSessionTestCase):
class ImageFileFormatTest(SparkSessionTestCase):

def test_read_images(self):
data_path = 'data/mllib/images/origin/kittens'
df = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True)
df = self.spark.read.format("image") \
.option("dropInvalid", True) \
.option("recursiveFileLookup", True) \
.load(data_path)
self.assertEqual(df.count(), 4)
first_row = df.take(1)[0][0]
# compare `schema.simpleString()` instead of directly compare schema,
# because the df loaded from datasouce may change schema column nullability.
self.assertEqual(df.schema.simpleString(), ImageSchema.imageSchema.simpleString())
self.assertEqual(df.schema["image"].dataType.simpleString(),
ImageSchema.columnSchema.simpleString())
array = ImageSchema.toNDArray(first_row)
self.assertEqual(len(array), first_row[1])
self.assertEqual(ImageSchema.toImage(array, origin=first_row[0]), first_row)
self.assertEqual(df.schema, ImageSchema.imageSchema)
self.assertEqual(df.schema["image"].dataType, ImageSchema.columnSchema)
expected = {'CV_8UC3': 16, 'Undefined': -1, 'CV_8U': 0, 'CV_8UC1': 0, 'CV_8UC4': 24}
self.assertEqual(ImageSchema.ocvTypes, expected)
expected = ['origin', 'height', 'width', 'nChannels', 'mode', 'data']
Expand All @@ -61,11 +67,11 @@ def test_read_images(self):
lambda: ImageSchema.toImage("a"))


class ImageReaderTest2(PySparkTestCase):
class ImageFileFormatOnHiveContextTest(PySparkTestCase):

@classmethod
def setUpClass(cls):
super(ImageReaderTest2, cls).setUpClass()
super(ImageFileFormatOnHiveContextTest, cls).setUpClass()
cls.hive_available = True
# Note that here we enable Hive's support.
cls.spark = None
Expand All @@ -86,17 +92,20 @@ def setUp(self):

@classmethod
def tearDownClass(cls):
super(ImageReaderTest2, cls).tearDownClass()
super(ImageFileFormatOnHiveContextTest, cls).tearDownClass()
if cls.spark is not None:
cls.spark.sparkSession.stop()
cls.spark = None

def test_read_images_multiple_times(self):
# This test case is to check if `ImageSchema.readImages` tries to
# This test case is to check if ImageFileFormat tries to
# initiate Hive client multiple times. See SPARK-22651.
data_path = 'data/mllib/images/origin/kittens'
ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True)
ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True)
for i in range(2):
self.spark.read.format("image") \
.option("dropInvalid", True) \
.option("recursiveFileLookup", True) \
.load(data_path)


if __name__ == "__main__":
Expand Down