@@ -44,7 +44,7 @@ case class PartitionedFile(
     filePath: String,
     start: Long,
     length: Long,
-    locations: Array[String] = Array.empty) {
+    @transient locations: Array[String] = Array.empty) {
   override def toString: String = {
     s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
   }
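
The first hunk marks `locations` as `@transient`, presumably because the preferred block hosts are only consulted on the driver for locality-aware scheduling and need not be shipped to executors inside each serialized task. A minimal standalone sketch of the effect, using a hypothetical `Split` class rather than Spark's `PartitionedFile`:

import java.io._

// Hypothetical class for illustration only: the @transient field is skipped
// by Java serialization and comes back as null after a round trip.
case class Split(path: String, @transient hosts: Array[String])

object TransientDemo extends App {
  val buf = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(buf)
  oos.writeObject(Split("/data/part-00000", Array("host1", "host2")))
  oos.close()

  val copy = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    .readObject().asInstanceOf[Split]

  println(copy.path)   // /data/part-00000 -- serialized as usual
  println(copy.hosts)  // null -- the @transient field was not written
}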
@@ -121,6 +121,20 @@ class FileScanRDD(
         nextElement
       }

+      private def readCurrentFile(): Iterator[InternalRow] = {
+        try {
+          readFunction(currentFile)
+        } catch {
+          case e: java.io.FileNotFoundException =>
+            throw new java.io.FileNotFoundException(
+              e.getMessage + "\n" +
+                "It is possible the underlying files have been updated. " +
+                "You can explicitly invalidate the cache in Spark by " +
+                "running 'REFRESH TABLE tableName' command in SQL or " +
+                "by recreating the Dataset/DataFrame involved.")
+        }
+      }
+
       /** Advances to the next file. Returns true if a new non-empty iterator is available. */
       private def nextIterator(): Boolean = {
         updateBytesReadWithFileSize()
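
The new `readCurrentFile()` helper centralizes the `FileNotFoundException` wrapping so that, as the next hunk shows, both the `ignoreCorruptFiles` path and the normal path share it. A hedged usage sketch of the remedies the message suggests, where `spark`, the table name, and the path are placeholders and not part of the patch:

// If the files behind a cached relation are rewritten, a later scan can hit
// the wrapped FileNotFoundException above. The message's two suggested fixes:
spark.sql("REFRESH TABLE tableName")            // re-list the table's files
val fresh = spark.read.parquet("/path/to/data") // or recreate the DataFrame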
@@ -130,54 +144,35 @@ class FileScanRDD(
       // Sets InputFileBlockHolder for the file block's information
       InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)

-      try {
-        if (ignoreCorruptFiles) {
-          currentIterator = new NextIterator[Object] {
-            private val internalIter = {
-              try {
-                // The readFunction may read files before consuming the iterator.
-                // E.g., vectorized Parquet reader.
-                readFunction(currentFile)
-              } catch {
-                case e @ (_: RuntimeException | _: IOException) =>
-                  logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
-                  Iterator.empty
-              }
-            }
-
-            override def getNext(): AnyRef = {
-              try {
-                if (internalIter.hasNext) {
-                  internalIter.next()
-                } else {
-                  finished = true
-                  null
-                }
-              } catch {
-                case e: IOException =>
-                  logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
-                  finished = true
-                  null
+      if (ignoreCorruptFiles) {
+        currentIterator = new NextIterator[Object] {
+          // The readFunction may read some bytes before consuming the iterator, e.g.,
+          // the vectorized Parquet reader. Here we use a lazy val to delay the creation
+          // of the iterator so that the exception is thrown in `getNext`.
+          private lazy val internalIter = readCurrentFile()
+
+          override def getNext(): AnyRef = {
+            try {
+              if (internalIter.hasNext) {
+                internalIter.next()
+              } else {
+                finished = true
+                null
               }
+            } catch {
+              // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+              case e: java.io.FileNotFoundException => throw e
+              case e @ (_: RuntimeException | _: IOException) =>
+                logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
+                finished = true
+                null
             }
-
-            override def close(): Unit = {}
           }
-        } else {
-          currentIterator = readFunction(currentFile)
+
+          override def close(): Unit = {}
         }
-      } catch {
-        case e: IOException if ignoreCorruptFiles =>
-          logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
-          currentIterator = Iterator.empty
-        case e: java.io.FileNotFoundException =>
-          throw new java.io.FileNotFoundException(
-            e.getMessage + "\n" +
-              "It is possible the underlying files have been updated. " +
-              "You can explicitly invalidate the cache in Spark by " +
-              "running 'REFRESH TABLE tableName' command in SQL or " +
-              "by recreating the Dataset/DataFrame involved."
-          )
+      } else {
+        currentIterator = readCurrentFile()
       }

       hasNext
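
The pivotal line in the last hunk is `private lazy val internalIter = readCurrentFile()`. Readers such as the vectorized Parquet reader may touch the file while being constructed, so an eager `val` could fail before `getNext()` ever runs, outside the try/catch that implements `ignoreCorruptFiles`; the `lazy val` defers construction into the first `getNext()` call. A standalone sketch of the pattern with hypothetical names (`openReader`, `SafeIterator`), not Spark's classes:

import java.io.{FileNotFoundException, IOException}

object LazyReadDemo {
  // Stand-in for readFunction: may fail while opening the file, before any
  // row has been consumed (as the vectorized Parquet reader can).
  def openReader(path: String): Iterator[Int] =
    if (path.endsWith(".corrupt")) throw new IOException(s"bad file: $path")
    else Iterator(1, 2, 3)

  class SafeIterator(path: String) extends Iterator[Int] {
    // lazy: construction is deferred to the first hasNext/next call, so an
    // open-time failure lands inside the try below instead of escaping early.
    private lazy val underlying = openReader(path)
    private var finished = false

    override def hasNext: Boolean = !finished && {
      try underlying.hasNext
      catch {
        case e: FileNotFoundException => throw e       // never swallowed
        case _: IOException => finished = true; false  // skip rest of corrupt file
      }
    }
    override def next(): Int = underlying.next()
  }
}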