diff --git a/streamingpro-core/src/main/java/tech/mlsql/tool/HDFSOperatorV2.scala b/streamingpro-core/src/main/java/tech/mlsql/tool/HDFSOperatorV2.scala
index 847da81c1..1c2eac3f8 100644
--- a/streamingpro-core/src/main/java/tech/mlsql/tool/HDFSOperatorV2.scala
+++ b/streamingpro-core/src/main/java/tech/mlsql/tool/HDFSOperatorV2.scala
@@ -266,6 +266,48 @@ object HDFSOperatorV2 {
     }
   }
 
+  // Writes a copy of inPath named "skipLastNLines_<n>_<fileName>" into the
+  // same directory, dropping the last n lines, and returns the new path.
+  // The source is read twice because an HDFS stream cannot be rewound:
+  // the first pass counts lines, the second copies all but the last n.
+  def saveWithoutLastNLines(inPath: String, skipLastNLines: Int): String = {
+    val fs = FileSystem.get(hadoopConfiguration)
+    val src: Path = new Path(inPath)
+    var line: String = null
+    var dos: FSDataOutputStream = null
+    var br1: BufferedReader = null
+    var br2: BufferedReader = null
+    val pathElements = inPath.split(PathFun.pathSeparator)
+    val writePathParts = pathElements.take(pathElements.length - 1) :+ String.format("skipLastNLines_%s_%s", String.valueOf(skipLastNLines), pathElements(pathElements.length - 1))
+    val outPath = writePathParts.mkString(PathFun.pathSeparator)
+
+    try {
+      dos = fs.create(new Path(new java.io.File(outPath).getPath), true)
+
+      // First pass: count the total number of lines.
+      br1 = new BufferedReader(new InputStreamReader(fs.open(src)))
+      var totalLinesCount = 0
+      while (br1.readLine() != null) totalLinesCount += 1
+
+      // Second pass: copy every line except the trailing skipLastNLines.
+      br2 = new BufferedReader(new InputStreamReader(fs.open(src)))
+      line = br2.readLine()
+      var count = 1
+      while (line != null) {
+        if (count <= totalLinesCount - skipLastNLines) {
+          dos.writeBytes(line + "\n")
+        }
+        count += 1
+        line = br2.readLine()
+      }
+    } finally {
+      if (br1 != null) br1.close()
+      if (br2 != null) br2.close()
+      if (dos != null) dos.close()
+    }
+    outPath
+  }
+
   def saveWithoutTopNLines(inPath: String, skipFirstNLines: Int): String = {
     val fs = FileSystem.get(hadoopConfiguration)
     val src: Path = new Path(inPath)
@@ -295,8 +337,2 @@
-        try {
-          dos.close()
-        } catch {
-          case ex: Exception =>
-            println("close exception")
-        }
-        dos.close()
+        dos.close()
       }
diff --git a/streamingpro-mlsql/src/main/java/streaming/core/datasource/impl/MLSQLCSV.scala b/streamingpro-mlsql/src/main/java/streaming/core/datasource/impl/MLSQLCSV.scala
index 4d9d726c0..14603fc00 100644
--- a/streamingpro-mlsql/src/main/java/streaming/core/datasource/impl/MLSQLCSV.scala
+++ b/streamingpro-mlsql/src/main/java/streaming/core/datasource/impl/MLSQLCSV.scala
@@ -16,7 +16,8 @@ class MLSQLCSV(override val uid: String) extends MLSQLBaseFileSource with WowPar
 
   def handleDFWithOption(originPath: String, config: DataSourceConfig): String = {
     val skipFirstNLines = config.config.get("skipFirstNLines")
-    skipFirstNLines match {
+    val skipLastNLines = config.config.get("skipLastNLines")
+    val pathAfterSkipTopLines = skipFirstNLines match {
       case None => originPath
       case Some(_) => {
         var numsToSkip = skipFirstNLines.get.toInt
@@ -25,6 +26,11 @@
         newPath
       }
     }
+    // Optionally strip trailing lines from the (possibly already rewritten) file.
+    skipLastNLines match {
+      case None => pathAfterSkipTopLines
+      case Some(n) => HDFSOperatorV2.saveWithoutLastNLines(pathAfterSkipTopLines, n.toInt)
+    }
   }
 
   def deleteTmpFiles(config: DataSourceConfig, newPath: String): Unit = {
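
A minimal usage sketch for the new helper (not part of the patch; the path /tmp/data.csv is hypothetical, and it assumes hadoopConfiguration resolves to a filesystem where that file exists). The output name follows the skipLastNLines_<n>_<fileName> convention built in the code above:

    import tech.mlsql.tool.HDFSOperatorV2

    object SkipLastNLinesSketch {
      def main(args: Array[String]): Unit = {
        // Drop the trailing summary row of a hypothetical CSV file.
        // The helper writes a sibling file and returns its path,
        // here /tmp/skipLastNLines_1_data.csv.
        val newPath = HDFSOperatorV2.saveWithoutLastNLines("/tmp/data.csv", 1)
        println(newPath)
      }
    }

A single pass with a sliding buffer of n lines would also work; the count-then-copy approach keeps the code simple and mirrors the existing saveWithoutTopNLines. In MLSQLCSV, handleDFWithOption chains the two options, so a file can have both its first and last N lines stripped before the CSV reader sees it, and deleteTmpFiles(config, newPath) exists to remove the intermediate files afterwards.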