Fixed duplicate iters
mdellabitta committed Jan 2, 2025
1 parent 9dd348c commit c133be6
Showing 14 changed files with 87 additions and 322 deletions.
@@ -105,26 +105,6 @@ class CommunityWebsHarvester(
}
}

/** Implements a stream of files from the zip. Can't use @tailrec here because
* the compiler can't recognize it as tail recursive, but this won't blow the
* stack.
*
* @param zipInputStream
* @return
* Lazy stream of zip records
*/
def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
Option(zipInputStream.getNextEntry) match {
case None =>
LazyList.empty
case Some(entry) =>
val result =
if (entry.isDirectory)
None
else
Some(new BufferedReader(new InputStreamReader(zipInputStream)))
FileResult(entry.getName, None, result) #:: iter(zipInputStream)
}

override def localHarvest(): DataFrame = {
val harvestTime = System.currentTimeMillis()
@@ -138,7 +118,7 @@ class CommunityWebsHarvester(
.getOrElse(
throw new IllegalArgumentException("Couldn't load ZIP files.")
)
iter(inputStream).foreach(result => {
FileHarvester.iter(inputStream).foreach(result => {
handleFile(result, unixEpoch) match {
case Failure(exception) =>
LogManager
@@ -107,26 +107,6 @@ class DlgFileHarvester(
}
}

/** Implements a stream of files from the zip. Can't use @tailrec here because
* the compiler can't recognize it as tail recursive, but this won't blow the
* stack.
*
* @param zipInputStream ZipInputStream from the zip file
* @return
* Lazy stream of zip records
*/
def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
Option(zipInputStream.getNextEntry) match {
case None =>
LazyList.empty
case Some(entry) =>
val result =
if (entry.isDirectory)
None
else
Some(new BufferedReader(new InputStreamReader(zipInputStream)))
FileResult(entry.getName, None, result) #:: iter(zipInputStream)
}

/** Executes the Georgia harvest
*/
@@ -142,7 +122,7 @@ class DlgFileHarvester(
.getOrElse(
throw new IllegalArgumentException("Couldn't load ZIP files.")
)
iter(inputStream).foreach(result => {
FileHarvester.iter(inputStream).foreach(result => {
handleFile(result, unixEpoch) match {
case Failure(exception) =>
LogManager
@@ -112,27 +112,6 @@ class DplaJsonlFileHarvester(
}
}

/** Implements a stream of files from the zip. Can't use @tailrec here because
* the compiler can't recognize it as tail recursive, but this won't blow the
* stack.
*
* @param zipInputStream
* @return
* Lazy stream of zip records
*/
def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
Option(zipInputStream.getNextEntry) match {
case None =>
LazyList.empty
case Some(entry) =>
val result =
if (entry.isDirectory)
None
else
Some(new BufferedReader(new InputStreamReader(zipInputStream)))
FileResult(entry.getName, None, result) #:: iter(zipInputStream)
}

/** Executes the DPLA JSON file harvest
*/
override def localHarvest(): DataFrame = {
@@ -147,7 +126,7 @@ class DplaJsonlFileHarvester(
.getOrElse(
throw new IllegalArgumentException("Couldn't load ZIP files.")
)
iter(inputStream).foreach( result => {
FileHarvester.iter(inputStream).foreach( result => {
handleFile(result, unixEpoch) match {
case Failure(exception) =>
LogManager
85 changes: 64 additions & 21 deletions src/main/scala/dpla/ingestion3/harvesters/file/FileHarvester.scala
@@ -1,13 +1,15 @@
package dpla.ingestion3.harvesters.file

import java.io.BufferedReader

import java.io.{BufferedReader, InputStreamReader}
import dpla.ingestion3.confs.i3Conf
import dpla.ingestion3.harvesters.{Harvester, LocalHarvester}
import org.apache.avro.generic.GenericData
import org.apache.commons.io.IOUtils
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.tools.tar.TarInputStream

import java.util.zip.ZipInputStream
import scala.util.Try

/** File based harvester
@@ -25,25 +27,7 @@ abstract class FileHarvester(
conf: i3Conf
) extends LocalHarvester(spark, shortName, conf) {

/** Case class to hold the results of a file
*
* @param entryName
* Path of the entry in the file
* @param data
* Holds the data for the entry, or None if it's a directory.
* @param bufferedData
* Holds a buffered reader for the entry if it's too large to be held in
* memory.
*/
case class FileResult(
entryName: String,
data: Option[Array[Byte]],
bufferedData: Option[BufferedReader] = None
)

/** Case class to hold the parsed value from a given FileResult
*/
case class ParsedResult(id: String, item: String)

/** Parses and extracts ZipInputStream and writes parses records out.
*
@@ -61,7 +45,7 @@ abstract class FileHarvester(
* @param item
* Harvested record
*/
// todo this is in harvester?

def writeOut(unixEpoch: Long, item: ParsedResult): Unit = {
val avroWriter = getAvroWriter
val genericRecord = new GenericData.Record(Harvester.schema)
@@ -76,3 +60,62 @@ def flush(): Unit = getAvroWriter.flush()
def flush(): Unit = getAvroWriter.flush()

}

/** Case class to hold the results of a file
*
* @param entryName
* Path of the entry in the file
* @param data
* Holds the data for the entry, or None if it's a directory.
* @param bufferedData
* Holds a buffered reader for the entry if it's too large to be held in
* memory.
*/
case class FileResult(
entryName: String,
data: Option[Array[Byte]],
bufferedData: Option[BufferedReader] = None
)

/** Case class to hold the parsed value from a given FileResult
*/
case class ParsedResult(id: String, item: String)

object FileHarvester {
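/** Implements a lazy stream of entries from the zip. Can't use @tailrec here
* because the compiler can't recognize it as tail recursive, but this won't
* blow the stack.
*
* @param zipInputStream ZipInputStream from the zip file
* @return
* Lazy stream of zip records
*/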
def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
Option(zipInputStream.getNextEntry) match {
case None =>
LazyList.empty
case Some(entry) =>
val result =
if (entry.isDirectory || entry.getName.contains("._"))
None
else
Some(new BufferedReader(new InputStreamReader(zipInputStream)))
FileResult(entry.getName, None, result) #:: iter(zipInputStream)
}

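/** Implements a lazy stream of entries from the tar. As above, @tailrec
* doesn't apply, but the lazy tail keeps the stack flat.
*
* @param tarInputStream TarInputStream from the tar file
* @return
* Lazy stream of tar records
*/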
def iter(tarInputStream: TarInputStream): LazyList[FileResult] =
Option(tarInputStream.getNextEntry) match {
case None =>
LazyList.empty

case Some(entry) =>
val filename = Try {
entry.getName
}.getOrElse("")

val result =
if (
entry.isDirectory || filename.contains("._")
) // drop OSX hidden files
None
else if (filename.endsWith(".xml")) // only read xml files
Some(IOUtils.toByteArray(tarInputStream, entry.getSize))
else
None

FileResult(entry.getName, result) #:: iter(tarInputStream)
}
}
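
With both iter overloads consolidated here, every harvester call site reduces to FileHarvester.iter, as the hunks above and below show. A minimal sketch of consuming the zip variant (the archive path and surrounding values are illustrative, not from this commit):

import java.io.FileInputStream
import java.util.zip.ZipInputStream

val zip = new ZipInputStream(new FileInputStream("/tmp/records.zip")) // hypothetical path
FileHarvester.iter(zip).foreach { result =>
  // bufferedData is Some(reader) for regular entries and None for
  // directories and OSX "._" files; each reader is only valid until
  // the next entry is pulled from the underlying stream
  result.bufferedData.foreach(reader => println(s"${result.entryName}: ${reader.readLine()}"))
}

Because #:: evaluates its tail lazily, the recursive call in iter is deferred until the caller demands the next element, so the stream never accumulates stack frames even though @tailrec can't verify the method.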
@@ -111,27 +111,6 @@ class FlFileHarvester(
}
}

/** Implements a stream of files from the zip. Can't use @tailrec here because
* the compiler can't recognize it as tail recursive, but this won't blow the
* stack.
*
* @param zipInputStream
* @return
* Lazy stream of zip records
*/
def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
Option(zipInputStream.getNextEntry) match {
case None =>
LazyList.empty
case Some(entry) =>
val result =
if (entry.isDirectory)
None
else
Some(new BufferedReader(new InputStreamReader(zipInputStream)))
FileResult(entry.getName, None, result) #:: iter(zipInputStream)
}

/** Executes the Florida harvest
*/
override def localHarvest(): DataFrame = {
@@ -146,7 +125,7 @@ class FlFileHarvester(
.getOrElse(
throw new IllegalArgumentException("Couldn't load ZIP files.")
)
iter(inputStream).foreach(result => {
FileHarvester.iter(inputStream).foreach(result => {
handleFile(result, unixEpoch) match {
case Failure(exception) =>
LogManager.getLogger(this.getClass).error(s"Caught exception on $inFile.", exception)
@@ -80,37 +80,6 @@ class HathiFileHarvester(
}
}

/** Implements a stream of files from the tar. Can't use @tailrec here because
* the compiler can't recognize it as tail recursive, but this won't blow the
* stack.
*
* @param tarInputStream
* @return
* Lazy stream of tar records
*/
def iter(tarInputStream: TarInputStream): LazyList[FileResult] =
Option(tarInputStream.getNextEntry) match {
case None =>
LazyList.empty

case Some(entry) =>
val filename = Try {
entry.getName
}.getOrElse("")

val result =
if (
entry.isDirectory || filename.contains("._")
) // drop OSX hidden files
None
else if (filename.endsWith(".xml")) // only read xml files
Some(IOUtils.toByteArray(tarInputStream, entry.getSize))
else
None

FileResult(entry.getName, result) #:: iter(tarInputStream)
}

/** Executes the harvest
*/
override def localHarvest(): DataFrame = {
@@ -129,7 +98,7 @@ class HathiFileHarvester(
)
)

iter(inputStream).foreach(tarResult => {
FileHarvester.iter(inputStream).foreach(tarResult => {
handleFile(tarResult, unixEpoch) match {
case Failure(exception) =>
logger
@@ -110,26 +110,6 @@ class HeartlandFileHarvester(
}
}

/** Implements a stream of files from the zip. Can't use @tailrec here because
* the compiler can't recognize it as tail recursive, but this won't blow the
* stack.
*
* @param zipInputStream
* @return
* Lazy stream of zip records
*/
def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
Option(zipInputStream.getNextEntry) match {
case None =>
LazyList.empty
case Some(entry) =>
val result =
if (entry.isDirectory)
None
else
Some(new BufferedReader(new InputStreamReader(zipInputStream)))
FileResult(entry.getName, None, result) #:: iter(zipInputStream)
}

/** Executes the Heartland (Missouri + Iowa) harvest
*/
@@ -145,7 +125,7 @@ class HeartlandFileHarvester(
.getOrElse(
throw new IllegalArgumentException("Couldn't load ZIP files.")
)
iter(inputStream).foreach(result =>
FileHarvester.iter(inputStream).foreach(result =>
handleFile(result, unixEpoch) match {
case Failure(exception) =>
LogManager
@@ -120,27 +120,6 @@ class NYPLFileHarvester(
}
}

/** Implements a stream of files from the zip. Can't use @tailrec here because
* the compiler can't recognize it as tail recursive, but this won't blow the
* stack.
*
* @param zipInputStream
* @return
* Lazy stream of zip records
*/
def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
Option(zipInputStream.getNextEntry) match {
case None =>
LazyList.empty
case Some(entry) =>
val result =
if (entry.isDirectory)
None
else
Some(new BufferedReader(new InputStreamReader(zipInputStream)))
FileResult(entry.getName, None, result) #:: iter(zipInputStream)
}

/** Executes the NYPL harvest
*/
override def localHarvest(): DataFrame = {
@@ -155,7 +134,7 @@ class NYPLFileHarvester(
.getOrElse(
throw new IllegalArgumentException("Couldn't load ZIP files.")
)
iter(inputStream).foreach( result =>
FileHarvester.iter(inputStream).foreach( result =>
handleFile(result, unixEpoch) match {
case Failure(exception) =>
LogManager.getLogger(this.getClass).error(s"Caught exception on $inFile.", exception)
(6 more changed files not shown)
