GEOMESA-3411 FSDS - Fix path cache registration order #3249

Merged · 1 commit · Jan 2, 2025
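This PR changes when newly written data files are registered in the FSDS path cache. Previously, AbstractFileSystemStorage registered each file path in PathCache as soon as the path was allocated, before any data had been written, so the cache could report files that were still empty or incomplete. The ORC and Parquet writers now register the path themselves in close(), after the file has been fully written. To support this, OrcFileSystemWriter and ParquetFileSystemWriter take a FileSystemContext (file system handle plus Hadoop configuration) in place of a bare Configuration.

A minimal, self-contained sketch of the ordering change follows; every name in it (RegistrationOrderSketch, FakeWriter, cache) is an illustrative stand-in, not GeoMesa's API:

```scala
// Toy model of the registration-order fix; not GeoMesa code.
object RegistrationOrderSketch {

  // stands in for PathCache: the set of paths known to exist
  private val cache = scala.collection.mutable.Set.empty[String]

  private final class FakeWriter(path: String) extends AutoCloseable {
    private val buffer = new StringBuilder
    def write(feature: String): Unit = buffer.append(feature).append('\n')
    override def close(): Unit = println(s"flushed ${buffer.length} chars to $path")
  }

  // Before: the path was cached as soon as it was allocated, so a
  // concurrent reader could observe a path whose file was empty or missing.
  def writeBefore(path: String, features: Seq[String]): Unit = {
    cache += path // registered before any data exists
    val writer = new FakeWriter(path)
    try { features.foreach(writer.write) } finally { writer.close() }
  }

  // After: the path is cached only once the writer has closed cleanly.
  def writeAfter(path: String, features: Seq[String]): Unit = {
    val writer = new FakeWriter(path)
    features.foreach(writer.write)
    writer.close()
    cache += path // registered after the file is complete
  }

  def main(args: Array[String]): Unit = {
    writeAfter("/tmp/part-0001.parquet", Seq("a", "b", "c"))
    println(s"cached paths: $cache")
  }
}
```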
FileSystemExporter.scala

@@ -12,8 +12,8 @@ import com.typesafe.scalalogging.LazyLogging
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
+import org.locationtech.geomesa.fs.storage.api.FileSystemContext
 import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
-import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
 import org.locationtech.geomesa.fs.storage.orc.OrcFileSystemWriter
 import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter
 import org.locationtech.geomesa.utils.io.PathUtils

@@ -52,22 +52,23 @@ object FileSystemExporter extends LazyLogging {

   class ParquetFileSystemExporter(path: String) extends FileSystemExporter {
     override protected def createWriter(sft: SimpleFeatureType): FileSystemWriter = {
+      // use PathUtils.getUrl to handle local files, otherwise default can be in hdfs
+      val file = new Path(PathUtils.getUrl(path).toURI)
       val conf = new Configuration()
-      StorageConfiguration.setSft(conf, sft)
       try { Class.forName("org.xerial.snappy.Snappy") } catch {
         case _: ClassNotFoundException =>
           logger.warn("SNAPPY compression is not available on the classpath - falling back to GZIP")
           conf.set("parquet.compression", "GZIP")
       }
-      // use PathUtils.getUrl to handle local files, otherwise default can be in hdfs
-      new ParquetFileSystemWriter(sft, new Path(PathUtils.getUrl(path).toURI), conf)
+      new ParquetFileSystemWriter(sft, FileSystemContext(file, conf), file)
     }
   }

   class OrcFileSystemExporter(path: String) extends FileSystemExporter {
     override protected def createWriter(sft: SimpleFeatureType): FileSystemWriter = {
       // use PathUtils.getUrl to handle local files, otherwise default can be in hdfs
-      new OrcFileSystemWriter(sft, new Configuration(), new Path(PathUtils.getUrl(path).toURI))
+      val file = new Path(PathUtils.getUrl(path).toURI)
+      new OrcFileSystemWriter(sft, FileSystemContext(file, new Configuration()), file)
     }
   }
 }
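Both exporters now resolve the output file once and hand the writer a FileSystemContext built from that file and the Hadoop configuration; the context carries the file system handle the writer needs in order to register the finished file in the path cache on close (see the writer changes below).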
AbstractFileSystemStorage.scala

@@ -233,7 +233,6 @@ abstract class AbstractFileSystemStorage(

     def pathAndObserver: WriterConfig = {
       val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType)
-      PathCache.register(context.fs, path)
       val updateObserver = new UpdateObserver(partition, path, action)
       val observer = if (observers.isEmpty) { updateObserver } else {
         new CompositeObserver(observers.map(_.apply(path)).+:(updateObserver))
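This removal is the heart of the fix: file paths were previously registered in the path cache here, at allocation time, before any bytes had been written. Registration now happens in the individual writers after a successful close (see OrcFileSystemWriter and ParquetFileSystemWriter below).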
PathCache.scala

@@ -50,7 +50,7 @@ object PathCache
   )

   /**
-   * * Register a path as existing
+   * Register a path as existing
    *
    * @param fs file system
    * @param path path
OrcFileSystemStorage.scala

@@ -33,7 +33,7 @@ class OrcFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata
     extends AbstractFileSystemStorage(context, metadata, OrcFileSystemStorage.FileExtension) {

   override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter =
-    new OrcFileSystemWriter(metadata.sft, context.conf, file, observer)
+    new OrcFileSystemWriter(metadata.sft, context, file, observer)

   override protected def createReader(
       filter: Option[Filter],
OrcFileSystemWriter.scala

@@ -8,28 +8,29 @@

 package org.locationtech.geomesa.fs.storage.orc

-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.orc.OrcFile
 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
+import org.locationtech.geomesa.fs.storage.api.FileSystemContext
 import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
 import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
 import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.NoOpObserver
+import org.locationtech.geomesa.fs.storage.common.utils.PathCache
 import org.locationtech.geomesa.fs.storage.orc.utils.OrcAttributeWriter
 import org.locationtech.geomesa.utils.io.CloseQuietly

 import scala.util.control.NonFatal

 class OrcFileSystemWriter(
     sft: SimpleFeatureType,
-    config: Configuration,
+    context: FileSystemContext,
     file: Path,
     observer: FileSystemObserver = NoOpObserver
   ) extends FileSystemWriter {

   private val schema = OrcFileSystemStorage.createTypeDescription(sft)

-  private val options = OrcFile.writerOptions(config).setSchema(schema)
+  private val options = OrcFile.writerOptions(context.conf).setSchema(schema)
   private val writer = OrcFile.createWriter(file, options)
   private val batch = schema.createRowBatch()

@@ -56,6 +57,7 @@ class OrcFileSystemWriter(
       case NonFatal(e) => CloseQuietly(Seq(writer, observer)).foreach(e.addSuppressed); throw e
     }
     CloseQuietly.raise(Seq(writer, observer))
+    PathCache.register(context.fs, file)
   }

   private def flushBatch(): Unit = {
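Because the registration comes after CloseQuietly.raise, which rethrows any error raised while closing the writer or observer, a file whose close fails is never registered, so the cache should not advertise half-written files. A small illustration, again with hypothetical names rather than GeoMesa's API:

```scala
// Toy illustration: a failed close prevents cache registration.
object FailedCloseSketch {
  import java.io.IOException

  private val cache = scala.collection.mutable.Set.empty[String]

  private final class FailingWriter extends AutoCloseable {
    override def close(): Unit = throw new IOException("disk full")
  }

  // mirrors the order in OrcFileSystemWriter.close: close first, then register
  private def finish(path: String, writer: AutoCloseable): Unit = {
    writer.close() // throws here...
    cache += path  // ...so this line is never reached
  }

  def main(args: Array[String]): Unit = {
    try { finish("/tmp/part-0002.orc", new FailingWriter) }
    catch { case e: IOException => println(s"close failed: ${e.getMessage}") }
    println(s"cached paths: $cache") // prints Set() -- nothing was registered
  }
}
```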
OrcFileSystemWriterTest.scala

@@ -13,6 +13,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.junit.runner.RunWith
 import org.locationtech.geomesa.features.ScalaSimpleFeature
+import org.locationtech.geomesa.fs.storage.api.FileSystemContext
 import org.locationtech.geomesa.utils.collection.SelfClosingIterator
 import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
 import org.locationtech.geomesa.utils.io.WithClose

@@ -32,26 +33,22 @@ class OrcFileSystemWriterTest extends Specification {
     ScalaSimpleFeature.create(sft, "1", "name1", "1", "2017-01-01T00:00:01.000Z", "LINESTRING (10 1, 5 1)")
   )

-  val config = new Configuration()
-
   "OrcFileSystemWriter" should {
     "write and read simple features" in {
-
-      withPath { path =>
-        withTestFile { file =>
-          WithClose(new OrcFileSystemWriter(sft, config, file)) { writer => features.foreach(writer.write) }
-          val reader = new OrcFileSystemReader(sft, config, None, None)
-          val read = WithClose(reader.read(file)) { i => SelfClosingIterator(i).map(ScalaSimpleFeature.copy).toList }
-          read mustEqual features
-          // test out not calling 'hasNext'
-          var i = 0
-          WithClose(reader.read(file)) { iter =>
-            while (i < features.size) {
-              iter.next() mustEqual features(i)
-              i += 1
-            }
-            iter.next must throwA[NoSuchElementException]
-          }
+      withTestFile { file =>
+        val fc = FileSystemContext(file.getParent, new Configuration())
+        WithClose(new OrcFileSystemWriter(sft, fc, file)) { writer => features.foreach(writer.write) }
+        val reader = new OrcFileSystemReader(sft, fc.conf, None, None)
+        val read = WithClose(reader.read(file)) { i => SelfClosingIterator(i).map(ScalaSimpleFeature.copy).toList }
+        read mustEqual features
+        // test out not calling 'hasNext'
+        var i = 0
+        WithClose(reader.read(file)) { iter =>
+          while (i < features.size) {
+            iter.next() mustEqual features(i)
+            i += 1
+          }
+          iter.next must throwA[NoSuchElementException]
+        }
       }
     }
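The test is updated for the new constructor: it builds a FileSystemContext rooted at the test file's parent directory and reuses that context's configuration for the reader.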
ParquetFileSystemStorage.scala

@@ -13,6 +13,7 @@ import org.apache.parquet.hadoop.ParquetReader
 import org.apache.parquet.hadoop.example.GroupReadSupport
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.parquet.example.data.Group
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
 import org.geotools.api.filter.Filter

@@ -24,9 +25,12 @@ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.File
 import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
 import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
 import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.NoOpObserver
+import org.locationtech.geomesa.fs.storage.common.utils.PathCache
 import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter
 import org.locationtech.geomesa.utils.io.CloseQuietly

+import scala.util.control.NonFatal
+
 /**
  *
  * @param context file system context

@@ -35,11 +39,8 @@ import org.locationtech.geomesa.utils.io.CloseQuietly
 class ParquetFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata)
     extends AbstractFileSystemStorage(context, metadata, ParquetFileSystemStorage.FileExtension) {

-  override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter = {
-    val sftConf = new Configuration(context.conf)
-    StorageConfiguration.setSft(sftConf, metadata.sft)
-    new ParquetFileSystemWriter(metadata.sft, file, sftConf, observer)
-  }
+  override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter =
+    new ParquetFileSystemWriter(metadata.sft, context, file, observer)

   override protected def createReader(
       filter: Option[Filter],

@@ -72,11 +73,17 @@ object ParquetFileSystemStorage extends LazyLogging

   class ParquetFileSystemWriter(
       sft: SimpleFeatureType,
+      context: FileSystemContext,
       file: Path,
-      conf: Configuration,
       observer: FileSystemObserver = NoOpObserver
     ) extends FileSystemWriter {

+    private val conf = {
+      val conf = new Configuration(context.conf)
+      StorageConfiguration.setSft(conf, sft)
+      conf
+    }
+
    private val writer = SimpleFeatureParquetWriter.builder(file, conf).build()

    override def write(f: SimpleFeature): Unit = {

@@ -86,27 +93,35 @@ object ParquetFileSystemStorage extends LazyLogging
     override def flush(): Unit = observer.flush()
     override def close(): Unit = {
       CloseQuietly(Seq(writer, observer)).foreach(e => throw e)
-      if (FileValidationEnabled.get.toBoolean) {
+      PathCache.register(context.fs, file)
+      if (FileValidationEnabled.toBoolean.get) {
         validateParquetFile(file)
       }
     }
   }

   /**
    * Validate a file by reading it back
    *
    * @param file file to validate
    */
   def validateParquetFile(file: Path): Unit = {
-    val reader = ParquetReader.builder(new GroupReadSupport(), file).build()
-
+    var reader: ParquetReader[Group] = null
     try {
-      // Read Parquet file content
+      // read Parquet file content
+      reader = ParquetReader.builder(new GroupReadSupport(), file).build()
       var record = reader.read()
       while (record != null) {
         // Process the record
         record = reader.read()
       }
-      logger.debug(s"${file} is a valid Parquet file")
+      logger.trace(s"$file is a valid Parquet file")
     } catch {
-      case e: Exception => throw new RuntimeException(s"Unable to validate ${file}: File may be corrupted", e)
+      case NonFatal(e) => throw new RuntimeException(s"Unable to validate $file: File may be corrupted", e)
     } finally {
-      reader.close()
+      if (reader != null) {
+        reader.close()
+      }
     }
   }
 }
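Beyond the registration itself, the revised close() and validation logic tighten error handling: the path is registered before optional validation runs, the catch clause matches NonFatal rather than every Exception, the reader handle is null-checked in the finally block so a failed builder call cannot mask the original error with a NullPointerException, and the success log message is demoted from debug to trace.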