Skip to content

Commit

Permalink
add skip last n lines
Browse files Browse the repository at this point in the history
  • Loading branch information
ckeys committed Sep 2, 2022
1 parent f1f6824 commit c0f25a7
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,48 @@ object HDFSOperatorV2 {
}
}

def saveWithoutLastNLines(inPath: String, skipLastNLines: Int): String = {
val fs = FileSystem.get(hadoopConfiguration)
val src: Path = new Path(inPath)
var line: String = null
var dos: FSDataOutputStream = null
var br1: BufferedReader = null
var br2: BufferedReader = null
val pathElements = inPath.split(PathFun.pathSeparator)
val writePathParts = pathElements.take(pathElements.length - 1) :+ String.format("skipLastNLines_%s_%s", String.valueOf(skipLastNLines), pathElements(pathElements.length - 1))
val outPath = writePathParts.mkString(PathFun.pathSeparator)

try {
dos = fs.create(new Path(new java.io.File(outPath).getPath), true)
br1 = new BufferedReader(new InputStreamReader(fs.open(src)))
var totalLinesCount = 0
while (br1.readLine() != null) totalLinesCount += 1

br2 = new BufferedReader(new InputStreamReader(fs.open(src)))
line = br2.readLine()
var count = 1

while (line != null) {
if (count <= totalLinesCount - skipLastNLines) {
dos.writeBytes(line + "\n")
}
count += 1
line = br2.readLine()
}
} finally {
if (br1 != null) br1.close()
if (null != dos) {
try {
dos.close()
} catch {
case ex: Exception => throw ex
}
dos.close()
}
}
outPath
}

def saveWithoutTopNLines(inPath: String, skipFirstNLines: Int): String = {
val fs = FileSystem.get(hadoopConfiguration)
val src: Path = new Path(inPath)
Expand All @@ -280,7 +322,6 @@ object HDFSOperatorV2 {
dos = fs.create(new Path(new java.io.File(outPath).getPath), true)
br = new BufferedReader(new InputStreamReader(fs.open(src)))
line = br.readLine()

var count = 1
while (line != null) {
if (count > skipFirstNLines) {
Expand All @@ -295,8 +336,7 @@ object HDFSOperatorV2 {
try {
dos.close()
} catch {
case ex: Exception =>
println("close exception")
case ex: Exception => throw ex
}
dos.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ class MLSQLCSV(override val uid: String) extends MLSQLBaseFileSource with WowPar

def handleDFWithOption(originPath: String, config: DataSourceConfig): String = {
val skipFirstNLines = config.config.get("skipFirstNLines")
skipFirstNLines match {
val skipLastNLines = config.config.get("skipLastNLines")
val pathAfterSkipTopLines = skipFirstNLines match {
case None => originPath
case Some(_) => {
var numsToSkip = skipFirstNLines.get.toInt
Expand All @@ -25,6 +26,16 @@ class MLSQLCSV(override val uid: String) extends MLSQLBaseFileSource with WowPar
newPath
}
}
val res = skipLastNLines match {
case None=> pathAfterSkipTopLines
case Some(_) => {
var numsToSkip = skipLastNLines.get.toInt
val path = pathAfterSkipTopLines
val newPath = HDFSOperatorV2.saveWithoutLastNLines(path, numsToSkip)
newPath
}
}
res
}

def deleteTmpFiles(config: DataSourceConfig, newPath: String): Unit = {
Expand Down

0 comments on commit c0f25a7

Please sign in to comment.