diff --git a/geomesa-gt/geomesa-gt-partitioning/pom.xml b/geomesa-gt/geomesa-gt-partitioning/pom.xml index cbe3b62af594..5828e402c617 100644 --- a/geomesa-gt/geomesa-gt-partitioning/pom.xml +++ b/geomesa-gt/geomesa-gt-partitioning/pom.xml @@ -24,6 +24,10 @@ org.locationtech.geomesa geomesa-filter_${scala.binary.version} + + org.locationtech.geomesa + geomesa-index-api_${scala.binary.version} + org.geotools.jdbc gt-jdbc-postgis diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala index 95998b8c88b9..449056db13e7 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala @@ -9,6 +9,7 @@ package org.locationtech.geomesa.gt.partition.postgis.dialect import com.typesafe.scalalogging.StrictLogging +import org.geotools.api.data.Query import org.geotools.api.feature.`type`.{AttributeDescriptor, GeometryDescriptor} import org.geotools.api.feature.simple.SimpleFeatureType import org.geotools.api.filter.Filter @@ -22,8 +23,9 @@ import org.locationtech.geomesa.gt.partition.postgis.dialect.functions.{LogClean import org.locationtech.geomesa.gt.partition.postgis.dialect.procedures._ import org.locationtech.geomesa.gt.partition.postgis.dialect.tables._ import org.locationtech.geomesa.gt.partition.postgis.dialect.triggers.{DeleteTrigger, InsertTrigger, UpdateTrigger, WriteAheadTrigger} +import org.locationtech.geomesa.index.planning.QueryInterceptor.QueryInterceptorFactory import org.locationtech.geomesa.utils.geotools.{Conversions, SimpleFeatureTypes} -import org.locationtech.geomesa.utils.io.WithClose +import org.locationtech.geomesa.utils.io.{CloseWithLogging, WithClose} import org.locationtech.jts.geom._ import java.sql.{Connection, DatabaseMetaData, ResultSet, Types} @@ -55,6 +57,12 @@ class PartitionedPostgisDialect(store: JDBCDataStore) extends PostGISDialect(sto override def initialValue(): Boolean = false } + private val interceptors = { + val factory = QueryInterceptorFactory(store) + sys.addShutdownHook(CloseWithLogging(factory)) // we don't have any API hooks to dispose of things... + factory + } + /** * Re-create the PLPG/SQL procedures associated with a feature type. This can be used * to 'upgrade in place' if the code is changed. @@ -240,7 +248,10 @@ class PartitionedPostgisDialect(store: JDBCDataStore) extends PostGISDialect(sto override def splitFilter(filter: Filter, schema: SimpleFeatureType): Array[Filter] = { import PartitionedPostgisDialect.Config.ConfigConversions - super.splitFilter(SplitFilterVisitor(filter, schema.isFilterWholeWorld), schema) + val simplified = SplitFilterVisitor(filter, schema.isFilterWholeWorld) + val query = new Query(schema.getTypeName, simplified) + interceptors(schema).foreach(_.rewrite(query)) + super.splitFilter(query.getFilter, schema) } override def registerClassToSqlMappings(mappings: java.util.Map[Class[_], Integer]): Unit = { diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/UserDataTable.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/UserDataTable.scala index 2f549e98eabf..fada2fc7e2bd 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/UserDataTable.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/UserDataTable.scala @@ -47,15 +47,17 @@ class UserDataTable extends Sql { s"INSERT INTO ${table.quoted} (type_name, key, value) VALUES (?, ?, ?) " + s"ON CONFLICT (type_name, key) DO UPDATE SET value = EXCLUDED.value;" - def insert(config: String, value: Option[String]): Unit = - value.foreach(v => ex.executeUpdate(insertSql, Seq(info.typeName, config, v))) + def insert(config: String, value: String): Unit = + ex.executeUpdate(insertSql, Seq(info.typeName, config, value)) - insert(SimpleFeatureTypes.Configs.DefaultDtgField, Some(info.cols.dtg.raw)) - insert(Config.IntervalHours, Some(Integer.toString(info.partitions.hoursPerPartition))) - insert(Config.PagesPerRange, Some(Integer.toString(info.partitions.pagesPerRange))) - insert(Config.MaxPartitions, info.partitions.maxPartitions.map(Integer.toString)) - insert(Config.CronMinute, info.partitions.cronMinute.map(Integer.toString)) - insert(Config.FilterWholeWorld, info.userData.get(Config.FilterWholeWorld)) + insert(SimpleFeatureTypes.Configs.DefaultDtgField, info.cols.dtg.raw) + insert(Config.IntervalHours, Integer.toString(info.partitions.hoursPerPartition)) + insert(Config.PagesPerRange, Integer.toString(info.partitions.pagesPerRange)) + info.partitions.maxPartitions.map(Integer.toString).foreach(insert(Config.MaxPartitions, _)) + info.partitions.cronMinute.map(Integer.toString).foreach(insert(Config.CronMinute, _)) + Seq(Config.FilterWholeWorld, SimpleFeatureTypes.Configs.QueryInterceptors).foreach { key => + info.userData.get(key).foreach(insert(key, _)) + } } override def drop(info: TypeInfo)(implicit ex: ExecutionContext): Unit = { diff --git a/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala b/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala index 3d46f485aa30..9a49c35cf4d6 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala @@ -577,6 +577,48 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll } } + "support query interceptors" in { + val sft = SimpleFeatureTypes.renameSft(this.sft, "interceptor") + sft.getUserData.put(SimpleFeatureTypes.Configs.QueryInterceptors, classOf[TestQueryInterceptor].getName) + + val ds = DataStoreFinder.getDataStore(params.asJava) + ds must not(beNull) + + try { + ds must beAnInstanceOf[JDBCDataStore] + + ds.getTypeNames.toSeq must not(contain(sft.getTypeName)) + ds.createSchema(sft) + + val schema = Try(ds.getSchema(sft.getTypeName)).getOrElse(null) + schema must not(beNull) + schema.getUserData.asScala must containAllOf(sft.getUserData.asScala.toSeq) + logger.debug(s"Schema: ${SimpleFeatureTypes.encodeType(schema)}") + + val Array(left, right) = ds.asInstanceOf[JDBCDataStore].getSQLDialect.splitFilter(Filter.EXCLUDE, schema) + left mustEqual Filter.INCLUDE + right mustEqual Filter.INCLUDE + + // write some data + WithClose(new DefaultTransaction()) { tx => + WithClose(ds.getFeatureWriterAppend(sft.getTypeName, tx)) { writer => + features.foreach { feature => + FeatureUtils.write(writer, feature, useProvidedFid = true) + } + } + tx.commit() + } + + // verify that filter is re-written to be Filter.INCLUDE + WithClose(ds.getFeatureReader(new Query(sft.getTypeName, ECQL.toFilter("IN('1')")), Transaction.AUTO_COMMIT)) { reader => + val result = SelfClosingIterator(reader).toList + result.map(compFromDb) must containTheSameElementsAs(features.map(compWithFid(_, sft))) + } + } finally { + ds.dispose() + } + } + "support idle_in_transaction_session_timeout" in { val sft = SimpleFeatureTypes.renameSft(this.sft, "timeout") diff --git a/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/TestQueryInterceptor.scala b/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/TestQueryInterceptor.scala new file mode 100644 index 000000000000..f8be78530bbc --- /dev/null +++ b/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/TestQueryInterceptor.scala @@ -0,0 +1,25 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.gt.partition.postgis + +import org.geotools.api.data.{DataStore, Query} +import org.geotools.api.feature.simple.SimpleFeatureType +import org.geotools.api.filter.Filter +import org.locationtech.geomesa.index.planning.QueryInterceptor + +class TestQueryInterceptor extends QueryInterceptor { + + var sft: SimpleFeatureType = _ + + override def init(ds: DataStore, sft: SimpleFeatureType): Unit = this.sft = sft + + override def rewrite(query: Query): Unit = query.setFilter(Filter.INCLUDE) + + override def close(): Unit = {} +} diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/QueryInterceptor.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/QueryInterceptor.scala index b04b49a51325..4b74387efab1 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/QueryInterceptor.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/QueryInterceptor.scala @@ -111,7 +111,7 @@ object QueryInterceptor extends LazyLogging { classes.split(",").toSeq.flatMap { c => var interceptor: QueryInterceptor = null try { - interceptor = Class.forName(c).newInstance().asInstanceOf[QueryInterceptor] + interceptor = Class.forName(c).getDeclaredConstructor().newInstance().asInstanceOf[QueryInterceptor] interceptor.init(ds, sft) Seq(interceptor) } catch {