Skip to content

Commit

Permalink
Speedrunning duplicate code
Browse files Browse the repository at this point in the history
  • Loading branch information
mdellabitta committed Jan 2, 2025
1 parent c133be6 commit 7f024b3
Show file tree
Hide file tree
Showing 17 changed files with 96 additions and 343 deletions.
25 changes: 25 additions & 0 deletions src/main/scala/dpla/ingestion3/harvesters/Harvester.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dpla.ingestion3.harvesters

import java.io.File
import dpla.ingestion3.confs.i3Conf
import dpla.ingestion3.harvesters.file.ParsedResult
import dpla.ingestion3.utils.{FlatFileIO, Utils}
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
Expand All @@ -10,6 +11,7 @@ import org.apache.commons.io.FileUtils
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.xml._

abstract class Harvester(
spark: SparkSession,
Expand Down Expand Up @@ -72,6 +74,17 @@ abstract class LocalHarvester(

def getAvroWriter: DataFileWriter[GenericRecord] = avroWriter

/** Appends a single harvested item to the Avro output.
  *
  * A new record is built against [[Harvester.schema]] and populated with the
  * item's id and document, the supplied ingest timestamp, and this
  * harvester's short name and mime type.
  *
  * @param unixEpoch
  *   Value stored in the record's "ingestDate" field
  * @param item
  *   Parsed item whose id and serialized document are written
  */
def writeOut(unixEpoch: Long, item: ParsedResult): Unit = {
  val writer = getAvroWriter
  val record = new GenericData.Record(Harvester.schema)
  // Field name -> value, in the same order the fields are put on the record.
  val fields = Seq[(String, Any)](
    "id" -> item.id,
    "ingestDate" -> unixEpoch,
    "provider" -> shortName,
    "document" -> item.item,
    "mimetype" -> mimeType
  )
  fields.foreach { case (fieldName, fieldValue) =>
    record.put(fieldName, fieldValue)
  }
  writer.append(record)
}

override def cleanUp(): Unit = {
avroWriter.flush()
avroWriter.close()
Expand All @@ -82,6 +95,18 @@ abstract class LocalHarvester(

object Harvester {


/** Serializes an XML tree to a String.
  *
  * Serialization is done with `MinimizeMode.Always`.
  *
  * @param node
  *   Root of the tree to serialize
  * @return
  *   The serialized XML
  */
def xmlToString(node: Node): String = {
  val serialized = Utility.serialize(node, minimizeTags = MinimizeMode.Always)
  serialized.toString
}


// Schema for harvested records.
val schema: Schema =
new Schema.Parser()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,6 @@ class CommunityWebsHarvester(

protected val extractor = new FlFileExtractor()

/** Loads .zip files
*
* @param file
* File to parse
* @return
* ZipInputstream of the zip contents
*/
def getInputStream(file: File): Option[ZipInputStream] =
file.getName match {
case zipName if zipName.endsWith("zip") =>
Some(new ZipInputStream(new FileInputStream(file)))
case _ => None
}

/** Parses a JValue to extract the item's local id and renders the full record
* in compact form
*
Expand Down Expand Up @@ -114,7 +100,7 @@ class CommunityWebsHarvester(
inFiles
.listFiles(zipFilter)
.foreach(inFile => {
val inputStream: ZipInputStream = getInputStream(inFile)
val inputStream: ZipInputStream = FileHarvester.getZipInputStream(inFile)
.getOrElse(
throw new IllegalArgumentException("Couldn't load ZIP files.")
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,6 @@ class DlgFileHarvester(

protected val extractor = new DlgFileExtractor()

/** Loads .zip files
*
* @param file
* File to parse
* @return
* ZipInputstream of the zip contents
*/
def getInputStream(file: File): Option[ZipInputStream] =
file.getName match {
case zipName if zipName.endsWith("zip") =>
Some(new ZipInputStream(new FileInputStream(file)))
case _ => None
}

/** Parses a JValue to extract the item's local id and renders the full record
* in compact form
Expand Down Expand Up @@ -118,7 +105,7 @@ class DlgFileHarvester(
inFiles
.listFiles(zipFilter)
.foreach(inFile => {
val inputStream: ZipInputStream = getInputStream(inFile)
val inputStream: ZipInputStream = FileHarvester.getZipInputStream(inFile)
.getOrElse(
throw new IllegalArgumentException("Couldn't load ZIP files.")
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,6 @@ class DplaJsonlFileHarvester(

protected val extractor = new DplaJsonlFileExtractor()

/** Loads .zip files containing DPLA JSONL
*
* @param file
* File to parse
* @return
* FileInputStream of the file contents
*/
def getInputStream(file: File): Option[ZipInputStream] = {
file.getName match {
case zipName if zipName.endsWith("zip") =>
Some(new ZipInputStream(new FileInputStream(file)))
case _ => None
}
}

/** Parses a JValue to extract the item's local id and renders the full record
* in compact form
*
Expand Down Expand Up @@ -122,7 +107,7 @@ class DplaJsonlFileHarvester(
inFiles
.listFiles(zipFilter)
.foreach(inFile => {
val inputStream: ZipInputStream = getInputStream(inFile)
val inputStream: ZipInputStream = FileHarvester.getZipInputStream(inFile)
.getOrElse(
throw new IllegalArgumentException("Couldn't load ZIP files.")
)
Expand Down
43 changes: 31 additions & 12 deletions src/main/scala/dpla/ingestion3/harvesters/file/FileHarvester.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package dpla.ingestion3.harvesters.file

import java.io.{BufferedReader, InputStreamReader}
import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
import dpla.ingestion3.confs.i3Conf
import dpla.ingestion3.harvesters.{Harvester, LocalHarvester}
import org.apache.avro.generic.GenericData
Expand All @@ -9,8 +9,9 @@ import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.tools.tar.TarInputStream

import java.util.zip.ZipInputStream
import java.util.zip.{GZIPInputStream, ZipInputStream}
import scala.util.Try
import scala.xml._

/** File based harvester
*
Expand Down Expand Up @@ -46,19 +47,10 @@ abstract class FileHarvester(
* Harvested record
*/

def writeOut(unixEpoch: Long, item: ParsedResult): Unit = {
val avroWriter = getAvroWriter
val genericRecord = new GenericData.Record(Harvester.schema)
genericRecord.put("id", item.id)
genericRecord.put("ingestDate", unixEpoch)
genericRecord.put("provider", shortName)
genericRecord.put("document", item.item)
genericRecord.put("mimetype", mimeType)
avroWriter.append(genericRecord)
}

/** Flushes the Avro writer returned by `getAvroWriter`. */
def flush(): Unit = getAvroWriter.flush()


}

/** Case class to hold the results of a file
Expand All @@ -83,6 +75,33 @@ case class FileResult(
case class ParsedResult(id: String, item: String)

object FileHarvester {

/** Opens a zip archive for reading.
  *
  * @param file
  *   File to open; it is only opened when its name ends with "zip"
  * @return
  *   Some(ZipInputStream) over the file's contents, or None when the file
  *   name does not end with "zip"
  */
def getZipInputStream(file: File): Option[ZipInputStream] =
  // The second argument is by-name, so no stream is opened for other names.
  Option.when(file.getName.endsWith("zip"))(
    new ZipInputStream(new FileInputStream(file))
  )

/** Opens a tar archive, optionally gzip-compressed, for reading.
  *
  * The decision is made on the file name only: a name ending in "gz" is
  * read through a GZIPInputStream, a name ending in "tar" is read directly,
  * and any other name is rejected.
  *
  * @param file
  *   File to open
  * @return
  *   Some(TarInputStream) over the archive contents, or None when the file
  *   name ends with neither "gz" nor "tar"
  * @throws java.io.IOException
  *   if the file cannot be opened, or a "gz" file does not start with a
  *   valid gzip header
  */
def getTarInputStream(file: File): Option[TarInputStream] = {
  val fileName = file.getName
  if (fileName.endsWith("gz")) {
    val rawStream = new FileInputStream(file)
    try Some(new TarInputStream(new GZIPInputStream(rawStream)))
    catch {
      // GZIPInputStream reads the gzip header in its constructor; if that
      // fails, the already-opened file handle must be closed before the
      // error propagates.
      case e: java.io.IOException =>
        try rawStream.close()
        catch {
          case closeError: java.io.IOException => e.addSuppressed(closeError)
        }
        throw e
    }
  } else if (fileName.endsWith("tar")) {
    Some(new TarInputStream(new FileInputStream(file)))
  } else {
    None
  }
}


def iter(zipInputStream: ZipInputStream): LazyList[FileResult] =
Option(zipInputStream.getNextEntry) match {
case None =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package dpla.ingestion3.harvesters.file

import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
import java.io.{File, FileInputStream}
import java.util.zip.ZipInputStream
import dpla.ingestion3.confs.i3Conf
import dpla.ingestion3.harvesters.file.FileFilters.zipFilter
import dpla.ingestion3.mappers.utils.JsonExtractor
import dpla.ingestion3.model.AVRO_MIME_JSON
import org.apache.avro.generic.GenericData
import org.apache.commons.io.IOUtils
import org.apache.log4j.Logger

import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
Expand All @@ -33,20 +33,6 @@ class FlFileHarvester(

protected val extractor = new FlFileExtractor()

/** Loads .zip files
*
* @param file
* File to parse
* @return
* ZipInputstream of the zip contents
*/
def getInputStream(file: File): Option[ZipInputStream] = {
file.getName match {
case zipName if zipName.endsWith("zip") =>
Some(new ZipInputStream(new FileInputStream(file)))
case _ => None
}
}

/** Parses a JValue to extract the item's local id and renders the full record
* in compact form
Expand Down Expand Up @@ -121,7 +107,7 @@ class FlFileHarvester(
inFiles
.listFiles(zipFilter)
.foreach(inFile => {
val inputStream: ZipInputStream = getInputStream(inFile)
val inputStream: ZipInputStream = FileHarvester.getZipInputStream(inFile)
.getOrElse(
throw new IllegalArgumentException("Couldn't load ZIP files.")
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package dpla.ingestion3.harvesters.file

import java.io.{File, FileInputStream}
import java.util.zip.GZIPInputStream
import java.io.File
import dpla.ingestion3.confs.i3Conf
import dpla.ingestion3.harvesters.Harvester
import org.apache.commons.io.IOUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.tools.tar.TarInputStream
import dpla.ingestion3.harvesters.file.FileFilters.gzFilter
import dpla.ingestion3.mappers.utils.XmlExtractor
import dpla.ingestion3.model.AVRO_MIME_XML
Expand All @@ -26,24 +25,6 @@ class HathiFileHarvester(

def mimeType: GenericData.EnumSymbol = AVRO_MIME_XML

/** Loads .tar.gz files
*
* @param file
* File to parse
* @return
* Option[TarInputStream] of the zip contents
*/
def getInputStream(file: File): Option[TarInputStream] = {
file.getName match {
case zipName if zipName.endsWith("gz") =>
Some(new TarInputStream(new GZIPInputStream(new FileInputStream(file))))
case zipName if zipName.endsWith("tar") =>
Some(new TarInputStream(new FileInputStream(file)))

case _ => None
}
}

/** Takes care of parsing an xml file into a list of Nodes each representing
* an item
*
Expand All @@ -66,7 +47,7 @@ class HathiFileHarvester(
.headOption
.flatten

val outputXML = xmlToString(record)
val outputXML = Harvester.xmlToString(record)

id match {
case None =>
Expand All @@ -91,7 +72,7 @@ class HathiFileHarvester(
.listFiles(gzFilter)
.foreach(inFile => {

val inputStream = getInputStream(inFile)
val inputStream = FileHarvester.getTarInputStream(inFile)
.getOrElse(
throw new IllegalArgumentException(
s"Couldn't load file, ${inFile.getAbsolutePath}"
Expand Down Expand Up @@ -120,16 +101,6 @@ class HathiFileHarvester(
spark.read.format("avro").load(tmpOutStr)
}

/** Converts a Node to an xml string
*
* @param node
* The root of the tree to write to a string
* @return
* a String containing xml
*/
def xmlToString(node: Node): String =
Utility.serialize(node, minimizeTags = MinimizeMode.Always).toString

/** Main logic for handling individual entries in the tar.
*
* @param tarResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,6 @@ class HeartlandFileHarvester(

protected val extractor = new HeartlandFileExtractor()

/** Loads .zip files
*
* @param file
* File to parse
* @return
* ZipInputstream of the zip contents
*/
def getInputStream(file: File): Option[ZipInputStream] = {
file.getName match {
case zipName if zipName.endsWith("zip") =>
Some(new ZipInputStream(new FileInputStream(file)))
case _ => None
}
}

/** Parses a JValue to extract the item's local id and renders the full record
* in compact form
Expand Down Expand Up @@ -121,7 +107,7 @@ class HeartlandFileHarvester(
inFiles
.listFiles(zipFilter)
.foreach(inFile => {
val inputStream: ZipInputStream = getInputStream(inFile)
val inputStream: ZipInputStream = FileHarvester.getZipInputStream(inFile)
.getOrElse(
throw new IllegalArgumentException("Couldn't load ZIP files.")
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,6 @@ class NYPLFileHarvester(

protected val extractor = new FlFileExtractor()

/** Loads .zip files
*
* @param file
* File to parse
* @return
* ZipInputstream of the zip contents
*/
def getInputStream(file: File): Option[ZipInputStream] = {
file.getName match {
case zipName if zipName.endsWith("zip") =>
Some(new ZipInputStream(new FileInputStream(file)))
case _ => None
}
}

/** @param json
* Full JSON item record
Expand Down Expand Up @@ -130,7 +116,7 @@ class NYPLFileHarvester(
inFiles
.listFiles(zipFilter)
.foreach(inFile => {
val inputStream: ZipInputStream = getInputStream(inFile)
val inputStream: ZipInputStream = FileHarvester.getZipInputStream(inFile)
.getOrElse(
throw new IllegalArgumentException("Couldn't load ZIP files.")
)
Expand Down
Loading

0 comments on commit 7f024b3

Please sign in to comment.