From c133be628e02c951df4187342843fccf715f255a Mon Sep 17 00:00:00 2001
From: Michael Della Bitta
Date: Thu, 2 Jan 2025 15:33:56 -0500
Subject: [PATCH] Fixed duplicate iters

---
 .../file/CommunityWebsHarvester.scala         | 22 +----
 .../harvesters/file/DlgFileHarvester.scala    | 22 +----
 .../file/DplaJsonlFileHarvester.scala         | 23 +----
 .../harvesters/file/FileHarvester.scala       | 85 ++++++++++++++-----
 .../harvesters/file/FlFileHarvester.scala     | 23 +----
 .../harvesters/file/HathiFileHarvester.scala  | 33 +------
 .../file/HeartlandFileHarvester.scala         | 22 +----
 .../harvesters/file/NYPLFileHarvester.scala   | 23 +----
 .../harvesters/file/NaraDeltaHarvester.scala  | 50 +----------
 .../harvesters/file/NaraFileHarvester.scala   | 35 ++------
 .../file/NorthwestHeritageFileHarvester.scala | 13 ++-
 .../harvesters/file/OaiFileHarvester.scala    |  5 +-
 .../harvesters/file/VaFileHarvester.scala     | 27 +-----
 .../harvesters/file/VtFileHarvester.scala     | 26 +-----
 14 files changed, 87 insertions(+), 322 deletions(-)

diff --git a/src/main/scala/dpla/ingestion3/harvesters/file/CommunityWebsHarvester.scala b/src/main/scala/dpla/ingestion3/harvesters/file/CommunityWebsHarvester.scala
index 0b679a637..9bc29d076 100644
--- a/src/main/scala/dpla/ingestion3/harvesters/file/CommunityWebsHarvester.scala
+++ b/src/main/scala/dpla/ingestion3/harvesters/file/CommunityWebsHarvester.scala
@@ -105,26 +105,6 @@ class CommunityWebsHarvester(
     }
   }
 
-  /** Implements a stream of files from the zip Can't use @tailrec here because
-    * the compiler can't recognize it as tail recursive, but this won't blow the
-    * stack.
-    *
-    * @param zipInputStream
-    * @return
-    *   Lazy stream of zip records
-    */
-  def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
-    Option(zipInputStream.getNextEntry) match {
-      case None =>
-        LazyList.empty
-      case Some(entry) =>
-        val result =
-          if (entry.isDirectory)
-            None
-          else
-            Some(new BufferedReader(new InputStreamReader(zipInputStream)))
-        FileResult(entry.getName, None, result) #:: iter(zipInputStream)
-    }
 
   override def localHarvest(): DataFrame = {
     val harvestTime = System.currentTimeMillis()
 
@@ -138,7 +118,7 @@ class CommunityWebsHarvester(
       .getOrElse(
         throw new IllegalArgumentException("Couldn't load ZIP files.")
       )
-    iter(inputStream).foreach(result => {
+    FileHarvester.iter(inputStream).foreach(result => {
       handleFile(result, unixEpoch) match {
         case Failure(exception) =>
           LogManager
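
The method deleted above is the lazy zip-entry stream that, until this patch, was copy-pasted into every zip-based harvester in this package. A minimal, self-contained sketch of the same pattern, outside the patch (ZipIterDemo and the simplified FileResult are illustrative stand-ins, not repository code):

    import java.io.{BufferedReader, FileInputStream, InputStreamReader}
    import java.util.zip.ZipInputStream

    // Simplified stand-in for the ingestion3 FileResult case class.
    case class FileResult(
        entryName: String,
        data: Option[Array[Byte]],
        bufferedData: Option[BufferedReader] = None
    )

    object ZipIterDemo {
      // Each #:: cell is built only when the caller forces it, so
      // getNextEntry runs lazily and the recursion never deepens the
      // JVM stack, which is why @tailrec isn't needed here.
      def iter(zip: ZipInputStream): LazyList[FileResult] =
        Option(zip.getNextEntry) match {
          case None => LazyList.empty
          case Some(entry) =>
            val reader =
              if (entry.isDirectory) None
              else Some(new BufferedReader(new InputStreamReader(zip)))
            FileResult(entry.getName, None, reader) #:: iter(zip)
        }

      def main(args: Array[String]): Unit = {
        val zip = new ZipInputStream(new FileInputStream(args(0)))
        try iter(zip).foreach(result => println(result.entryName))
        finally zip.close()
      }
    }
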
diff --git a/src/main/scala/dpla/ingestion3/harvesters/file/DlgFileHarvester.scala b/src/main/scala/dpla/ingestion3/harvesters/file/DlgFileHarvester.scala
index b56c3098b..839720327 100644
--- a/src/main/scala/dpla/ingestion3/harvesters/file/DlgFileHarvester.scala
+++ b/src/main/scala/dpla/ingestion3/harvesters/file/DlgFileHarvester.scala
@@ -107,26 +107,6 @@ class DlgFileHarvester(
     }
   }
 
-  /** Implements a stream of files from the zip Can't use @tailrec here because
-    * the compiler can't recognize it as tail recursive, but this won't blow the
-    * stack.
-    *
-    * @param zipInputStream ZipInputStream from the zip file
-    * @return
-    *   Lazy stream of zip records
-    */
-  def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
-    Option(zipInputStream.getNextEntry) match {
-      case None =>
-        LazyList.empty
-      case Some(entry) =>
-        val result =
-          if (entry.isDirectory)
-            None
-          else
-            Some(new BufferedReader(new InputStreamReader(zipInputStream)))
-        FileResult(entry.getName, None, result) #:: iter(zipInputStream)
-    }
 
   /** Executes the Georgia harvest
     */
@@ -142,7 +122,7 @@ class DlgFileHarvester(
       .getOrElse(
         throw new IllegalArgumentException("Couldn't load ZIP files.")
       )
-    iter(inputStream).foreach(result => {
+    FileHarvester.iter(inputStream).foreach(result => {
       handleFile(result, unixEpoch) match {
         case Failure(exception) =>
           LogManager

diff --git a/src/main/scala/dpla/ingestion3/harvesters/file/DplaJsonlFileHarvester.scala b/src/main/scala/dpla/ingestion3/harvesters/file/DplaJsonlFileHarvester.scala
index 14f92642f..bc3021e5e 100644
--- a/src/main/scala/dpla/ingestion3/harvesters/file/DplaJsonlFileHarvester.scala
+++ b/src/main/scala/dpla/ingestion3/harvesters/file/DplaJsonlFileHarvester.scala
@@ -112,27 +112,6 @@ class DplaJsonlFileHarvester(
     }
   }
 
-  /** Implements a stream of files from the zip Can't use @tailrec here because
-    * the compiler can't recognize it as tail recursive, but this won't blow the
-    * stack.
-    *
-    * @param zipInputStream
-    * @return
-    *   Lazy stream of zip records
-    */
-  def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
-    Option(zipInputStream.getNextEntry) match {
-      case None =>
-        LazyList.empty
-      case Some(entry) =>
-        val result =
-          if (entry.isDirectory)
-            None
-          else
-            Some(new BufferedReader(new InputStreamReader(zipInputStream)))
-        FileResult(entry.getName, None, result) #:: iter(zipInputStream)
-    }
-
   /** Executes the DPLA JSON file harvest
     */
   override def localHarvest(): DataFrame = {
@@ -147,7 +126,7 @@ class DplaJsonlFileHarvester(
       .getOrElse(
         throw new IllegalArgumentException("Couldn't load ZIP files.")
      )
-    iter(inputStream).foreach( result => {
+    FileHarvester.iter(inputStream).foreach( result => {
       handleFile(result, unixEpoch) match {
         case Failure(exception) =>
           LogManager
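
Every zip harvester in this patch gets the same mechanical call-site change: the locally defined iter(inputStream) becomes FileHarvester.iter(inputStream), while the Try-based per-entry handling stays inside each harvester. A runnable sketch of that dispatch shape (process and its println logging are hypothetical simplifications of the handleFile pattern):

    import scala.util.{Failure, Success, Try}

    object CallSiteDemo {
      // The stream comes from a shared helper; the Try-based handler
      // stays local to the caller, mirroring the harvesters above.
      def process[A](entries: LazyList[A])(handle: A => Try[Int]): Unit =
        entries.foreach { entry =>
          handle(entry) match {
            case Failure(exception) => println(s"Caught exception: $exception")
            case Success(_)         => () // counts ignored at this call site
          }
        }

      def main(args: Array[String]): Unit =
        process(LazyList("a", "b", "c"))(s => Try(s.length))
    }
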
diff --git a/src/main/scala/dpla/ingestion3/harvesters/file/FileHarvester.scala b/src/main/scala/dpla/ingestion3/harvesters/file/FileHarvester.scala
index 19c7e54b9..cdf0b4c06 100644
--- a/src/main/scala/dpla/ingestion3/harvesters/file/FileHarvester.scala
+++ b/src/main/scala/dpla/ingestion3/harvesters/file/FileHarvester.scala
@@ -1,13 +1,15 @@
 package dpla.ingestion3.harvesters.file
 
-import java.io.BufferedReader
-
+import java.io.{BufferedReader, InputStreamReader}
 import dpla.ingestion3.confs.i3Conf
 import dpla.ingestion3.harvesters.{Harvester, LocalHarvester}
 import org.apache.avro.generic.GenericData
+import org.apache.commons.io.IOUtils
 import org.apache.log4j.Logger
 import org.apache.spark.sql.SparkSession
+import org.apache.tools.tar.TarInputStream
+
+import java.util.zip.ZipInputStream
 import scala.util.Try
 
 /** File based harvester
@@ -25,25 +27,7 @@ abstract class FileHarvester(
     conf: i3Conf
 ) extends LocalHarvester(spark, shortName, conf) {
 
-  /** Case class to hold the results of a file
-    *
-    * @param entryName
-    *   Path of the entry in the file
-    * @param data
-    *   Holds the data for the entry, or None if it's a directory.
-    * @param bufferedData
-    *   Holds a buffered reader for the entry if it's too large to be held in
-    *   memory.
-    */
-  case class FileResult(
-      entryName: String,
-      data: Option[Array[Byte]],
-      bufferedData: Option[BufferedReader] = None
-  )
 
-  /** Case class hold the parsed value from a given FileResult
-    */
-  case class ParsedResult(id: String, item: String)
 
   /** Parses and extracts ZipInputStream and writes parses records out.
    *
@@ -61,7 +45,7 @@ abstract class FileHarvester(
    * @param item
    *   Harvested record
    */
-  // todo this is in harvester?
+
  def writeOut(unixEpoch: Long, item: ParsedResult): Unit = {
    val avroWriter = getAvroWriter
    val genericRecord = new GenericData.Record(Harvester.schema)
@@ -76,3 +60,62 @@ abstract class FileHarvester(
 
  def flush(): Unit = getAvroWriter.flush()
 }
+
+/** Case class to hold the results of a file
+  *
+  * @param entryName
+  *   Path of the entry in the file
+  * @param data
+  *   Holds the data for the entry, or None if it's a directory.
+  * @param bufferedData
+  *   Holds a buffered reader for the entry if it's too large to be held in
+  *   memory.
+  */
+case class FileResult(
+    entryName: String,
+    data: Option[Array[Byte]],
+    bufferedData: Option[BufferedReader] = None
+)
+
+/** Case class hold the parsed value from a given FileResult
+  */
+
+case class ParsedResult(id: String, item: String)
+
+object FileHarvester {
+  def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
+    Option(zipInputStream.getNextEntry) match {
+      case None =>
+        LazyList.empty
+      case Some(entry) =>
+        val result =
+          if (entry.isDirectory || entry.getName.contains("._"))
+            None
+          else
+            Some(new BufferedReader(new InputStreamReader(zipInputStream)))
+        FileResult(entry.getName, None, result) #:: iter(zipInputStream)
+    }
+
+  def iter(tarInputStream: TarInputStream): LazyList[FileResult] =
+    Option(tarInputStream.getNextEntry) match {
+      case None =>
+        LazyList.empty
+
+      case Some(entry) =>
+        val filename = Try {
+          entry.getName
+        }.getOrElse("")
+
+        val result =
+          if (
+            entry.isDirectory || filename.contains("._")
+          ) // drop OSX hidden files
+            None
+          else if (filename.endsWith(".xml")) // only read xml files
+            Some(IOUtils.toByteArray(tarInputStream, entry.getSize))
+          else
+            None
+
+        FileResult(entry.getName, result) #:: iter(tarInputStream)
+    }
+}
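
One behavioral detail worth noting: the consolidated zip iter above skips entries whose names contain "._", a check the deleted per-harvester zip copies did not have; before this patch only the tar variant dropped those macOS AppleDouble files. A small sketch of the filtering rule as the new code appears to implement it (keep is a hypothetical helper, not part of the patch):

    object EntryFilterDemo {
      // Directories yield no reader, and macOS AppleDouble entries
      // ("._foo") are now skipped for zips as well as tars.
      def keep(entryName: String, isDirectory: Boolean): Boolean =
        !isDirectory && !entryName.contains("._")

      def main(args: Array[String]): Unit = {
        assert(keep("records/item1.xml", isDirectory = false))
        assert(!keep("records/._item1.xml", isDirectory = false))
        assert(!keep("records/", isDirectory = true))
        println("entry filter behaves as expected")
      }
    }
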
diff --git a/src/main/scala/dpla/ingestion3/harvesters/file/FlFileHarvester.scala b/src/main/scala/dpla/ingestion3/harvesters/file/FlFileHarvester.scala
index 0199afc5d..9da08ff37 100644
--- a/src/main/scala/dpla/ingestion3/harvesters/file/FlFileHarvester.scala
+++ b/src/main/scala/dpla/ingestion3/harvesters/file/FlFileHarvester.scala
@@ -111,27 +111,6 @@ class FlFileHarvester(
     }
   }
 
-  /** Implements a stream of files from the zip Can't use @tailrec here because
-    * the compiler can't recognize it as tail recursive, but this won't blow the
-    * stack.
-    *
-    * @param zipInputStream
-    * @return
-    *   Lazy stream of zip records
-    */
-  def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
-    Option(zipInputStream.getNextEntry) match {
-      case None =>
-        LazyList.empty
-      case Some(entry) =>
-        val result =
-          if (entry.isDirectory)
-            None
-          else
-            Some(new BufferedReader(new InputStreamReader(zipInputStream)))
-        FileResult(entry.getName, None, result) #:: iter(zipInputStream)
-    }
-
   /** Executes the Florida harvest
     */
   override def localHarvest(): DataFrame = {
@@ -146,7 +125,7 @@ class FlFileHarvester(
       .getOrElse(
         throw new IllegalArgumentException("Couldn't load ZIP files.")
       )
-    iter(inputStream).foreach(result => {
+    FileHarvester.iter(inputStream).foreach(result => {
       handleFile(result, unixEpoch) match {
         case Failure(exception) =>
           LogManager.getLogger(this.getClass).error(s"Caught exception on $inFile.", exception)

diff --git a/src/main/scala/dpla/ingestion3/harvesters/file/HathiFileHarvester.scala b/src/main/scala/dpla/ingestion3/harvesters/file/HathiFileHarvester.scala
index 31ba80a2a..a2d4d8ba8 100644
--- a/src/main/scala/dpla/ingestion3/harvesters/file/HathiFileHarvester.scala
+++ b/src/main/scala/dpla/ingestion3/harvesters/file/HathiFileHarvester.scala
@@ -80,37 +80,6 @@ class HathiFileHarvester(
     }
   }
 
-  /** Implements a stream of files from the tar. Can't use @tailrec here because
-    * the compiler can't recognize it as tail recursive, but this won't blow the
-    * stack.
-    *
-    * @param tarInputStream
-    * @return
-    *   Lazy stream of tar records
-    */
-  def iter(tarInputStream: TarInputStream): LazyList[FileResult] =
-    Option(tarInputStream.getNextEntry) match {
-      case None =>
-        LazyList.empty
-
-      case Some(entry) =>
-        val filename = Try {
-          entry.getName
-        }.getOrElse("")
-
-        val result =
-          if (
-            entry.isDirectory || filename.contains("._")
-          ) // drop OSX hidden files
-            None
-          else if (filename.endsWith(".xml")) // only read xml files
-            Some(IOUtils.toByteArray(tarInputStream, entry.getSize))
-          else
-            None
-
-        FileResult(entry.getName, result) #:: iter(tarInputStream)
-    }
-
   /** Executes the harvest
     */
   override def localHarvest(): DataFrame = {
@@ -129,7 +98,7 @@ class HathiFileHarvester(
         )
       )
 
-    iter(inputStream).foreach(tarResult => {
+    FileHarvester.iter(inputStream).foreach(tarResult => {
       handleFile(tarResult, unixEpoch) match {
         case Failure(exception) =>
           logger
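
HathiFileHarvester uses the tar overload, which reads each .xml entry eagerly into a byte array instead of handing out a reader. A standalone sketch of that iteration, assuming Ant's TarInputStream and commons-io on the classpath just as the patch does (TarIterDemo and entryBytes are illustrative names):

    import java.io.FileInputStream
    import org.apache.commons.io.IOUtils
    import org.apache.tools.tar.TarInputStream

    object TarIterDemo {
      // XML entries are copied into memory up front; directories, hidden
      // files, and non-XML entries yield None, as in the shared tar iter.
      def entryBytes(tar: TarInputStream): LazyList[(String, Option[Array[Byte]])] =
        Option(tar.getNextEntry) match {
          case None => LazyList.empty
          case Some(entry) =>
            val name = entry.getName
            val bytes =
              if (!entry.isDirectory && !name.contains("._") && name.endsWith(".xml"))
                Some(IOUtils.toByteArray(tar, entry.getSize))
              else None
            (name, bytes) #:: entryBytes(tar)
        }

      def main(args: Array[String]): Unit = {
        val tar = new TarInputStream(new FileInputStream(args(0)))
        try entryBytes(tar).foreach { case (name, bytes) =>
          println(s"$name: ${bytes.map(_.length).getOrElse(0)} bytes")
        }
        finally tar.close()
      }
    }
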
diff --git a/src/main/scala/dpla/ingestion3/harvesters/file/HeartlandFileHarvester.scala b/src/main/scala/dpla/ingestion3/harvesters/file/HeartlandFileHarvester.scala
index 510b30ea3..c4f8b144b 100644
--- a/src/main/scala/dpla/ingestion3/harvesters/file/HeartlandFileHarvester.scala
+++ b/src/main/scala/dpla/ingestion3/harvesters/file/HeartlandFileHarvester.scala
@@ -110,26 +110,6 @@ class HeartlandFileHarvester(
     }
   }
 
-  /** Implements a stream of files from the zip Can't use @tailrec here because
-    * the compiler can't recognize it as tail recursive, but this won't blow the
-    * stack.
-    *
-    * @param zipInputStream
-    * @return
-    *   Lazy stream of zip records
-    */
-  def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
-    Option(zipInputStream.getNextEntry) match {
-      case None =>
-        LazyList.empty
-      case Some(entry) =>
-        val result =
-          if (entry.isDirectory)
-            None
-          else
-            Some(new BufferedReader(new InputStreamReader(zipInputStream)))
-        FileResult(entry.getName, None, result) #:: iter(zipInputStream)
-    }
 
   /** Executes the Heartland (Missouri + Iowa) harvest
     */
@@ -145,7 +125,7 @@ class HeartlandFileHarvester(
       .getOrElse(
         throw new IllegalArgumentException("Couldn't load ZIP files.")
       )
-    iter(inputStream).foreach(result =>
+    FileHarvester.iter(inputStream).foreach(result =>
       handleFile(result, unixEpoch) match {
         case Failure(exception) =>
           LogManager

diff --git a/src/main/scala/dpla/ingestion3/harvesters/file/NYPLFileHarvester.scala b/src/main/scala/dpla/ingestion3/harvesters/file/NYPLFileHarvester.scala
index 356cad9d9..b32a9d5e5 100644
--- a/src/main/scala/dpla/ingestion3/harvesters/file/NYPLFileHarvester.scala
+++ b/src/main/scala/dpla/ingestion3/harvesters/file/NYPLFileHarvester.scala
@@ -120,27 +120,6 @@ class NYPLFileHarvester(
     }
   }
 
-  /** Implements a stream of files from the zip Can't use @tailrec here because
-    * the compiler can't recognize it as tail recursive, but this won't blow the
-    * stack.
-    *
-    * @param zipInputStream
-    * @return
-    *   Lazy stream of zip records
-    */
-  def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
-    Option(zipInputStream.getNextEntry) match {
-      case None =>
-        LazyList.empty
-      case Some(entry) =>
-        val result =
-          if (entry.isDirectory)
-            None
-          else
-            Some(new BufferedReader(new InputStreamReader(zipInputStream)))
-        FileResult(entry.getName, None, result) #:: iter(zipInputStream)
-    }
-
   /** Executes the NYPL harvest
     */
   override def localHarvest(): DataFrame = {
@@ -155,7 +134,7 @@ class NYPLFileHarvester(
       .getOrElse(
         throw new IllegalArgumentException("Couldn't load ZIP files.")
       )
-    iter(inputStream).foreach( result =>
+    FileHarvester.iter(inputStream).foreach( result =>
       handleFile(result, unixEpoch) match {
         case Failure(exception) =>
           LogManager.getLogger(this.getClass).error(s"Caught exception on $inFile.", exception)
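
Downstream of the shared iter, each zip harvester's handleFile receives a FileResult whose bufferedData holds a reader positioned at the current entry. One plausible way such a consumer drains the reader before parsing (readEntry is a hypothetical helper in the handleFile style, not code from the repository):

    import java.io.BufferedReader
    import scala.util.Try

    object HandleFileDemo {
      // Drain the entry's reader into a single string before parsing.
      def readEntry(reader: BufferedReader): Try[String] =
        Try {
          Iterator
            .continually(reader.readLine())
            .takeWhile(_ != null)
            .mkString("\n")
        }

      def main(args: Array[String]): Unit = {
        val reader = new BufferedReader(new java.io.StringReader("a\nb"))
        assert(readEntry(reader).get == "a\nb")
        println("ok")
      }
    }
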
diff --git a/src/main/scala/dpla/ingestion3/harvesters/file/NaraDeltaHarvester.scala b/src/main/scala/dpla/ingestion3/harvesters/file/NaraDeltaHarvester.scala
index 6899539e1..542c5864a 100644
--- a/src/main/scala/dpla/ingestion3/harvesters/file/NaraDeltaHarvester.scala
+++ b/src/main/scala/dpla/ingestion3/harvesters/file/NaraDeltaHarvester.scala
@@ -1,6 +1,6 @@
 package dpla.ingestion3.harvesters.file
 
-import java.io.{BufferedReader, File, FileInputStream}
+import java.io.{File, FileInputStream}
 import java.util.zip.GZIPInputStream
 import dpla.ingestion3.confs.i3Conf
 import dpla.ingestion3.harvesters.file.FileFilters.{avroFilter, gzFilter}
@@ -30,22 +30,6 @@ class NaraDeltaHarvester(
     */
   case class ParsedResult(id: String, item: String)
 
-  /** Case class to hold the results of a file
-    *
-    * @param entryName
-    *   Path of the entry in the file
-    * @param data
-    *   Holds the data for the entry, or None if it's a directory.
-    * @param bufferedData
-    *   Holds a buffered reader for the entry if it's too large to be held in
-    *   memory.
-    */
-  case class FileResult(
-      entryName: String,
-      data: Option[Array[Byte]],
-      bufferedData: Option[BufferedReader] = None
-  )
-
   lazy val naraSchema: Schema = new Schema.Parser()
     .parse(new FlatFileIO().readFileAsString("/avro/OriginalRecord.avsc"))
 
@@ -166,36 +150,6 @@ class NaraDeltaHarvester(
 
   }
 
-  /** Implements a stream of files from the tar. Can't use @tailrec here because
-    * the compiler can't recognize it as tail recursive, but this won't blow the
-    * stack.
-    *
-    * @param tarInputStream
-    * @return
-    *   Lazy stream of tar records
-    */
-  def iter(tarInputStream: TarInputStream): LazyList[FileResult] =
-    Option(tarInputStream.getNextEntry) match {
-      case None =>
-        LazyList.empty
-
-      case Some(entry) =>
-        val filename = Try {
-          entry.getName
-        }.getOrElse("")
-
-        val result =
-          if (
-            entry.isDirectory || filename.contains("._")
-          ) // drop OSX hidden files
-            None
-          else if (filename.endsWith(".xml")) // only read xml files
-            Some(IOUtils.toByteArray(tarInputStream, entry.getSize))
-          else
-            None
-
-        FileResult(entry.getName, result) #:: iter(tarInputStream)
-    }
 
   /** Executes the nara harvest
     */
@@ -256,7 +210,7 @@ class NaraDeltaHarvester(
         )
       )
 
-      val recordCount = iter(inputStream).map(tarResult =>
+      val recordCount = FileHarvester.iter(inputStream).map(tarResult =>
         handleFile(tarResult, unixEpoch, file.getName) match {
           case Failure(exception) =>
             val logger = LogManager.getLogger(this.getClass)
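
Unlike most call sites in this patch, NaraDeltaHarvester keeps its map(...).sum so it can still log a record count, with failed entries contributing zero. A minimal sketch of that aggregation (countHarvested is an illustrative name):

    import scala.util.{Failure, Success, Try}

    object CountDemo {
      // Sum per-entry counts; a Failure counts as zero records.
      def countHarvested[A](entries: LazyList[A])(handle: A => Try[Int]): Int =
        entries.map { entry =>
          handle(entry) match {
            case Failure(_)     => 0
            case Success(count) => count
          }
        }.sum

      def main(args: Array[String]): Unit = {
        val n = countHarvested(LazyList(1, 2, 3)) {
          case 2 => Failure(new RuntimeException("boom"))
          case i => Success(i)
        }
        assert(n == 4) // 1 + 0 + 3
        println(s"harvested $n records")
      }
    }
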
diff --git a/src/main/scala/dpla/ingestion3/harvesters/file/NaraFileHarvester.scala b/src/main/scala/dpla/ingestion3/harvesters/file/NaraFileHarvester.scala
index 08154127b..6d9a73eae 100644
--- a/src/main/scala/dpla/ingestion3/harvesters/file/NaraFileHarvester.scala
+++ b/src/main/scala/dpla/ingestion3/harvesters/file/NaraFileHarvester.scala
@@ -28,26 +28,6 @@ class NaraFileHarvester(
     conf: i3Conf
 ) extends LocalHarvester(spark, shortName, conf) {
 
-  /** Case class hold the parsed value from a given FileResult
-    */
-  case class ParsedResult(id: String, item: String)
-
-  /** Case class to hold the results of a file
-    *
-    * @param entryName
-    *   Path of the entry in the file
-    * @param data
-    *   Holds the data for the entry, or None if it's a directory.
-    * @param bufferedData
-    *   Holds a buffered reader for the entry if it's too large to be held in
-    *   memory.
-    */
-  case class FileResult(
-      entryName: String,
-      data: Option[Array[Byte]],
-      bufferedData: Option[BufferedReader] = None
-  )
-
   lazy val naraSchema: Schema = new Schema.Parser()
     .parse(new FlatFileIO().readFileAsString("/avro/NaraOriginalRecord.avsc"))
 
@@ -214,9 +194,9 @@ class NaraFileHarvester(
 
     logger.info(s"Writing harvest tmp output to $naraTmp")
 
-    // FIXME This assumes files on local file system and not on S3. Files should be able to read off of S3.
+
     val inFile = new File(conf.harvest.endpoint.getOrElse("in"))
-    // FIXME Deletes are tracked in files alongside harvest data, this should be done in a more sustainable way
+
     val deletes = new File(inFile, "/deletes/")
 
     if (inFile.isDirectory)
@@ -310,19 +290,16 @@ class NaraFileHarvester(
         )
       )
 
-    val recordCount = iter(inputStream).map(tarResult =>
+    FileHarvester.iter(inputStream).foreach(tarResult =>
       handleFile(tarResult, unixEpoch, file.getName) match {
         case Failure(exception) =>
           val logger = LogManager.getLogger(this.getClass)
           logger
             .error(s"Caught exception on ${tarResult.entryName}.", exception)
-          0
-        case Success(count) =>
-          count
-      }).sum
-    val logger = LogManager.getLogger(this.getClass)
-    logger.info(s"Harvested $recordCount records from ${file.getName}")
+        case Success(count) => ()
+      }
+    )
 
     IOUtils.closeQuietly(inputStream)
   }

diff --git a/src/main/scala/dpla/ingestion3/harvesters/file/NorthwestHeritageFileHarvester.scala b/src/main/scala/dpla/ingestion3/harvesters/file/NorthwestHeritageFileHarvester.scala
index 78a071070..01ccb5708 100644
--- a/src/main/scala/dpla/ingestion3/harvesters/file/NorthwestHeritageFileHarvester.scala
+++ b/src/main/scala/dpla/ingestion3/harvesters/file/NorthwestHeritageFileHarvester.scala
@@ -38,9 +38,6 @@ class NorthwestHeritageFileHarvester(
     * @return
     *   ZipInputstream of the zip contents
     *
-    * TODO: Because we're only handling zips in this class, and they should
-    * already be filtered by the FilenameFilter, I wonder if we even need the
-    * match statement here.
     */
   def getInputStream(file: File): Option[ZipInputStream] = {
     file.getName match {
@@ -150,17 +147,17 @@ class NorthwestHeritageFileHarvester(
       .getOrElse(
         throw new IllegalArgumentException("Couldn't load ZIP files.")
       )
-    val recordCount = iter(inputStream).map(result =>
+    FileHarvester.iter(inputStream).foreach(result =>
       handleFile(result, unixEpoch) match {
         case Failure(exception) =>
           LogManager
             .getLogger(this.getClass)
             .error(s"Caught exception on $inFile.", exception)
-          0
-        case Success(count) =>
-          count
+
+        case Success(count) => ()
+
       }
-    ).sum
+    )
 
     IOUtils.closeQuietly(inputStream)
   })
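
The TODO deleted above questioned whether getInputStream needs to match on the file name at all, since the FilenameFilter has already screened the directory listing. The method's shape, reduced to a runnable sketch (GetInputStreamDemo is illustrative):

    import java.io.{File, FileInputStream}
    import java.util.zip.ZipInputStream

    object GetInputStreamDemo {
      // Match on the extension and wrap the raw stream, as the
      // harvesters' getInputStream methods do.
      def getInputStream(file: File): Option[ZipInputStream] =
        file.getName match {
          case name if name.endsWith("zip") =>
            Some(new ZipInputStream(new FileInputStream(file)))
          case _ => None
        }

      def main(args: Array[String]): Unit =
        getInputStream(new File(args(0))) match {
          case Some(stream) => println("opened a zip stream"); stream.close()
          case None         => println("not a zip")
        }
    }
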
diff --git a/src/main/scala/dpla/ingestion3/harvesters/file/OaiFileHarvester.scala b/src/main/scala/dpla/ingestion3/harvesters/file/OaiFileHarvester.scala
index 35f20f5dd..dde810a24 100644
--- a/src/main/scala/dpla/ingestion3/harvesters/file/OaiFileHarvester.scala
+++ b/src/main/scala/dpla/ingestion3/harvesters/file/OaiFileHarvester.scala
@@ -38,9 +38,6 @@ class OaiFileHarvester(
     * @return
     *   ZipInputstream of the zip contents
     *
-    * TODO: Because we're only handling zips in this class, and they should
-    * already be filtered by the FilenameFilter, I wonder if we even need the
-    * match statement here.
     */
   def getInputStream(file: File): Option[ZipInputStream] =
     file.getName match {
@@ -150,7 +147,7 @@ class OaiFileHarvester(
       .getOrElse(
         throw new IllegalArgumentException("Couldn't load ZIP files.")
       )
-    iter(inputStream).foreach(result =>
+    FileHarvester.iter(inputStream).foreach(result =>
      handleFile(result, unixEpoch) match {
        case Failure(exception) =>
          logger.error(s"Caught exception on $inFile.", exception)

diff --git a/src/main/scala/dpla/ingestion3/harvesters/file/VaFileHarvester.scala b/src/main/scala/dpla/ingestion3/harvesters/file/VaFileHarvester.scala
index df6a15b04..4fa372359 100644
--- a/src/main/scala/dpla/ingestion3/harvesters/file/VaFileHarvester.scala
+++ b/src/main/scala/dpla/ingestion3/harvesters/file/VaFileHarvester.scala
@@ -37,9 +37,6 @@ class VaFileHarvester(
     * @return
     *   ZipInputstream of the zip contents
     *
-    * TODO: Because we're only handling zips in this class, and they should
-    * already be filtered by the FilenameFilter, I wonder if we even need the
-    * match statement here.
     */
   def getInputStream(file: File): Option[ZipInputStream] =
     file.getName match {
@@ -76,28 +73,6 @@ class VaFileHarvester(
     }
   }
 
-  /** Implements a stream of files from the zip Can't use @tailrec here because
-    * the compiler can't recognize it as tail recursive, but this won't blow the
-    * stack.
-    *
-    * @param zipInputStream
-    * @return
-    *   Lazy stream of zip records
-    */
-  def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
-    Option(zipInputStream.getNextEntry) match {
-      case None =>
-        LazyList.empty
-      case Some(entry) =>
-        val result =
-          if (entry.isDirectory || !entry.getName.endsWith(".xml"))
-            None
-          else {
-            Some(IOUtils.toByteArray(zipInputStream, entry.getSize))
-          }
-        FileResult(entry.getName, result) #:: iter(zipInputStream)
-    }
-
   /** Executes the Digital Virginias harvest
     */
   override def localHarvest(): DataFrame = {
@@ -112,7 +87,7 @@ class VaFileHarvester(
       .getOrElse(
         throw new IllegalArgumentException("Couldn't load ZIP files.")
       )
-    iter(inputStream).foreach(result => {
+    FileHarvester.iter(inputStream).foreach(result => {
       handleFile(result, unixEpoch) match {
         case Failure(exception) =>
           LogManager
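
VaFileHarvester's deleted iter differed from the shared zip overload: it read each .xml entry eagerly into FileResult.data, whereas the shared version populates bufferedData with a reader for every non-hidden entry. One way a consumer might normalize across the two representations, sketched under that assumption (text is a hypothetical helper, not code from the patch):

    import java.io.BufferedReader
    import java.nio.charset.StandardCharsets

    object FileResultDataDemo {
      case class FileResult(
          entryName: String,
          data: Option[Array[Byte]],
          bufferedData: Option[BufferedReader] = None
      )

      // Prefer eager bytes when present, fall back to draining the reader.
      def text(result: FileResult): Option[String] =
        result.data
          .map(bytes => new String(bytes, StandardCharsets.UTF_8))
          .orElse(result.bufferedData.map { reader =>
            Iterator
              .continually(reader.readLine())
              .takeWhile(_ != null)
              .mkString("\n")
          })

      def main(args: Array[String]): Unit = {
        val fromBytes =
          FileResult("a.xml", Some("<x/>".getBytes(StandardCharsets.UTF_8)))
        assert(text(fromBytes).contains("<x/>"))
        println("ok")
      }
    }
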
diff --git a/src/main/scala/dpla/ingestion3/harvesters/file/VtFileHarvester.scala b/src/main/scala/dpla/ingestion3/harvesters/file/VtFileHarvester.scala
index 3a54622bc..b1400b87b 100644
--- a/src/main/scala/dpla/ingestion3/harvesters/file/VtFileHarvester.scala
+++ b/src/main/scala/dpla/ingestion3/harvesters/file/VtFileHarvester.scala
@@ -34,9 +34,6 @@ class VtFileHarvester(
     * @return
     *   ZipInputstream of the zip contents
     *
-    * TODO: Because we're only handling zips in this class, and they should
-    * already be filtered by the FilenameFilter, I wonder if we even need the
-    * match statement here.
     */
   def getInputStream(file: File): Option[ZipInputStream] =
     file.getName match {
@@ -73,27 +70,6 @@ class VtFileHarvester(
     }
   }
 
-  /** Implements a stream of files from the zip Can't use @tailrec here because
-    * the compiler can't recognize it as tail recursive, but this won't blow the
-    * stack.
-    *
-    * @param zipInputStream
-    * @return
-    *   Lazy stream of zip records
-    */
-  def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
-    Option(zipInputStream.getNextEntry) match {
-      case None =>
-        LazyList.empty
-      case Some(entry) =>
-        val result =
-          if (entry.isDirectory)
-            None
-          else
-            Some(IOUtils.toByteArray(zipInputStream, entry.getSize))
-        FileResult(entry.getName, result) #:: iter(zipInputStream)
-    }
-
   /** Executes the Vermont harvest
     */
   override def localHarvest(): DataFrame = {
@@ -108,7 +84,7 @@ class VtFileHarvester(
       .getOrElse(
         throw new IllegalArgumentException("Couldn't load ZIP files.")
       )
-    iter(inputStream).foreach(result =>
+    FileHarvester.iter(inputStream).foreach(result =>
      handleFile(result, unixEpoch) match {
        case Failure(exception) =>
          LogManager
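
Taken together, the localHarvest methods in these files all follow one outline: filter the input directory for zips, open each file, stream its entries, and close the stream. A compact, Spark-free sketch of that outline (LocalZipWalkDemo is illustrative and uses a plain Iterator in place of FileHarvester.iter):

    import java.io.{File, FileInputStream, FilenameFilter}
    import java.util.zip.ZipInputStream

    object LocalZipWalkDemo {
      val zipFilter: FilenameFilter =
        (_: File, name: String) => name.endsWith("zip")

      def main(args: Array[String]): Unit = {
        val inFiles = new File(args(0))
        Option(inFiles.listFiles(zipFilter)).getOrElse(Array.empty[File]).foreach {
          file =>
            val in = new ZipInputStream(new FileInputStream(file))
            try
              Iterator
                .continually(Option(in.getNextEntry))
                .takeWhile(_.isDefined)
                .flatten
                .filterNot(_.isDirectory)
                .foreach(entry => println(s"${file.getName}: ${entry.getName}"))
            finally in.close()
        }
      }
    }
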