diff --git a/geomesa-accumulo/geomesa-accumulo-tools/conf-filtered/dependencies.sh b/geomesa-accumulo/geomesa-accumulo-tools/conf-filtered/dependencies.sh
index 2a926602d3f9..c3a9c6a9a091 100755
--- a/geomesa-accumulo/geomesa-accumulo-tools/conf-filtered/dependencies.sh
+++ b/geomesa-accumulo/geomesa-accumulo-tools/conf-filtered/dependencies.sh
@@ -17,8 +17,6 @@ zookeeper_install_version="%%zookeeper.version.recommended%%"
# required for hadoop - make sure it corresponds to the hadoop installed version
guava_install_version="%%accumulo.guava.version%%"
-function version_ge() { test "$(echo "$@" | tr " " "\n" | sort -rV | head -n 1)" == "$1"; }
-
# gets the dependencies for this module
# args:
# $1 - current classpath
diff --git a/geomesa-fs/geomesa-fs-dist/pom.xml b/geomesa-fs/geomesa-fs-dist/pom.xml
index 5093d6cc6cc6..01d041a7c6bd 100644
--- a/geomesa-fs/geomesa-fs-dist/pom.xml
+++ b/geomesa-fs/geomesa-fs-dist/pom.xml
@@ -68,6 +68,15 @@
org.locationtech.geomesa
geomesa-fs-datastore_${scala.binary.version}
+
+ io.netty
+ netty-all
+
+
+ io.netty
+ netty-transport-native-epoll
+ linux-x86_64
+
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/pom.xml b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/pom.xml
index b5ada8ccc5f0..45453837dfb2 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/pom.xml
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/pom.xml
@@ -65,11 +65,32 @@
org.apache.hadoop
hadoop-mapreduce-client-core
+
+ org.apache.hadoop
+ hadoop-aws
+
+
com.amazonaws
aws-java-sdk-s3
provided
+
+
+ software.amazon.awssdk
+ s3
+ provided
+
+
+ software.amazon.awssdk
+ s3-transfer-manager
+ provided
+
+
+ software.amazon.awssdk.crt
+ aws-crt
+ provided
+
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserver.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/AbstractS3VisibilityObserver.scala
similarity index 57%
rename from geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserver.scala
rename to geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/AbstractS3VisibilityObserver.scala
index 5c0f399679db..ed721be13633 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserver.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/AbstractS3VisibilityObserver.scala
@@ -8,26 +8,34 @@
package org.locationtech.geomesa.fs.storage.common.s3
-import com.amazonaws.services.s3.AmazonS3
import org.apache.accumulo.access.AccessExpression
import org.apache.hadoop.fs.Path
import org.geotools.api.feature.simple.SimpleFeature
+import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
import org.locationtech.geomesa.security.SecurityUtils
+import java.io.IOException
import java.nio.charset.StandardCharsets
import java.util.Base64
-/**
- * Creates a tag containing the base64 encoded summary visibility for the observed file
- *
- * @param s3 s3 client
- * @param path file path
- * @param tag tag name to use
- */
-class S3VisibilityObserver(s3: AmazonS3, path: Path, tag: String) extends S3ObjectTagObserver(s3, path) {
+abstract class AbstractS3VisibilityObserver(path: Path) extends FileSystemObserver {
private val visibilities = scala.collection.mutable.Set.empty[String]
+ private val (bucket, key) = {
+ val uri = path.toUri
+ val uriPath = uri.getPath
+ val key = if (uriPath.startsWith("/")) { uriPath.substring(1) } else { uriPath }
+ (uri.getHost, key)
+ }
+
+ override def flush(): Unit = {}
+
+ override def close(): Unit = {
+ try { makeTagRequest(bucket, key) } catch {
+ case e: Exception => throw new IOException("Error tagging object", e)
+ }
+ }
override def write(feature: SimpleFeature): Unit = {
val vis = SecurityUtils.getVisibility(feature)
if (vis != null) {
@@ -35,12 +43,14 @@ class S3VisibilityObserver(s3: AmazonS3, path: Path, tag: String) extends S3Obje
}
}
- override protected def tags(): Iterable[(String, String)] = {
- if (visibilities.isEmpty) { Seq.empty } else {
+ private def makeTagRequest(bucket: String, key: String): Unit = {
+ if (visibilities.nonEmpty) {
val vis = visibilities.mkString("(", ")&(", ")")
// this call simplifies and de-duplicates the expression
val expression = AccessExpression.of(vis, /*normalize = */true).getExpression
- Seq(tag -> Base64.getEncoder.encodeToString(expression.getBytes(StandardCharsets.UTF_8)))
+ makeTagRequest(bucket: String, key: String, Base64.getEncoder.encodeToString(expression.getBytes(StandardCharsets.UTF_8)))
}
}
+
+ protected def makeTagRequest(bucket: String, key: String, visibility: String): Unit
}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3ObjectTagObserver.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3ObjectTagObserver.scala
deleted file mode 100644
index 65cbabe55b5c..000000000000
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3ObjectTagObserver.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/***********************************************************************
- * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Apache License, Version 2.0
- * which accompanies this distribution and is available at
- * http://www.opensource.org/licenses/apache2.0.php.
- ***********************************************************************/
-
-package org.locationtech.geomesa.fs.storage.common.s3
-
-import com.amazonaws.services.s3.AmazonS3
-import com.amazonaws.services.s3.model.{ObjectTagging, SetObjectTaggingRequest, Tag}
-import org.apache.hadoop.fs.Path
-import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
-
-import java.io.IOException
-
-/**
- * Abstract baseclass for writing s3 object tags
- *
- * @param s3 s3 client
- * @param path file path
- */
-abstract class S3ObjectTagObserver(s3: AmazonS3, path: Path) extends FileSystemObserver {
-
- private val (bucket, key) = {
- val uri = path.toUri
- val uriPath = uri.getPath
- val key = if (uriPath.startsWith("/")) { uriPath.substring(1) } else { uriPath }
- (uri.getHost, key)
- }
-
- /**
- * Return the tags to set on this file
- *
- * @return
- */
- protected def tags(): Iterable[(String, String)]
-
- override def flush(): Unit = {}
-
- override def close(): Unit = {
- val iter = tags()
- if (iter.nonEmpty) {
- try {
- val list = new java.util.ArrayList[Tag]()
- iter.foreach { case (k, v) => list.add(new Tag(k, v)) }
- s3.setObjectTagging(new SetObjectTaggingRequest(bucket, key, new ObjectTagging(list)))
- } catch {
- case e: Exception => throw new IOException("Error tagging object", e)
- }
- }
- }
-}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverFactory.scala
index 3484a6c8d135..c4023c81f335 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverFactory.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverFactory.scala
@@ -8,53 +8,49 @@
package org.locationtech.geomesa.fs.storage.common.s3
-import com.amazonaws.services.s3.AmazonS3
+import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.fs.storage.common.observer.{FileSystemObserver, FileSystemObserverFactory}
-import java.io.IOException
+import scala.util.control.NonFatal
/**
* Factory for S3VisibilityObserver
*/
-class S3VisibilityObserverFactory extends FileSystemObserverFactory {
+class S3VisibilityObserverFactory extends FileSystemObserverFactory with LazyLogging {
- private var fs: FileSystem = _
- private var s3: AmazonS3 = _
- private var tag: String = _
+ private var delegate: FileSystemObserverFactory = _
override def init(conf: Configuration, root: Path, sft: SimpleFeatureType): Unit = {
try {
- // use reflection to access to private client factory used by the s3a hadoop impl
- fs = root.getFileSystem(conf)
- val field = fs.getClass.getDeclaredField("s3")
- field.setAccessible(true)
- s3 = field.get(fs).asInstanceOf[AmazonS3]
- tag = conf.get(S3VisibilityObserverFactory.TagNameConfig, S3VisibilityObserverFactory.DefaultTag)
+ if (S3VisibilityObserverFactory.UseV2) {
+ delegate = new v2.S3VisibilityObserverFactory()
+ } else {
+ delegate = new v1.S3VisibilityObserverFactory()
+ }
+ delegate.init(conf, root, sft)
} catch {
case e: Exception => throw new RuntimeException("Unable to get s3 client", e)
}
}
- override def apply(path: Path): FileSystemObserver = new S3VisibilityObserver(s3, path, tag)
-
- override def close(): Unit = {
- s3 = null
- if (fs != null) {
- try {
- fs.close()
- } catch {
- case e: Exception => throw new IOException("Error closing S3 filesystem", e)
- } finally {
- fs = null
- }
- }
- }
+ override def apply(path: Path): FileSystemObserver = delegate.apply(path)
+
+ override def close(): Unit = if (delegate != null) { delegate.close() }
}
-object S3VisibilityObserverFactory {
+object S3VisibilityObserverFactory extends LazyLogging {
+
val TagNameConfig = "geomesa.fs.vis.tag"
val DefaultTag = "geomesa.file.visibility"
+
+ lazy private val UseV2: Boolean = try {
+ val versionRegex = """(\d+)\.(\d+)\..*""".r
+ val versionRegex(maj, min) = org.apache.hadoop.util.VersionInfo.getVersion
+ maj.toInt >= 3 && min.toInt >= 4
+ } catch {
+ case NonFatal(e) => logger.warn("Unable to evaluate hadoop version, defaulting to aws sdk v2: ", e); true
+ }
}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserver.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserver.scala
new file mode 100644
index 000000000000..aad15d0a1ab0
--- /dev/null
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserver.scala
@@ -0,0 +1,31 @@
+/***********************************************************************
+ * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Apache License, Version 2.0
+ * which accompanies this distribution and is available at
+ * http://www.opensource.org/licenses/apache2.0.php.
+ ***********************************************************************/
+
+package org.locationtech.geomesa.fs.storage.common.s3
+package v1
+
+import com.amazonaws.services.s3.AmazonS3
+import com.amazonaws.services.s3.model.{ObjectTagging, SetObjectTaggingRequest, Tag}
+import org.apache.hadoop.fs.Path
+
+import java.util.Collections
+
+/**
+ * Creates a tag containing the base64 encoded summary visibility for the observed file
+ *
+ * @param path file path
+ * @param s3 s3 client
+ * @param tag tag name to use
+ */
+class S3VisibilityObserver(val path: Path, s3: AmazonS3, tag: String) extends AbstractS3VisibilityObserver(path) {
+ override protected def makeTagRequest(bucket: String, key: String, visibility: String): Unit = {
+ val tagging = new ObjectTagging(Collections.singletonList(new Tag(tag, visibility)))
+ val request = new SetObjectTaggingRequest(bucket, key, tagging)
+ s3.setObjectTagging(request)
+ }
+}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserverFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserverFactory.scala
new file mode 100644
index 000000000000..e615203e3ba9
--- /dev/null
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserverFactory.scala
@@ -0,0 +1,53 @@
+/***********************************************************************
+ * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Apache License, Version 2.0
+ * which accompanies this distribution and is available at
+ * http://www.opensource.org/licenses/apache2.0.php.
+ ***********************************************************************/
+
+package org.locationtech.geomesa.fs.storage.common.s3
+package v1
+
+import com.amazonaws.services.s3.AmazonS3
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.geotools.api.feature.simple.SimpleFeatureType
+import org.locationtech.geomesa.fs.storage.common.observer.{FileSystemObserver, FileSystemObserverFactory}
+import org.locationtech.geomesa.utils.io.CloseQuietly
+
+import java.io.IOException
+
+/**
+ * Visibility observer for aws sdk v1
+ */
+class S3VisibilityObserverFactory extends FileSystemObserverFactory {
+
+ private var fs: FileSystem = _
+ private var s3: AmazonS3 = _
+ private var tag: String = _
+
+ override def init(conf: Configuration, root: Path, sft: SimpleFeatureType): Unit = {
+ try {
+ // use reflection to access to private client factory used by the s3a hadoop impl
+ fs = root.getFileSystem(conf)
+ val field = fs.getClass.getDeclaredField("s3")
+ field.setAccessible(true)
+ s3 = field.get(fs).asInstanceOf[AmazonS3]
+ tag = conf.get(S3VisibilityObserverFactory.TagNameConfig, S3VisibilityObserverFactory.DefaultTag)
+ } catch {
+ case e: Exception => throw new RuntimeException("Unable to get s3 client", e)
+ }
+ }
+
+ override def apply(path: Path): FileSystemObserver = new S3VisibilityObserver(path, s3, tag)
+
+ override def close(): Unit = {
+ if (fs != null) {
+ val err = CloseQuietly(fs)
+ s3 = null
+ fs = null
+ err.foreach(e => throw new IOException("Error closing S3 filesystem", e))
+ }
+ }
+}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v2/S3VisibilityObserver.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v2/S3VisibilityObserver.scala
new file mode 100644
index 000000000000..ba85575c8418
--- /dev/null
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v2/S3VisibilityObserver.scala
@@ -0,0 +1,29 @@
+/***********************************************************************
+ * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Apache License, Version 2.0
+ * which accompanies this distribution and is available at
+ * http://www.opensource.org/licenses/apache2.0.php.
+ ***********************************************************************/
+
+package org.locationtech.geomesa.fs.storage.common.s3
+package v2
+
+import org.apache.hadoop.fs.Path
+import software.amazon.awssdk.services.s3.S3Client
+import software.amazon.awssdk.services.s3.model.{PutObjectTaggingRequest, Tag, Tagging}
+
+/**
+ * Creates a tag containing the base64 encoded summary visibility for the observed file
+ *
+ * @param path file path
+ * @param s3 s3 client
+ * @param tag tag name to use
+ */
+class S3VisibilityObserver(path: Path, s3: S3Client, tag: String) extends AbstractS3VisibilityObserver(path) {
+ override protected def makeTagRequest(bucket: String, key: String, visibility: String): Unit = {
+ val tagging = Tagging.builder().tagSet(Tag.builder.key(tag).value(visibility).build()).build()
+ val request = PutObjectTaggingRequest.builder.bucket(bucket).key(key).tagging(tagging).build()
+ s3.putObjectTagging(request)
+ }
+}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v2/S3VisibilityObserverFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v2/S3VisibilityObserverFactory.scala
new file mode 100644
index 000000000000..008ddfccceee
--- /dev/null
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v2/S3VisibilityObserverFactory.scala
@@ -0,0 +1,51 @@
+/***********************************************************************
+ * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Apache License, Version 2.0
+ * which accompanies this distribution and is available at
+ * http://www.opensource.org/licenses/apache2.0.php.
+ ***********************************************************************/
+
+package org.locationtech.geomesa.fs.storage.common.s3
+package v2
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.s3a.S3AFileSystem
+import org.geotools.api.feature.simple.SimpleFeatureType
+import org.locationtech.geomesa.fs.storage.common.observer.{FileSystemObserver, FileSystemObserverFactory}
+import org.locationtech.geomesa.utils.io.CloseQuietly
+import software.amazon.awssdk.services.s3.S3Client
+
+import java.io.IOException
+
+/**
+ * Visibility observer for aws sdk v2
+ */
+class S3VisibilityObserverFactory extends FileSystemObserverFactory {
+
+ private var fs: S3AFileSystem = _
+ private var s3: S3Client = _
+ private var tag: String = _
+
+ override def init(conf: Configuration, root: Path, sft: SimpleFeatureType): Unit = {
+ try {
+ fs = root.getFileSystem(conf).asInstanceOf[S3AFileSystem]
+ s3 = fs.getS3AInternals.getAmazonS3Client("tags")
+ tag = conf.get(S3VisibilityObserverFactory.TagNameConfig, S3VisibilityObserverFactory.DefaultTag)
+ } catch {
+ case e: Exception => throw new RuntimeException("Unable to get s3 client", e)
+ }
+ }
+
+ override def apply(path: Path): FileSystemObserver = new S3VisibilityObserver(path, s3, tag)
+
+ override def close(): Unit = {
+ if (fs != null) {
+ val err = CloseQuietly(fs)
+ s3 = null
+ fs = null
+ err.foreach(e => throw new IOException("Error closing S3 filesystem", e))
+ }
+ }
+}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverTest.scala
index cc0987c9a45b..39b1b4626eeb 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverTest.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverTest.scala
@@ -8,12 +8,9 @@
package org.locationtech.geomesa.fs.storage.common.s3
-import com.amazonaws.services.s3.AmazonS3
-import com.amazonaws.services.s3.model.{SetObjectTaggingRequest, Tag}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
-import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.util.Progressable
+import org.apache.hadoop.fs.s3a.{S3AFileSystem, S3AInternals}
import org.geotools.api.feature.simple.SimpleFeature
import org.junit.runner.RunWith
import org.locationtech.geomesa.features.ScalaSimpleFeature
@@ -24,8 +21,9 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.specs2.mock.Mockito
import org.specs2.mutable.Specification
import org.specs2.runner.JUnitRunner
+import software.amazon.awssdk.services.s3.S3Client
+import software.amazon.awssdk.services.s3.model.{PutObjectTaggingRequest, Tag}
-import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.Base64
@@ -42,105 +40,91 @@ class S3VisibilityObserverTest extends Specification with Mockito {
sf
}
- // noinspection NotImplementedCode
- class MockFileSystem extends FileSystem{
-
- val s3: AmazonS3 = mock[AmazonS3]
-
- override def getUri: URI = ???
- override def open(path: Path, i: Int): FSDataInputStream = ???
- override def create(path: Path, fsp: FsPermission, b: Boolean, i: Int, i1: Short, l: Long, p: Progressable): FSDataOutputStream = ???
- override def append(path: Path, i: Int, p: Progressable): FSDataOutputStream = ???
- override def rename(path: Path, path1: Path): Boolean = ???
- override def delete(path: Path, b: Boolean): Boolean = ???
- override def listStatus(path: Path): Array[FileStatus] = ???
- override def setWorkingDirectory(path: Path): Unit = ???
- override def getWorkingDirectory: Path = ???
- override def mkdirs(path: Path, fsp: FsPermission): Boolean = ???
- override def getFileStatus(path: Path): FileStatus = ???
+ def mockS3(factory: S3VisibilityObserverFactory): S3Client = {
+ val root = mock[Path]
+ val s3Internals = mock[S3AInternals]
+ val s3 = mock[S3Client]
+ val fs = new S3AFileSystem() {
+ override def getS3AInternals: S3AInternals = s3Internals
+ }
+
+ root.getFileSystem(ArgumentMatchers.any()) returns fs
+ s3Internals.getAmazonS3Client(ArgumentMatchers.any()) returns s3
+ factory.init(new Configuration(), root, sft)
+ s3
}
"S3VisibilityObserver" should {
"initialize factory correctly" >> {
// mimic construction through reflection
- WithClose(classOf[S3VisibilityObserverFactory].newInstance()) { factory =>
- val root = mock[Path]
- root.getFileSystem(ArgumentMatchers.any()) returns new MockFileSystem()
- factory.init(new Configuration(), root, sft) must not(throwAn[Exception])
+ WithClose(classOf[S3VisibilityObserverFactory].getDeclaredConstructor().newInstance()) { factory =>
+ mockS3(factory) must not(throwAn[Exception])
}
}
"tag a single visibility label" >> {
WithClose(new S3VisibilityObserverFactory) { factory =>
- val fs = new MockFileSystem()
- val root = mock[Path]
- root.getFileSystem(ArgumentMatchers.any()) returns fs
- factory.init(new Configuration(), root, sft)
+ val s3 = mockS3(factory)
val observer = factory.apply(new Path("s3a://foo/bar/baz.json"))
observer.write(feature(0, "user"))
observer.close()
- val captor: ArgumentCaptor[SetObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[SetObjectTaggingRequest])
- there was one(fs.s3).setObjectTagging(captor.capture())
+ val captor: ArgumentCaptor[PutObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[PutObjectTaggingRequest])
+ there was one(s3).putObjectTagging(captor.capture())
val request = captor.getValue
- request.getBucketName mustEqual "foo"
- request.getKey mustEqual "bar/baz.json"
+ request.bucket mustEqual "foo"
+ request.key mustEqual "bar/baz.json"
val encoded = Base64.getEncoder.encodeToString("user".getBytes(StandardCharsets.UTF_8))
- request.getTagging.getTagSet.asScala mustEqual Seq(new Tag(S3VisibilityObserverFactory.DefaultTag, encoded))
+ request.tagging.tagSet.asScala mustEqual
+ Seq(Tag.builder.key(S3VisibilityObserverFactory.DefaultTag).value(encoded).build())
}
}
"tag multiple visibility labels" >> {
WithClose(new S3VisibilityObserverFactory) { factory =>
- val fs = new MockFileSystem()
- val root = mock[Path]
- root.getFileSystem(ArgumentMatchers.any()) returns fs
- factory.init(new Configuration(), root, sft)
+ val s3 = mockS3(factory)
val observer = factory.apply(new Path("s3a://foo/bar/baz.json"))
observer.write(feature(0, "user"))
observer.write(feature(1, "admin"))
observer.write(feature(2, "user"))
observer.close()
- val captor: ArgumentCaptor[SetObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[SetObjectTaggingRequest])
- there was one(fs.s3).setObjectTagging(captor.capture())
+ val captor: ArgumentCaptor[PutObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[PutObjectTaggingRequest])
+ there was one(s3).putObjectTagging(captor.capture())
val request = captor.getValue
- request.getBucketName mustEqual "foo"
- request.getKey mustEqual "bar/baz.json"
+ request.bucket mustEqual "foo"
+ request.key mustEqual "bar/baz.json"
// since the vis are kept in a set, the order is not defined
val encodedFront = Base64.getEncoder.encodeToString("user&admin".getBytes(StandardCharsets.UTF_8))
val encodedBack = Base64.getEncoder.encodeToString("admin&user".getBytes(StandardCharsets.UTF_8))
- request.getTagging.getTagSet.asScala must beOneOf(
- Seq(new Tag(S3VisibilityObserverFactory.DefaultTag, encodedFront)),
- Seq(new Tag(S3VisibilityObserverFactory.DefaultTag, encodedBack))
+ request.tagging.tagSet.asScala must beOneOf(
+ Seq(Tag.builder.key(S3VisibilityObserverFactory.DefaultTag).value(encodedFront).build()),
+ Seq(Tag.builder.key(S3VisibilityObserverFactory.DefaultTag).value(encodedBack).build())
)
}
}
"simplify tag expressions" >> {
WithClose(new S3VisibilityObserverFactory) { factory =>
- val fs = new MockFileSystem()
- val root = mock[Path]
- root.getFileSystem(ArgumentMatchers.any()) returns fs
- factory.init(new Configuration(), root, sft)
+ val s3 = mockS3(factory)
val observer = factory.apply(new Path("s3a://foo/bar/baz.json"))
observer.write(feature(0, "user&admin"))
observer.write(feature(1, "admin"))
observer.write(feature(2, "user"))
observer.close()
- val captor: ArgumentCaptor[SetObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[SetObjectTaggingRequest])
- there was one(fs.s3).setObjectTagging(captor.capture())
+ val captor: ArgumentCaptor[PutObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[PutObjectTaggingRequest])
+ there was one(s3).putObjectTagging(captor.capture())
val request = captor.getValue
- request.getBucketName mustEqual "foo"
- request.getKey mustEqual "bar/baz.json"
+ request.bucket mustEqual "foo"
+ request.key mustEqual "bar/baz.json"
// since the vis are kept in a set, the order is not defined
val encodedFront = Base64.getEncoder.encodeToString("user&admin".getBytes(StandardCharsets.UTF_8))
val encodedBack = Base64.getEncoder.encodeToString("admin&user".getBytes(StandardCharsets.UTF_8))
- request.getTagging.getTagSet.asScala must beOneOf(
- Seq(new Tag(S3VisibilityObserverFactory.DefaultTag, encodedFront)),
- Seq(new Tag(S3VisibilityObserverFactory.DefaultTag, encodedBack))
+ request.tagging.tagSet.asScala must beOneOf(
+ Seq(Tag.builder.key(S3VisibilityObserverFactory.DefaultTag).value(encodedFront).build()),
+ Seq(Tag.builder.key(S3VisibilityObserverFactory.DefaultTag).value(encodedBack).build())
)
}
}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserverTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserverTest.scala
new file mode 100644
index 000000000000..da217fa6c1cf
--- /dev/null
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserverTest.scala
@@ -0,0 +1,147 @@
+/***********************************************************************
+ * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Apache License, Version 2.0
+ * which accompanies this distribution and is available at
+ * http://www.opensource.org/licenses/apache2.0.php.
+ ***********************************************************************/
+
+package org.locationtech.geomesa.fs.storage.common.s3.v1
+
+import com.amazonaws.services.s3.AmazonS3
+import com.amazonaws.services.s3.model.{SetObjectTaggingRequest, Tag}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.util.Progressable
+import org.geotools.api.feature.simple.SimpleFeature
+import org.junit.runner.RunWith
+import org.locationtech.geomesa.features.ScalaSimpleFeature
+import org.locationtech.geomesa.security.SecurityUtils
+import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
+import org.locationtech.geomesa.utils.io.WithClose
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.specs2.mock.Mockito
+import org.specs2.mutable.Specification
+import org.specs2.runner.JUnitRunner
+
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.Base64
+
+@RunWith(classOf[JUnitRunner])
+class S3VisibilityObserverTest extends Specification with Mockito {
+
+ import scala.collection.JavaConverters._
+
+ val sft = SimpleFeatureTypes.createType("s3", "dtg:Date,*geom:Point:srid=4326")
+ val defaultTag = org.locationtech.geomesa.fs.storage.common.s3.S3VisibilityObserverFactory.DefaultTag
+
+ def feature(id: Int, vis: String): SimpleFeature = {
+ val sf = ScalaSimpleFeature.create(sft, s"$id", "2020-01-01T00:00:00.000Z", "POINT (45 55)")
+ SecurityUtils.setFeatureVisibility(sf, vis)
+ sf
+ }
+
+ // noinspection NotImplementedCode
+ class MockFileSystem extends FileSystem {
+
+ val s3: AmazonS3 = mock[AmazonS3]
+
+ override def getUri: URI = ???
+ override def open(path: Path, i: Int): FSDataInputStream = ???
+ override def create(path: Path, fsp: FsPermission, b: Boolean, i: Int, i1: Short, l: Long, p: Progressable): FSDataOutputStream = ???
+ override def append(path: Path, i: Int, p: Progressable): FSDataOutputStream = ???
+ override def rename(path: Path, path1: Path): Boolean = ???
+ override def delete(path: Path, b: Boolean): Boolean = ???
+ override def listStatus(path: Path): Array[FileStatus] = ???
+ override def setWorkingDirectory(path: Path): Unit = ???
+ override def getWorkingDirectory: Path = ???
+ override def mkdirs(path: Path, fsp: FsPermission): Boolean = ???
+ override def getFileStatus(path: Path): FileStatus = ???
+ }
+
+ "S3VisibilityObserver" should {
+
+ "initialize factory correctly" >> {
+ // mimic construction through reflection
+ WithClose(classOf[S3VisibilityObserverFactory].getDeclaredConstructor().newInstance()) { factory =>
+ val root = mock[Path]
+ root.getFileSystem(ArgumentMatchers.any()) returns new MockFileSystem()
+ factory.init(new Configuration(), root, sft) must not(throwAn[Exception])
+ }
+ }
+
+ "tag a single visibility label" >> {
+ WithClose(new S3VisibilityObserverFactory) { factory =>
+ val fs = new MockFileSystem()
+ val root = mock[Path]
+ root.getFileSystem(ArgumentMatchers.any()) returns fs
+ factory.init(new Configuration(), root, sft)
+ val observer = factory.apply(new Path("s3a://foo/bar/baz.json"))
+ observer.write(feature(0, "user"))
+ observer.close()
+
+ val captor: ArgumentCaptor[SetObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[SetObjectTaggingRequest])
+ there was one(fs.s3).setObjectTagging(captor.capture())
+ val request = captor.getValue
+ request.getBucketName mustEqual "foo"
+ request.getKey mustEqual "bar/baz.json"
+ val encoded = Base64.getEncoder.encodeToString("user".getBytes(StandardCharsets.UTF_8))
+ request.getTagging.getTagSet.asScala mustEqual Seq(new Tag(defaultTag, encoded))
+ }
+ }
+
+ "tag multiple visibility labels" >> {
+ WithClose(new S3VisibilityObserverFactory) { factory =>
+ val fs = new MockFileSystem()
+ val root = mock[Path]
+ root.getFileSystem(ArgumentMatchers.any()) returns fs
+ factory.init(new Configuration(), root, sft)
+ val observer = factory.apply(new Path("s3a://foo/bar/baz.json"))
+ observer.write(feature(0, "user"))
+ observer.write(feature(1, "admin"))
+ observer.write(feature(2, "user"))
+ observer.close()
+
+ val captor: ArgumentCaptor[SetObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[SetObjectTaggingRequest])
+ there was one(fs.s3).setObjectTagging(captor.capture())
+ val request = captor.getValue
+ request.getBucketName mustEqual "foo"
+ request.getKey mustEqual "bar/baz.json"
+ // since the vis are kept in a set, the order is not defined
+ val encodedFront = Base64.getEncoder.encodeToString("user&admin".getBytes(StandardCharsets.UTF_8))
+ val encodedBack = Base64.getEncoder.encodeToString("admin&user".getBytes(StandardCharsets.UTF_8))
+ request.getTagging.getTagSet.asScala must beOneOf(
+ Seq(new Tag(defaultTag, encodedFront)), Seq(new Tag(defaultTag, encodedBack))
+ )
+ }
+ }
+
+ "simplify tag expressions" >> {
+ WithClose(new S3VisibilityObserverFactory) { factory =>
+ val fs = new MockFileSystem()
+ val root = mock[Path]
+ root.getFileSystem(ArgumentMatchers.any()) returns fs
+ factory.init(new Configuration(), root, sft)
+ val observer = factory.apply(new Path("s3a://foo/bar/baz.json"))
+ observer.write(feature(0, "user&admin"))
+ observer.write(feature(1, "admin"))
+ observer.write(feature(2, "user"))
+ observer.close()
+
+ val captor: ArgumentCaptor[SetObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[SetObjectTaggingRequest])
+ there was one(fs.s3).setObjectTagging(captor.capture())
+ val request = captor.getValue
+ request.getBucketName mustEqual "foo"
+ request.getKey mustEqual "bar/baz.json"
+ // since the vis are kept in a set, the order is not defined
+ val encodedFront = Base64.getEncoder.encodeToString("user&admin".getBytes(StandardCharsets.UTF_8))
+ val encodedBack = Base64.getEncoder.encodeToString("admin&user".getBytes(StandardCharsets.UTF_8))
+ request.getTagging.getTagSet.asScala must beOneOf(
+ Seq(new Tag(defaultTag, encodedFront)), Seq(new Tag(defaultTag, encodedBack))
+ )
+ }
+ }
+ }
+}
diff --git a/geomesa-fs/geomesa-fs-tools/conf-filtered/dependencies.sh b/geomesa-fs/geomesa-fs-tools/conf-filtered/dependencies.sh
index fca2dbb83267..d9696e586f5b 100755
--- a/geomesa-fs/geomesa-fs-tools/conf-filtered/dependencies.sh
+++ b/geomesa-fs/geomesa-fs-tools/conf-filtered/dependencies.sh
@@ -12,7 +12,9 @@
# Update the versions as required to match the target environment.
hadoop_install_version="%%hadoop.version.recommended%%"
-aws_sdk_install_version="1.12.385" # latest version as of 2023/01
+aws_sdk_v1_install_version="1.12.735" # latest version as of 2024/06
+aws_sdk_v2_install_version="2.25.64" # latest version as of 2024/06
+aws_crt_install_version="0.29.18"
# this should match the parquet desired version
snappy_install_version="1.1.1.6"
@@ -23,12 +25,14 @@ function dependencies() {
local classpath="$1"
local hadoop_version="$hadoop_install_version"
- local aws_sdk_version="$aws_sdk_install_version"
+ local aws_sdk_v1_version="$aws_sdk_v1_install_version"
+ local aws_sdk_v2_version="$aws_sdk_v2_install_version"
local snappy_version="$snappy_install_version"
if [[ -n "$classpath" ]]; then
hadoop_version="$(get_classpath_version hadoop-common "$classpath" "$hadoop_version")"
- aws_sdk_version="$(get_classpath_version aws-java-sdk-core "$classpath" "$aws_sdk_version")"
+ aws_sdk_v1_version="$(get_classpath_version aws-java-sdk-core "$classpath" "$aws_sdk_v1_version")"
+ aws_sdk_v2_version="$(get_classpath_version aws-core "$classpath" "$aws_sdk_v2_version")"
snappy_version="$(get_classpath_version snappy-java "$classpath" "$snappy_version")"
fi
@@ -52,22 +56,14 @@ function dependencies() {
"com.google.protobuf:protobuf-java:2.5.0:jar"
"org.apache.htrace:htrace-core:3.1.0-incubating:jar"
"org.apache.htrace:htrace-core4:4.1.0-incubating:jar"
- "com.amazonaws:aws-java-sdk-core:${aws_sdk_version}:jar"
- "com.amazonaws:aws-java-sdk-s3:${aws_sdk_version}:jar"
- "com.amazonaws:aws-java-sdk-dynamodb:${aws_sdk_version}:jar"
- # joda-time required for aws sdk
- "joda-time:joda-time:2.8.1:jar"
# these are the versions used by hadoop 2.8 and 3.1
"org.apache.httpcomponents:httpclient:4.5.2:jar"
"org.apache.httpcomponents:httpcore:4.4.4:jar"
"commons-httpclient:commons-httpclient:3.1:jar"
-
)
# add hadoop 3+ jars if needed
- local hadoop_maj_ver
- hadoop_maj_ver="$([[ "$hadoop_version" =~ ([0-9][0-9]*)\. ]] && echo "${BASH_REMATCH[1]}")"
- if [[ "$hadoop_maj_ver" -ge 3 ]]; then
+ if version_ge "${hadoop_version}" 3.0.0; then
gavs+=(
"org.apache.hadoop:hadoop-client-api:${hadoop_version}:jar"
"org.apache.hadoop:hadoop-client-runtime:${hadoop_version}:jar"
@@ -79,6 +75,49 @@ function dependencies() {
)
fi
+ # aws sdk
+ if version_ge "${hadoop_version}" 3.4.0; then
+ gavs+=(
+ "software.amazon.awssdk:annotations:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:apache-client:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:arns:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:auth:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:aws-core:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:aws-query-protocol:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:aws-xml-protocol:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:checksums:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:checksums-spi:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:crt-core:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:endpoints-spi:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:http-auth:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:http-auth-aws:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:http-auth-spi:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:http-client-spi:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:identity-spi:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:json-utils:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:metrics-spi:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:netty-nio-client:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:profiles:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:protocol-core:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:regions:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:s3:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:s3-transfer-manager:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:sdk-core:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:third-party-jackson-core:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk:utils:${aws_sdk_v2_version}:jar"
+ "software.amazon.awssdk.crt:aws-crt:${aws_crt_install_version}:jar"
+ "software.amazon.eventstream:eventstream:1.0.1:jar"
+ "org.reactivestreams:reactive-streams:1.0.4:jar"
+ )
+ else
+ gavs+=(
+ "com.amazonaws:aws-java-sdk-core:${aws_sdk_v1_version}:jar"
+ "com.amazonaws:aws-java-sdk-s3:${aws_sdk_v1_version}:jar"
+ "com.amazonaws:aws-java-sdk-dynamodb:${aws_sdk_v1_version}:jar"
+ "joda-time:joda-time:2.8.1:jar"
+ )
+ fi
+
echo "${gavs[@]}" | tr ' ' '\n' | sort | tr '\n' ' '
}
diff --git a/geomesa-hbase/geomesa-hbase-tools/conf-filtered/dependencies.sh b/geomesa-hbase/geomesa-hbase-tools/conf-filtered/dependencies.sh
index c3abf0235405..e04512c1fbdc 100755
--- a/geomesa-hbase/geomesa-hbase-tools/conf-filtered/dependencies.sh
+++ b/geomesa-hbase/geomesa-hbase-tools/conf-filtered/dependencies.sh
@@ -18,8 +18,6 @@ zookeeper_install_version="%%zookeeper.version.recommended%%"
# required for hadoop - make sure it corresponds to the hadoop installed version
guava_install_version="%%hbase.guava.version%%"
-function version_ge() { test "$(echo "$@" | tr " " "\n" | sort -rV | head -n 1)" == "$1"; }
-
# gets the dependencies for this module
# args:
# $1 - current classpath
diff --git a/geomesa-kafka/geomesa-kafka-tools/conf-filtered/dependencies.sh b/geomesa-kafka/geomesa-kafka-tools/conf-filtered/dependencies.sh
index b415fa9edbbd..bb97eca55c42 100755
--- a/geomesa-kafka/geomesa-kafka-tools/conf-filtered/dependencies.sh
+++ b/geomesa-kafka/geomesa-kafka-tools/conf-filtered/dependencies.sh
@@ -14,8 +14,6 @@
kafka_install_version="%%kafka.version%%"
zookeeper_install_version="%%zookeeper.version.recommended%%"
-function version_ge() { test "$(echo "$@" | tr " " "\n" | sort -rV | head -n 1)" == "$1"; }
-
# gets the dependencies for this module
# args:
# $1 - current classpath
diff --git a/geomesa-lambda/geomesa-lambda-tools/conf-filtered/dependencies.sh b/geomesa-lambda/geomesa-lambda-tools/conf-filtered/dependencies.sh
index a46969c11083..c607a986a191 100755
--- a/geomesa-lambda/geomesa-lambda-tools/conf-filtered/dependencies.sh
+++ b/geomesa-lambda/geomesa-lambda-tools/conf-filtered/dependencies.sh
@@ -18,8 +18,6 @@ kafka_install_version="%%kafka.version%%"
# required for hadoop - make sure it corresponds to the hadoop installed version
guava_install_version="%%accumulo.guava.version%%"
-function version_ge() { test "$(echo "$@" | tr " " "\n" | sort -rV | head -n 1)" == "$1"; }
-
# gets the dependencies for this module
# args:
# $1 - current classpath
diff --git a/geomesa-tools/conf-filtered/geomesa-env.sh b/geomesa-tools/conf-filtered/geomesa-env.sh
index 1a1ce5bf8f0b..f6338f9fb215 100644
--- a/geomesa-tools/conf-filtered/geomesa-env.sh
+++ b/geomesa-tools/conf-filtered/geomesa-env.sh
@@ -84,6 +84,9 @@ fi
newline=$'\n'
+# checks a version string is >= a test
+function version_ge() { test "$(echo "$@" | tr " " "\n" | sort -rV | head -n 1)" == "$1"; }
+
# setup opts for invoking the geomesa java process
function get_options() {
# create log dir if needed
diff --git a/geomesa-tools/conf-filtered/parquet-dependencies.sh b/geomesa-tools/conf-filtered/parquet-dependencies.sh
index 7bdf14ecd5cb..228d13a74dda 100755
--- a/geomesa-tools/conf-filtered/parquet-dependencies.sh
+++ b/geomesa-tools/conf-filtered/parquet-dependencies.sh
@@ -12,8 +12,6 @@
hadoop_install_version="%%hadoop.version.recommended%%"
-function version_ge() { test "$(echo "$@" | tr " " "\n" | sort -rV | head -n 1)" == "$1"; }
-
# gets the dependencies for this module
# args:
# $1 - current classpath
diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CloseablePool.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CloseablePool.scala
index 2e55641f29c4..f2709d5c6382 100644
--- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CloseablePool.scala
+++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CloseablePool.scala
@@ -9,7 +9,7 @@
package org.locationtech.geomesa.utils.io
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool, GenericObjectPoolConfig}
-import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
+import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject, SwallowedExceptionListener}
import java.io.Closeable
@@ -70,6 +70,19 @@ object CloseablePool {
}
}
- override def close(): Unit = pool.close()
+ override def close(): Unit = {
+ val errors = new java.util.concurrent.LinkedBlockingQueue[Exception]()
+ pool.setSwallowedExceptionListener(new SwallowedExceptionListener() {
+ override def onSwallowException(e: Exception): Unit = errors.offer(e)
+ })
+ pool.close()
+ if (!errors.isEmpty) {
+ val e = errors.poll()
+ while (!errors.isEmpty) {
+ e.addSuppressed(errors.poll())
+ }
+ throw e
+ }
+ }
}
}
diff --git a/pom.xml b/pom.xml
index 39994a3a502c..7b03b6387398 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,7 +68,9 @@
1.0.0-beta
16.1.0
1.11.3
- 1.12.625
+ 1.12.735
+ 2.25.64
+ 0.29.18
3.1.8
2.9.8
1.16.0
@@ -1587,6 +1589,21 @@
aws-java-sdk-cloudwatch
${aws.sdk.version}
+
+ software.amazon.awssdk
+ s3
+ ${aws.sdk.v2.version}
+
+
+ software.amazon.awssdk
+ s3-transfer-manager
+ ${aws.sdk.v2.version}
+
+
+ software.amazon.awssdk.crt
+ aws-crt
+ ${aws.crt.version}
+
commons-io
commons-io
@@ -2206,6 +2223,18 @@
${hadoop.version}
provided
+
+ org.apache.hadoop
+ hadoop-aws
+ ${hadoop.version}
+ provided
+
+
+ software.amazon.awssdk
+ bundle
+
+
+
xerces