diff --git a/docs/changelog.md b/docs/changelog.md index 33974a984ed..7ac2e125bbb 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -328,6 +328,32 @@ and optimize this method execution to execute all the slice queries for all the (for example, by using asynchronous programming, slice queries grouping, multi-threaded execution, or any other technique which is efficient for the respective storage adapter). +##### Added possibility to group multiple slice queries together via CQL storage backend + +Starting from JanusGraph 1.0.0 CQL storage implementation now groups queries which fetch properties with `Cardinality.SINGLE` together +into the same CQL query. The behaviour can be changed by setting configuration `storage.cql.grouping.slice-allowed = false`. + +CQL storage implementation has also ability to group queries of different partition keys together if they belong to the same +token range or same replicas set (grouping strategy is available via `storage.cql.grouping.keys-class` configuration option). +This behaviour is disabled by default, but can be enabled via `storage.cql.grouping.keys-allowed = true`. +Notice, that by enabling keys grouping feature it increases the return size of such queries because each CQL query which groups +multiple keys together needs to return those keys per each row. Moreover, it could potentially lead to less balanced load +on the storage cluster. However, it reduces the amount of CQL queries sent which may positively influence throughput in some cases +as well as pricing point of some Serverless deployments. We recommend to benchmark each use-case before enabling keys grouping (`storage.cql.grouping.keys-allowed`). + +Notice, keys grouping (`storage.cql.grouping.keys-allowed`) can be used only with storage backends which support `PER PARTITION LIMIT`. +As such, this feature can't be used with Amazon Keyspaces because it doesn't support `PER PARTITION LIMIT`. + +Different storage backends may also have restriction set on maximum keys which can be provided via `IN` operator. +It is required to ensure that `storage.cql.grouping.keys-limit` or `storage.cql.grouping.slice-limit` is less or equal to the +restriction provided via the storage backend. +On ScyllaDB it's possible to configure this restriction using [max-partition-key-restrictions-per-query](https://enterprise.docs.scylladb.com/branch-2022.2/faq.html#how-can-i-change-the-maximum-number-of-in-restrictions) +configuration option (default to `100`). +On AstraDB side it is needed to be asked on AstraDB side to be changed via `partition_keys_in_select_failure_threshold` and `in_select_cartesian_product_failure_threshold` threshold +configurations (https://docs.datastax.com/en/astra-serverless/docs/plan/planning.html#_cassandra_yaml) which are set to `20` and `25` be default. + +See additional properties to control grouping configurations under the namespace `storage.cql.grouping`. + ##### Removal of deprecated classes/methods/functionalities ###### Methods diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md index ed8bfb2971a..1f48d30a55e 100644 --- a/docs/configs/janusgraph-cfg.md +++ b/docs/configs/janusgraph-cfg.md @@ -461,10 +461,6 @@ CQL storage backend options | storage.cql.request-timeout | Timeout for CQL requests in milliseconds. See DataStax Java Driver option `basic.request.timeout` for more information. | Long | 12000 | MASKABLE | | storage.cql.session-leak-threshold | The maximum number of live sessions that are allowed to coexist in a given VM until the warning starts to log for every new session. If the value is less than or equal to 0, the feature is disabled: no warning will be issued. See DataStax Java Driver option `advanced.session-leak.threshold` for more information. | Integer | (no default value) | MASKABLE | | storage.cql.session-name | Default name for the Cassandra session | String | JanusGraph Session | MASKABLE | -| storage.cql.slice-grouping-allowed | If `true` this allows multiple Slice queries which are allowed to be performed as non-range queries (i.e. direct equality operation) to be grouped together into a single CQL query via `IN` operator. Notice, currently only operations to fetch properties with Cardinality.SINGLE are allowed to be performed as non-range queries (edges fetching or properties with Cardinality SET or LIST won't be grouped together). -If this option if `false` then each Slice query will be executed in a separate asynchronous CQL query even when grouping is allowed. | Boolean | true | MASKABLE | -| storage.cql.slice-grouping-limit | Maximum amount of grouped together slice queries into a single CQL query. -This option is used only when `storage.cql.slice-grouping-allowed` is `true`. | Integer | 100 | MASKABLE | | storage.cql.speculative-retry | The speculative retry policy. One of: NONE, ALWAYS, percentile, ms. | String | (no default value) | FIXED | | storage.cql.ttl-enabled | Whether TTL should be enabled or not. Must be turned off if the storage does not support TTL. Amazon Keyspace, for example, does not support TTL by default unless otherwise enabled. | Boolean | true | LOCAL | | storage.cql.use-external-locking | True to prevent JanusGraph from using its own locking mechanism. Setting this to true eliminates redundant checks when using an external locking mechanism outside of JanusGraph. Be aware that when use-external-locking is set to true, that failure to employ a locking algorithm which locks all columns that participate in a transaction upfront and unlocks them when the transaction ends, will result in a 'read uncommitted' transaction isolation level guarantee. If set to true without an appropriate external locking mechanism in place side effects such as dirty/non-repeatable/phantom reads should be expected. | Boolean | false | MASKABLE | @@ -482,6 +478,39 @@ Configuration options for CQL executor service which is used to process deserial | storage.cql.executor-service.max-pool-size | Maximum pool size for executor service. Ignored for `fixed` and `cached` executor services. May be ignored if custom executor service is used (depending on the implementation of the executor service). | Integer | 2147483647 | LOCAL | | storage.cql.executor-service.max-shutdown-wait-time | Max shutdown wait time in milliseconds for executor service threads to be finished during shutdown. After this time threads will be interrupted (signalled with interrupt) without any additional wait time. | Long | 60000 | LOCAL | +### storage.cql.grouping +Configuration options for controlling CQL queries grouping + + +| Name | Description | Datatype | Default Value | Mutability | +| ---- | ---- | ---- | ---- | ---- | +| storage.cql.grouping.keys-allowed | If `true` this allows multiple partition keys to be grouped together into a single CQL query via `IN` operator based on the keys grouping strategy provided (usually grouping is done by same token-ranges or same replica sets, but may also involve shard ids for custom implementations). +Notice, that any CQL query grouped with more than 1 key will require to return a row key for any column fetched. +This option is useful when less amount of CQL queries is desired to be sent for read requests in expense of fetching more data (partition key per each fetched value). +Notice, different storage backends may have different way of executing multi-partition `IN` queries (including, but not limited to how the checksum queries are sent for different consistency levels, processing node CPU usage, disk access pattern, etc.). Thus, a proper benchmarking is needed to determine if keys grouping is useful or not per case by case scenario. +This option can be enabled only for storage backends which support `PER PARTITION LIMIT`. As such, this feature can't be used with Amazon Keyspaces because it doesn't support `PER PARTITION LIMIT`. +If this option is `false` then each partition key will be executed in a separate asynchronous CQL query even when multiple keys from the same token range are queried. +Notice, the default grouping strategy is not taking into account shards. Thus, this might be inefficient with ScyllaDB storage backend. ScyllaDB specific keys grouping strategy should be implemented after the resolution of the [ticket #232](https://github.com/scylladb/java-driver/issues/232). | Boolean | false | MASKABLE | +| storage.cql.grouping.keys-class | Full class path of the keys grouping execution strategy. The class should implement `org.janusgraph.diskstorage.cql.strategy.GroupedExecutionStrategy` interface and have a public constructor with two arguments `org.janusgraph.diskstorage.configuration.Configuration` and `org.janusgraph.diskstorage.cql.CQLStoreManager`. +Shortcuts available: +- `tokenRangeAware` - groups partition keys which belong to the same token range. Notice, this strategy is not taking into account shards. Thus, this might be inefficient with ScyllaDB storage backend. +- `replicasAware` - groups partition keys which belong to the same replica sets (same nodes). Notice, this strategy is not taking into account shards. Thus, this might be inefficient with ScyllaDB storage backend. + +This option takes effect only when `storage.cql.grouping.keys-allowed` is `true`. | String | replicasAware | MASKABLE | +| storage.cql.grouping.keys-limit | Maximum amount of the keys which belong to the same token range to be grouped together into a single CQL query. If more keys for the same token range are queried, they are going to be grouped into separate CQL queries. +Notice, for ScyllaDB this option should not exceed the maximum number of distinct clustering key restrictions per query which can be changed by ScyllaDB configuration option `max-partition-key-restrictions-per-query` (https://enterprise.docs.scylladb.com/branch-2022.2/faq.html#how-can-i-change-the-maximum-number-of-in-restrictions). For AstraDB this limit is set to 20 and usually it's fixed. However, you can ask customer support for a possibility to change the default threshold to your desired configuration via `partition_keys_in_select_failure_threshold` and `in_select_cartesian_product_failure_threshold` threshold configurations (https://docs.datastax.com/en/astra-serverless/docs/plan/planning.html#_cassandra_yaml). +Ensure that your storage backends allows more IN selectors than the one set via this configuration. +This option takes effect only when `storage.cql.grouping.keys-allowed` is `true`. | Integer | 20 | MASKABLE | +| storage.cql.grouping.keys-min | Minimum amount of keys to consider for grouping. Grouping will be skipped for any multi-key query which has less than this amount of keys (i.e. a separate CQL query will be executed for each key in such case). +Usually this configuration should always be set to `2`. It is useful to increase the value only in cases when queries with more keys should not be grouped, but be performed separately to increase parallelism in expense of the network overhead. +This option takes effect only when `storage.cql.grouping.keys-allowed` is `true`. | Integer | 2 | MASKABLE | +| storage.cql.grouping.slice-allowed | If `true` this allows multiple Slice queries which are allowed to be performed as non-range queries (i.e. direct equality operation) to be grouped together into a single CQL query via `IN` operator. Notice, currently only operations to fetch properties with Cardinality.SINGLE are allowed to be performed as non-range queries (edges fetching or properties with Cardinality SET or LIST won't be grouped together). +If this option if `false` then each Slice query will be executed in a separate asynchronous CQL query even when grouping is allowed. | Boolean | true | MASKABLE | +| storage.cql.grouping.slice-limit | Maximum amount of grouped together slice queries into a single CQL query. +Notice, for ScyllaDB this option should not exceed the maximum number of distinct clustering key restrictions per query which can be changed by ScyllaDB configuration option `max-partition-key-restrictions-per-query` (https://enterprise.docs.scylladb.com/branch-2022.2/faq.html#how-can-i-change-the-maximum-number-of-in-restrictions). For AstraDB this limit is set to 20 and usually it's fixed. However, you can ask customer support for a possibility to change the default threshold to your desired configuration via `partition_keys_in_select_failure_threshold` and `in_select_cartesian_product_failure_threshold` threshold configurations (https://docs.datastax.com/en/astra-serverless/docs/plan/planning.html#_cassandra_yaml). +Ensure that your storage backends allows more IN selectors than the one set via this configuration. +This option is used only when `storage.cql.grouping.slice-allowed` is `true`. | Integer | 20 | MASKABLE | + ### storage.cql.internal Advanced configuration of internal DataStax driver. Notice, all available configurations will be composed in the order. Non specified configurations will be skipped. By default only base configuration is enabled (which has the smallest priority. It means that you can overwrite any configuration used in base programmatic configuration by using any other configuration type). The configurations are composed in the next order (sorted by priority in descending order): `file-configuration`, `resource-configuration`, `string-configuration`, `url-configuration`, `base-programmatic-configuration` (which is controlled by `base-programmatic-configuration-enabled` property). Configurations with higher priority always overwrite configurations with lower priority. I.e. if the same configuration parameter is used in both `file-configuration` and `string-configuration` the configuration parameter from `file-configuration` will be used and configuration parameter from `string-configuration` will be ignored. See available configuration options and configurations structure here: https://docs.datastax.com/en/developer/java-driver/4.13/manual/core/configuration/reference/ diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/EntryMetaData.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/EntryMetaData.java index bd7f3218414..3687eb6df0c 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/EntryMetaData.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/EntryMetaData.java @@ -29,7 +29,8 @@ public enum EntryMetaData { TTL(Integer.class, false, data -> data instanceof Integer && ((Integer) data) >= 0L), VISIBILITY(String.class, true, data -> data instanceof String && StringEncoding.isAsciiString((String) data)), - TIMESTAMP(Long.class, false, data -> data instanceof Long); + TIMESTAMP(Long.class, false, data -> data instanceof Long), + ROW_KEY(StaticBuffer.class, false, data -> data instanceof StaticBuffer); public static final java.util.Map EMPTY_METADATA = Collections.emptyMap(); diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java index ab93e429f63..1c3d9d4d13c 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java @@ -18,7 +18,9 @@ import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -63,14 +65,17 @@ public static T get(CompletableFuture future){ public static Map unwrap(Map> futureMap) throws Throwable{ Map resultMap = new HashMap<>(futureMap.size()); Throwable firstException = null; + Set uniqueExceptions = null; for(Map.Entry> entry : futureMap.entrySet()){ try{ resultMap.put(entry.getKey(), entry.getValue().get()); } catch (Throwable throwable){ + Throwable rootException = unwrapExecutionException(throwable); if(firstException == null){ - firstException = throwable; - } else { - firstException.addSuppressed(throwable); + firstException = rootException; + uniqueExceptions = new HashSet<>(1); + } else if(firstException != rootException && uniqueExceptions.add(rootException)){ + firstException.addSuppressed(rootException); } } } @@ -84,14 +89,17 @@ public static Map unwrap(Map> futureMap) throw public static Map> unwrapMapOfMaps(Map>> futureMap) throws Throwable{ Map> resultMap = new HashMap<>(futureMap.size()); Throwable firstException = null; + Set uniqueExceptions = null; for(Map.Entry>> entry : futureMap.entrySet()){ try{ resultMap.put(entry.getKey(), unwrap(entry.getValue())); } catch (Throwable throwable){ + Throwable rootException = unwrapExecutionException(throwable); if(firstException == null){ - firstException = throwable; - } else { - firstException.addSuppressed(throwable); + firstException = rootException; + uniqueExceptions = new HashSet<>(1); + } else if(firstException != rootException && uniqueExceptions.add(rootException)){ + firstException.addSuppressed(rootException); } } } @@ -104,14 +112,17 @@ public static Map> unwrapMapOfMaps(Map void awaitAll(Collection> futureCollection) throws Throwable{ Throwable firstException = null; + Set uniqueExceptions = null; for(CompletableFuture future : futureCollection){ try{ future.get(); } catch (Throwable throwable){ + Throwable rootException = unwrapExecutionException(throwable); if(firstException == null){ - firstException = throwable; - } else { - firstException.addSuppressed(throwable); + firstException = rootException; + uniqueExceptions = new HashSet<>(1); + } else if(firstException != rootException && uniqueExceptions.add(rootException)){ + firstException.addSuppressed(rootException); } } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java index adbc994d6a9..0700c00509f 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java @@ -654,6 +654,8 @@ public static MetaDataSerializer getSerializer(EntryMetaData meta) { return LongSerializer.INSTANCE; case VISIBILITY: return ASCIIStringSerializer.INSTANCE; + case ROW_KEY: + return StaticBufferSerializer.INSTANCE; default: throw new AssertionError("Unexpected meta data: " + meta); } } @@ -736,4 +738,38 @@ public String read(byte[] data, int startPos) { } } + private enum StaticBufferSerializer implements MetaDataSerializer { + + INSTANCE; + + private static final StaticBuffer EMPTY_STATIC_BUFFER = StaticArrayBuffer.of(new byte[0]); + + @Override + public int getByteLength(StaticBuffer value) { + return value.length() + 4; + } + + @Override + public void write(byte[] data, int startPos, StaticBuffer value) { + int length = value.length(); + StaticArrayBuffer.putInt(data, startPos, length); + if(length > 0){ + startPos+=4; + for(int i=0; i, StaticBuffer> { + private static final StaticArrayBuffer EMPTY_KEY = StaticArrayBuffer.of(new byte[0]); + private final EntryMetaData[] schema; CQLColValGetter(final EntryMetaData[] schema) { @@ -50,6 +55,9 @@ public Object getMetaData(final Tuple3 tuple, f return tuple._3.getLong(CQLKeyColumnValueStore.WRITETIME_COLUMN_NAME); case TTL: return tuple._3.getInt(CQLKeyColumnValueStore.TTL_COLUMN_NAME); + case ROW_KEY: + ByteBuffer rawKey = tuple._3.getByteBuffer(CQLKeyColumnValueStore.KEY_COLUMN_NAME); + return rawKey == null ? EMPTY_KEY : StaticArrayBuffer.of(rawKey); default: throw new UnsupportedOperationException("Unsupported meta data: " + metaData); } diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java index 6528a93a1a1..62e6232d727 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java @@ -20,6 +20,7 @@ import org.janusgraph.diskstorage.configuration.ConfigOption; import org.janusgraph.diskstorage.configuration.Configuration; import org.janusgraph.diskstorage.configuration.ExecutorServiceBuilder; +import org.janusgraph.diskstorage.cql.strategy.GroupedExecutionStrategyBuilder; import org.janusgraph.diskstorage.util.backpressure.QueryBackPressure; import org.janusgraph.diskstorage.util.backpressure.builder.QueryBackPressureBuilder; import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration; @@ -753,9 +754,22 @@ public interface CQLConfigOptions { ConfigOption.Type.LOCAL, String.class); - ConfigOption SLICE_GROUPING_ALLOWED = new ConfigOption<>( + ConfigNamespace CQL_GROUPING_NS = new ConfigNamespace( CQL_NS, - "slice-grouping-allowed", + "grouping", + "Configuration options for controlling CQL queries grouping"); + + String MAX_IN_CONFIG_MSG = "Notice, for ScyllaDB this option should not exceed the maximum number of distinct " + + "clustering key restrictions per query which can be changed by ScyllaDB configuration option `max-partition-key-restrictions-per-query` " + + "(https://enterprise.docs.scylladb.com/branch-2022.2/faq.html#how-can-i-change-the-maximum-number-of-in-restrictions). " + + "For AstraDB this limit is set to 20 and usually it's fixed. However, you can ask customer support for a possibility to change " + + "the default threshold to your desired configuration via `partition_keys_in_select_failure_threshold` " + + "and `in_select_cartesian_product_failure_threshold` threshold configurations (https://docs.datastax.com/en/astra-serverless/docs/plan/planning.html#_cassandra_yaml).\n" + + "Ensure that your storage backends allows more IN selectors than the one set via this configuration."; + + ConfigOption SLICE_GROUPING_ALLOWED = new ConfigOption<>( + CQL_GROUPING_NS, + "slice-allowed", "If `true` this allows multiple Slice queries which are allowed to be performed as non-range " + "queries (i.e. direct equality operation) to be grouped together into a single CQL query via `IN` operator. " + "Notice, currently only operations to fetch properties with Cardinality.SINGLE are allowed to be performed as non-range queries " + @@ -766,10 +780,61 @@ public interface CQLConfigOptions { true); ConfigOption SLICE_GROUPING_LIMIT = new ConfigOption<>( - CQL_NS, - "slice-grouping-limit", - "Maximum amount of grouped together slice queries into a single CQL query.\n" + - "This option is used only when `"+SLICE_GROUPING_ALLOWED.toStringWithoutRoot()+"` is `true`.", + CQL_GROUPING_NS, + "slice-limit", + "Maximum amount of grouped together slice queries into a single CQL query.\n" + MAX_IN_CONFIG_MSG + + "\nThis option is used only when `"+SLICE_GROUPING_ALLOWED.toStringWithoutRoot()+"` is `true`.", + ConfigOption.Type.MASKABLE, + 20); + + ConfigOption KEYS_GROUPING_ALLOWED = new ConfigOption<>( + CQL_GROUPING_NS, + "keys-allowed", + "If `true` this allows multiple partition keys to be grouped together into a single CQL query via `IN` operator based on the keys grouping strategy provided " + + "(usually grouping is done by same token-ranges or same replica sets, but may also involve shard ids for custom implementations).\n" + + "Notice, that any CQL query grouped with more than 1 key will require to return a row key for any column fetched.\n" + + "This option is useful when less amount of CQL queries is desired to be sent for read requests in expense of fetching more data (partition key per each fetched value).\n" + + "Notice, different storage backends may have different way of executing multi-partition `IN` queries " + + "(including, but not limited to how the checksum queries are sent for different consistency levels, processing node CPU usage, disk access pattern, etc.). Thus, a proper " + + "benchmarking is needed to determine if keys grouping is useful or not per case by case scenario.\n" + + "This option can be enabled only for storage backends which support `PER PARTITION LIMIT`. As such, this feature can't be used with Amazon Keyspaces because it doesn't support `PER PARTITION LIMIT`.\n" + + "If this option is `false` then each partition key will be executed in a separate asynchronous CQL query even when multiple keys from the same token range are queried.\n" + + "Notice, the default grouping strategy is not taking into account shards. Thus, this might be inefficient with ScyllaDB storage backend. " + + "ScyllaDB specific keys grouping strategy should be implemented after the resolution of the [ticket #232](https://github.com/scylladb/java-driver/issues/232).", + ConfigOption.Type.MASKABLE, + false); + + ConfigOption KEYS_GROUPING_CLASS = new ConfigOption<>( + CQL_GROUPING_NS, + "keys-class", + "Full class path of the keys grouping execution strategy. The class should implement " + + "`org.janusgraph.diskstorage.cql.strategy.GroupedExecutionStrategy` interface and have a public constructor " + + "with two arguments `org.janusgraph.diskstorage.configuration.Configuration` and `org.janusgraph.diskstorage.cql.CQLStoreManager`.\n" + + "Shortcuts available:\n" + + "- `"+GroupedExecutionStrategyBuilder.TOKEN_RANGE_AWARE +"` - groups partition keys which belong to the same token range. Notice, this strategy is not taking into account shards. Thus, this might be inefficient with ScyllaDB storage backend.\n" + + "- `"+GroupedExecutionStrategyBuilder.REPLICAS_AWARE +"` - groups partition keys which belong to the same replica sets (same nodes). Notice, this strategy is not taking into account shards. Thus, this might be inefficient with ScyllaDB storage backend.\n" + + "\nThis option takes effect only when `"+KEYS_GROUPING_ALLOWED.toStringWithoutRoot()+"` is `true`.", + ConfigOption.Type.MASKABLE, + GroupedExecutionStrategyBuilder.REPLICAS_AWARE); + + ConfigOption KEYS_GROUPING_LIMIT = new ConfigOption<>( + CQL_GROUPING_NS, + "keys-limit", + "Maximum amount of the keys which belong to the same token range to be grouped together into a single CQL query. " + + "If more keys for the same token range are queried, they are going to be grouped into separate CQL queries.\n" + + MAX_IN_CONFIG_MSG + + "\nThis option takes effect only when `"+KEYS_GROUPING_ALLOWED.toStringWithoutRoot()+"` is `true`.", + ConfigOption.Type.MASKABLE, + 20); + + ConfigOption KEYS_GROUPING_MIN = new ConfigOption<>( + CQL_GROUPING_NS, + "keys-min", + "Minimum amount of keys to consider for grouping. Grouping will be skipped for any multi-key query " + + "which has less than this amount of keys (i.e. a separate CQL query will be executed for each key in such case).\n" + + "Usually this configuration should always be set to `2`. It is useful to increase the value only in cases when queries " + + "with more keys should not be grouped, but be performed separately to increase parallelism in expense of the network overhead." + + "\nThis option takes effect only when `"+KEYS_GROUPING_ALLOWED.toStringWithoutRoot()+"` is `true`.", ConfigOption.Type.MASKABLE, - 100); + 2); } diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java index 201b25ecdbc..0584fee5b16 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java @@ -45,36 +45,27 @@ import org.janusgraph.diskstorage.StaticBuffer; import org.janusgraph.diskstorage.TemporaryBackendException; import org.janusgraph.diskstorage.configuration.Configuration; -import org.janusgraph.diskstorage.cql.function.slice.AsyncCQLMultiColumnFunction; -import org.janusgraph.diskstorage.cql.function.slice.AsyncCQLSliceFunction; +import org.janusgraph.diskstorage.cql.service.AsyncQueryExecutionService; +import org.janusgraph.diskstorage.cql.service.GroupingAsyncQueryExecutionService; import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore; import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator; -import org.janusgraph.diskstorage.keycolumnvalue.KeyMultiColumnQuery; import org.janusgraph.diskstorage.keycolumnvalue.KeyRangeQuery; import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery; import org.janusgraph.diskstorage.keycolumnvalue.KeySlicesIterator; -import org.janusgraph.diskstorage.keycolumnvalue.KeysQueriesGroup; import org.janusgraph.diskstorage.keycolumnvalue.MultiKeysQueryGroups; import org.janusgraph.diskstorage.keycolumnvalue.MultiSlicesQuery; import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery; import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; import org.janusgraph.diskstorage.util.CompletableFutureUtil; -import org.janusgraph.diskstorage.util.EntryArrayList; import org.janusgraph.diskstorage.util.backpressure.QueryBackPressure; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.function.Function; -import java.util.stream.Collectors; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom; @@ -95,8 +86,6 @@ import static org.janusgraph.diskstorage.cql.CQLConfigOptions.COMPACTION_OPTIONS; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.COMPACTION_STRATEGY; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.GC_GRACE_SECONDS; -import static org.janusgraph.diskstorage.cql.CQLConfigOptions.SLICE_GROUPING_ALLOWED; -import static org.janusgraph.diskstorage.cql.CQLConfigOptions.SLICE_GROUPING_LIMIT; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.SPECULATIVE_RETRY; import static org.janusgraph.diskstorage.cql.CQLTransaction.getTransaction; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORE_META_TIMESTAMPS; @@ -161,7 +150,9 @@ private static boolean isKeySizeTooLarge(Throwable cause) { private final CQLStoreManager storeManager; private final CqlSession session; private final String tableName; - private final CQLColValGetter getter; + private final CQLColValGetter singleKeyGetter; + private final CQLColValGetter multiKeysGetter; + private final Runnable closer; private final PreparedStatement getKeysAll; @@ -170,12 +161,8 @@ private static boolean isKeySizeTooLarge(Throwable cause) { private final PreparedStatement insertColumn; private final PreparedStatement insertColumnWithTTL; - private final AsyncCQLSliceFunction cqlSliceFunction; - private final AsyncCQLMultiColumnFunction cqlMultiColumnFunction; - private final boolean sliceGroupingAllowed; - private final int sliceGroupingLimit; - private final QueryBackPressure queryBackPressure; + private final AsyncQueryExecutionService asyncQueryExecutionService; /** * Creates an instance of the {@link KeyColumnValueStore} that stores the data in a CQL backed table. @@ -191,35 +178,17 @@ public CQLKeyColumnValueStore(final CQLStoreManager storeManager, final String t this.tableName = tableName; this.closer = closer; this.session = this.storeManager.getSession(); - this.getter = new CQLColValGetter(storeManager.getMetaDataSchema(this.tableName)); + EntryMetaData[] defaultEntryMetadata = storeManager.getMetaDataSchema(this.tableName); + this.singleKeyGetter = new CQLColValGetter(defaultEntryMetadata); + EntryMetaData[] multiKeyEntryMetadata = new EntryMetaData[defaultEntryMetadata.length+1]; + System.arraycopy(defaultEntryMetadata,0,multiKeyEntryMetadata,0, defaultEntryMetadata.length); + multiKeyEntryMetadata[multiKeyEntryMetadata.length-1] = EntryMetaData.ROW_KEY; + this.multiKeysGetter = new CQLColValGetter(multiKeyEntryMetadata); if(shouldInitializeTable()) { initializeTable(this.session, this.storeManager.getKeyspaceName(), tableName, configuration); } - // @formatter:off - final Select getSliceSelect = selectFrom(this.storeManager.getKeyspaceName(), this.tableName) - .column(COLUMN_COLUMN_NAME) - .column(VALUE_COLUMN_NAME) - .where( - Relation.column(KEY_COLUMN_NAME).isEqualTo(bindMarker(KEY_BINDING)), - Relation.column(COLUMN_COLUMN_NAME).isGreaterThanOrEqualTo(bindMarker(SLICE_START_BINDING)), - Relation.column(COLUMN_COLUMN_NAME).isLessThan(bindMarker(SLICE_END_BINDING)) - ) - .limit(bindMarker(LIMIT_BINDING)); - PreparedStatement getSlice = this.session.prepare(addTTLFunction(addTimestampFunction(getSliceSelect)).build()); - - // @formatter:off - final Select getMultiColumnSelect = selectFrom(this.storeManager.getKeyspaceName(), this.tableName) - .column(COLUMN_COLUMN_NAME) - .column(VALUE_COLUMN_NAME) - .where( - Relation.column(KEY_COLUMN_NAME).isEqualTo(bindMarker(KEY_BINDING)), - Relation.column(COLUMN_COLUMN_NAME).in(bindMarker(COLUMN_BINDING)) - ) - .limit(bindMarker(LIMIT_BINDING)); - PreparedStatement getMultiColumn = this.session.prepare(addTTLFunction(addTimestampFunction(getMultiColumnSelect)).build()); - if (this.storeManager.getFeatures().hasOrderedScan()) { final Select getKeysRangedSelect = selectFrom(this.storeManager.getKeyspaceName(), this.tableName) .column(KEY_COLUMN_NAME) @@ -268,19 +237,10 @@ public CQLKeyColumnValueStore(final CQLStoreManager storeManager, final String t this.insertColumnWithTTL = null; } - ExecutorService executorService = storeManager.getExecutorService(); queryBackPressure = storeManager.getQueriesBackPressure(); - cqlSliceFunction = new AsyncCQLSliceFunction(session, getSlice, getter, executorService, queryBackPressure); - - if(configuration.get(SLICE_GROUPING_ALLOWED)){ - cqlMultiColumnFunction = new AsyncCQLMultiColumnFunction(session, getMultiColumn, getter, executorService, queryBackPressure); - sliceGroupingAllowed = true; - } else { - cqlMultiColumnFunction = null; - sliceGroupingAllowed = false; - } - sliceGroupingLimit = configuration.get(SLICE_GROUPING_LIMIT); + asyncQueryExecutionService = new GroupingAsyncQueryExecutionService(configuration, storeManager, tableName, + this::addTTLFunction, this::addTimestampFunction, singleKeyGetter, multiKeysGetter); // @formatter:on } @@ -416,7 +376,7 @@ public String getName() { @Override public EntryList getSlice(final KeySliceQuery query, final StoreTransaction txh) throws BackendException { try { - return cqlSliceFunction.execute(query, txh).get(); + return asyncQueryExecutionService.executeSingleKeySingleSlice(query, txh).get(); } catch (Throwable throwable){ throw EXCEPTION_MAPPER.apply(throwable); } @@ -425,11 +385,7 @@ public EntryList getSlice(final KeySliceQuery query, final StoreTransaction txh) @Override public Map getSlice(final List keys, final SliceQuery query, final StoreTransaction txh) throws BackendException { try { - Map> futureResult = new HashMap<>(keys.size()); - for(StaticBuffer key : keys){ - futureResult.put(key, cqlSliceFunction.execute(new KeySliceQuery(key, query), txh)); - } - return CompletableFutureUtil.unwrap(futureResult); + return CompletableFutureUtil.unwrap(asyncQueryExecutionService.executeMultiKeySingleSlice(keys, query, txh)); } catch (Throwable e) { throw EXCEPTION_MAPPER.apply(e); } @@ -443,101 +399,12 @@ public Map getSlice(final List keys, fina @Override public Map> getMultiSlices(final MultiKeysQueryGroups multiSliceQueriesForKeys, StoreTransaction txh) throws BackendException { try { - final Map>> futureResult = new HashMap<>(multiSliceQueriesForKeys.getMultiQueryContext().getTotalAmountOfQueries()); - if(sliceGroupingAllowed){ - fillMultiSlicesWithGrouping(futureResult, multiSliceQueriesForKeys, txh); - } else { - fillMultiSlicesWithoutGrouping(futureResult, multiSliceQueriesForKeys, txh); - } - return CompletableFutureUtil.unwrapMapOfMaps(futureResult); + return CompletableFutureUtil.unwrapMapOfMaps(asyncQueryExecutionService.executeMultiKeyMultiSlice(multiSliceQueriesForKeys, txh)); } catch (Throwable e) { throw EXCEPTION_MAPPER.apply(e); } } - private void fillMultiSlicesWithoutGrouping(final Map>> futureResult, - final MultiKeysQueryGroups multiSliceQueriesForKeys, - final StoreTransaction txh){ - for(KeysQueriesGroup queryGroup : multiSliceQueriesForKeys.getQueryGroups()){ - executeRangeQueries(futureResult, queryGroup.getKeysGroup(), queryGroup.getQueries(), txh); - } - } - - private void executeRangeQueries(final Map>> futureResult, - Collection keys, - Collection queries, - final StoreTransaction txh){ - for(SliceQuery query : queries){ - Map> futureQueryResult = futureResult.computeIfAbsent(query, sliceQuery -> new HashMap<>(keys.size())); - for(StaticBuffer key : keys){ - futureQueryResult.put(key, cqlSliceFunction.execute(new KeySliceQuery(key, query), txh)); - } - } - } - - private void fillMultiSlicesWithGrouping(final Map>> futureResult, - final MultiKeysQueryGroups multiSliceQueriesForKeys, - final StoreTransaction txh){ - for(KeysQueriesGroup queryGroup : multiSliceQueriesForKeys.getQueryGroups()){ - Collection keys = queryGroup.getKeysGroup(); - - Map> directEqualityGroupedQueriesByLimit = new HashMap<>(); - List separateRangeQueries = new ArrayList<>(); - for(SliceQuery query : queryGroup.getQueries()){ - if(query.isDirectColumnByStartOnlyAllowed()){ - List directEqualityQueries = directEqualityGroupedQueriesByLimit.get(query.getLimit()); - if(directEqualityQueries == null){ - directEqualityQueries = new ArrayList<>(multiSliceQueriesForKeys.getQueryGroups().size()); - directEqualityQueries.add(query); - directEqualityGroupedQueriesByLimit.put(query.getLimit(), directEqualityQueries); - } else if(directEqualityQueries.size() < sliceGroupingLimit && (!query.hasLimit() || directEqualityQueries.size() < query.getLimit())){ - // We cannot group more than `query.getLimit()` queries together. - // Even so it seems that it makes sense to group them together because we don't need - // more column values than limit - we are still obliged to compute the result because - // any separate SliceQuery can be cached into a tx-cache or db-cache with incomplete result - // which may result in the wrong results for future calls of the cached Slice queries. - // Thus, we add a query into the group only if it doesn't have any limit set OR the total - // amount of grouped together direct equality queries is <= than the limit requested. - // I.e. in other words, we must ensure that the limit won't influence the final result. - directEqualityQueries.add(query); - } else { - // In this case we couldn't group a query. Thus, we should execute this query separately. - separateRangeQueries.add(query); - } - } else { - // We cannot group range queries together. Thus, they are executed separately. - separateRangeQueries.add(query); - } - } - - // execute grouped queries - for(Map.Entry> sliceQueriesGroup : directEqualityGroupedQueriesByLimit.entrySet()){ - List queryStarts = sliceQueriesGroup.getValue().stream().map(q -> q.getSliceStart().asByteBuffer()).collect(Collectors.toList()); - for(StaticBuffer key : keys){ - CompletableFuture multiColumnResult = cqlMultiColumnFunction.execute(new KeyMultiColumnQuery(key.asByteBuffer(), queryStarts, sliceQueriesGroup.getKey()), txh); - Map> queryKeyFutureResult = new HashMap<>(sliceQueriesGroup.getValue().size()); - for(SliceQuery query : sliceQueriesGroup.getValue()){ - CompletableFuture futureQueryKeyResult = new CompletableFuture<>(); - queryKeyFutureResult.put(query, futureQueryKeyResult); - futureResult.computeIfAbsent(query, sliceQuery -> new HashMap<>(keys.size())).put(key, futureQueryKeyResult); - } - multiColumnResult.whenComplete((entries, throwable) -> { - if (throwable == null){ - Map columnToFilteredResult = new HashMap<>(sliceQueriesGroup.getValue().size()); - entries.forEach(entry -> columnToFilteredResult.computeIfAbsent(entry.getColumn(), column -> new EntryArrayList()).add(entry)); - queryKeyFutureResult.forEach((query, futureQueryResult) -> futureQueryResult.complete(columnToFilteredResult.getOrDefault(query.getSliceStart(), EntryList.EMPTY_LIST))); - } else { - queryKeyFutureResult.values().forEach(futureQueryResult -> futureQueryResult.completeExceptionally(throwable)); - } - }); - } - } - - // execute non-grouped queries - executeRangeQueries(futureResult, keys, separateRangeQueries, txh); - } - } - public BatchableStatement deleteColumn(final StaticBuffer key, final StaticBuffer column) { return deleteColumn(key, column, null); } @@ -593,7 +460,7 @@ public KeyIterator getKeys(final KeyRangeQuery query, final StoreTransaction txh TokenMap tokenMap = this.session.getMetadata().getTokenMap().get(); return Try.of(() -> new CQLResultSetKeyIterator( query, - this.getter, + this.singleKeyGetter, new CQLPagingIterator( getKeysRanged.boundStatementBuilder() .setToken(KEY_START_BINDING, tokenMap.newToken(query.getKeyStart().asByteBuffer())) @@ -613,7 +480,7 @@ public KeyIterator getKeys(final SliceQuery query, final StoreTransaction txh) t return Try.of(() -> new CQLResultSetKeyIterator( query, - this.getter, + this.singleKeyGetter, new CQLPagingIterator( getKeysAll.boundStatementBuilder() .setByteBuffer(SLICE_START_BINDING, query.getSliceStart().asByteBuffer()) diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLStoreManager.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLStoreManager.java index c3eb8f725bf..8283363a7e5 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLStoreManager.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLStoreManager.java @@ -234,15 +234,15 @@ void initializeKeyspace(){ .build()); } - ExecutorService getExecutorService() { + public ExecutorService getExecutorService() { return executorService; } - CqlSession getSession() { + public CqlSession getSession() { return this.session; } - String getKeyspaceName() { + public String getKeyspaceName() { return this.keyspace; } diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/QueryGroups.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/QueryGroups.java new file mode 100644 index 00000000000..045b9a7dcd3 --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/QueryGroups.java @@ -0,0 +1,39 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql; + +import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery; + +import java.util.List; +import java.util.Map; + +public class QueryGroups { + + private final Map> directEqualityGroupedQueriesByLimit; + private final List separateRangeQueries; + + public QueryGroups(Map> directEqualityGroupedQueriesByLimit, List separateRangeQueries) { + this.directEqualityGroupedQueriesByLimit = directEqualityGroupedQueriesByLimit; + this.separateRangeQueries = separateRangeQueries; + } + + public Map> getDirectEqualityGroupedQueriesByLimit() { + return directEqualityGroupedQueriesByLimit; + } + + public List getSeparateRangeQueries() { + return separateRangeQueries; + } +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLFunction.java index da7c327b93f..b934e1ba112 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLFunction.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLFunction.java @@ -69,7 +69,6 @@ public CompletableFuture execute(Q query, StoreTransaction txh) { queryBackPressure.acquireBeforeQuery(); - try{ this.session.executeAsync(bindMarkers(query, this.getSlice.boundStatementBuilder()) .setConsistencyLevel(getTransaction(txh).getReadConsistencyLevel()).build()) diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLMultiKeyMultiColumnFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLMultiKeyMultiColumnFunction.java new file mode 100644 index 00000000000..84b58f5a2a2 --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLMultiKeyMultiColumnFunction.java @@ -0,0 +1,47 @@ +// Copyright 2021 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.function.slice; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import org.janusgraph.diskstorage.cql.CQLColValGetter; +import org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore; +import org.janusgraph.diskstorage.cql.query.MultiKeysMultiColumnQuery; +import org.janusgraph.diskstorage.util.backpressure.QueryBackPressure; + +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; + +public class AsyncCQLMultiKeyMultiColumnFunction extends AsyncCQLFunction{ + public AsyncCQLMultiKeyMultiColumnFunction(CqlSession session, PreparedStatement getSlice, CQLColValGetter getter, ExecutorService executorService, QueryBackPressure queryBackPressure) { + super(session, getSlice, getter, executorService, queryBackPressure); + } + + @Override + BoundStatementBuilder bindMarkers(MultiKeysMultiColumnQuery query, BoundStatementBuilder statementBuilder) { + return statementBuilder + .setList(CQLKeyColumnValueStore.KEY_BINDING, query.getKeys(), ByteBuffer.class) + .setList(CQLKeyColumnValueStore.COLUMN_BINDING, query.getColumns(), ByteBuffer.class) + .setInt(CQLKeyColumnValueStore.LIMIT_BINDING, query.getLimit()) + .setRoutingToken(query.getRoutingToken()) + // usually routing key isn't needed when routingToken is specified, + // but in some cases different driver implementations (like ScyllaDB) + // or load balancing strategies may relay on routing key instead. + // Thus, we also set routing key here to any of the keys because we + // are sure they all need to be executed on the same node (or shard). + .setRoutingKey(query.getKeys().get(0)); + } +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLMultiKeySliceFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLMultiKeySliceFunction.java new file mode 100644 index 00000000000..78a07653815 --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLMultiKeySliceFunction.java @@ -0,0 +1,48 @@ +// Copyright 2021 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.function.slice; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import org.janusgraph.diskstorage.cql.CQLColValGetter; +import org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore; +import org.janusgraph.diskstorage.cql.query.MultiKeysSingleSliceQuery; +import org.janusgraph.diskstorage.util.backpressure.QueryBackPressure; + +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; + +public class AsyncCQLMultiKeySliceFunction extends AsyncCQLFunction{ + public AsyncCQLMultiKeySliceFunction(CqlSession session, PreparedStatement getSlice, CQLColValGetter getter, ExecutorService executorService, QueryBackPressure queryBackPressure) { + super(session, getSlice, getter, executorService, queryBackPressure); + } + + @Override + BoundStatementBuilder bindMarkers(MultiKeysSingleSliceQuery query, BoundStatementBuilder statementBuilder) { + return statementBuilder + .setList(CQLKeyColumnValueStore.KEY_BINDING, query.getKeys(), ByteBuffer.class) + .setByteBuffer(CQLKeyColumnValueStore.SLICE_START_BINDING, query.getQuery().getSliceStart().asByteBuffer()) + .setByteBuffer(CQLKeyColumnValueStore.SLICE_END_BINDING, query.getQuery().getSliceEnd().asByteBuffer()) + .setInt(CQLKeyColumnValueStore.LIMIT_BINDING, query.getLimit()) + .setRoutingToken(query.getRoutingToken()) + // usually routing key isn't needed when routingToken is specified, + // but in some cases different driver implementations (like ScyllaDB) + // or load balancing strategies may relay on routing key instead. + // Thus, we also set routing key here to any of the keys because we + // are sure they all need to be executed on the same node (or shard). + .setRoutingKey(query.getKeys().get(0)); + } +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLMultiColumnFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLSingleKeyMultiColumnFunction.java similarity index 73% rename from janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLMultiColumnFunction.java rename to janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLSingleKeyMultiColumnFunction.java index 8865886d53f..45801487432 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLMultiColumnFunction.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLSingleKeyMultiColumnFunction.java @@ -19,21 +19,22 @@ import com.datastax.oss.driver.api.core.cql.PreparedStatement; import org.janusgraph.diskstorage.cql.CQLColValGetter; import org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore; -import org.janusgraph.diskstorage.keycolumnvalue.KeyMultiColumnQuery; +import org.janusgraph.diskstorage.cql.query.SingleKeyMultiColumnQuery; import org.janusgraph.diskstorage.util.backpressure.QueryBackPressure; import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; -public class AsyncCQLMultiColumnFunction extends AsyncCQLFunction{ - public AsyncCQLMultiColumnFunction(CqlSession session, PreparedStatement getSlice, CQLColValGetter getter, ExecutorService executorService, QueryBackPressure queryBackPressure) { +public class AsyncCQLSingleKeyMultiColumnFunction extends AsyncCQLFunction{ + public AsyncCQLSingleKeyMultiColumnFunction(CqlSession session, PreparedStatement getSlice, CQLColValGetter getter, ExecutorService executorService, QueryBackPressure queryBackPressure) { super(session, getSlice, getter, executorService, queryBackPressure); } @Override - BoundStatementBuilder bindMarkers(KeyMultiColumnQuery query, BoundStatementBuilder statementBuilder) { + BoundStatementBuilder bindMarkers(SingleKeyMultiColumnQuery query, BoundStatementBuilder statementBuilder) { return statementBuilder.setByteBuffer(CQLKeyColumnValueStore.KEY_BINDING, query.getKey()) .setList(CQLKeyColumnValueStore.COLUMN_BINDING, query.getColumns(), ByteBuffer.class) - .setInt(CQLKeyColumnValueStore.LIMIT_BINDING, query.getLimit()); + .setInt(CQLKeyColumnValueStore.LIMIT_BINDING, query.getLimit()) + .setRoutingKey(query.getKey()); } } diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLSliceFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLSingleKeySliceFunction.java similarity index 79% rename from janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLSliceFunction.java rename to janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLSingleKeySliceFunction.java index 3e2e8b9ef88..62524db8cae 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLSliceFunction.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLSingleKeySliceFunction.java @@ -22,18 +22,21 @@ import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery; import org.janusgraph.diskstorage.util.backpressure.QueryBackPressure; +import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; -public class AsyncCQLSliceFunction extends AsyncCQLFunction{ - public AsyncCQLSliceFunction(CqlSession session, PreparedStatement getSlice, CQLColValGetter getter, ExecutorService executorService, QueryBackPressure queryBackPressure) { +public class AsyncCQLSingleKeySliceFunction extends AsyncCQLFunction{ + public AsyncCQLSingleKeySliceFunction(CqlSession session, PreparedStatement getSlice, CQLColValGetter getter, ExecutorService executorService, QueryBackPressure queryBackPressure) { super(session, getSlice, getter, executorService, queryBackPressure); } @Override BoundStatementBuilder bindMarkers(KeySliceQuery query, BoundStatementBuilder statementBuilder) { - return statementBuilder.setByteBuffer(CQLKeyColumnValueStore.KEY_BINDING, query.getKey().asByteBuffer()) + ByteBuffer key = query.getKey().asByteBuffer(); + return statementBuilder.setByteBuffer(CQLKeyColumnValueStore.KEY_BINDING, key) .setByteBuffer(CQLKeyColumnValueStore.SLICE_START_BINDING, query.getSliceStart().asByteBuffer()) .setByteBuffer(CQLKeyColumnValueStore.SLICE_END_BINDING, query.getSliceEnd().asByteBuffer()) - .setInt(CQLKeyColumnValueStore.LIMIT_BINDING, query.getLimit()); + .setInt(CQLKeyColumnValueStore.LIMIT_BINDING, query.getLimit()) + .setRoutingKey(key); } } diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/query/MultiKeysMultiColumnQuery.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/query/MultiKeysMultiColumnQuery.java new file mode 100644 index 00000000000..f47c9b1c29e --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/query/MultiKeysMultiColumnQuery.java @@ -0,0 +1,52 @@ +// Copyright 2021 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.query; + +import com.datastax.oss.driver.api.core.metadata.token.Token; +import org.janusgraph.graphdb.query.BaseQuery; + +import java.nio.ByteBuffer; +import java.util.List; + +public class MultiKeysMultiColumnQuery extends BaseQuery { + private final Token routingToken; + private final List keys; + private final List columns; + + public MultiKeysMultiColumnQuery(Token routingToken, List keys, List columns, int limit) { + super(limit); + this.routingToken = routingToken; + this.keys = keys; + this.columns = columns; + } + + public List getKeys() { + return keys; + } + + public List getColumns() { + return columns; + } + + @Override + public MultiKeysMultiColumnQuery setLimit(final int limit) { + super.setLimit(limit); + return this; + } + + public Token getRoutingToken() { + return routingToken; + } +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/query/MultiKeysSingleSliceQuery.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/query/MultiKeysSingleSliceQuery.java new file mode 100644 index 00000000000..45bf71957ef --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/query/MultiKeysSingleSliceQuery.java @@ -0,0 +1,53 @@ +// Copyright 2021 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.query; + +import com.datastax.oss.driver.api.core.metadata.token.Token; +import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery; +import org.janusgraph.graphdb.query.BaseQuery; + +import java.nio.ByteBuffer; +import java.util.List; + +public class MultiKeysSingleSliceQuery extends BaseQuery { + private final Token routingToken; + private final List keys; + private final SliceQuery query; + + public MultiKeysSingleSliceQuery(Token routingToken, List keys, SliceQuery query, int limit) { + super(limit); + this.routingToken = routingToken; + this.keys = keys; + this.query = query; + } + + public List getKeys() { + return keys; + } + + public SliceQuery getQuery() { + return query; + } + + @Override + public MultiKeysSingleSliceQuery setLimit(final int limit) { + super.setLimit(limit); + return this; + } + + public Token getRoutingToken() { + return routingToken; + } +} diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeyMultiColumnQuery.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/query/SingleKeyMultiColumnQuery.java similarity index 80% rename from janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeyMultiColumnQuery.java rename to janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/query/SingleKeyMultiColumnQuery.java index e369d23003e..a1de766d02e 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeyMultiColumnQuery.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/query/SingleKeyMultiColumnQuery.java @@ -12,18 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -package org.janusgraph.diskstorage.keycolumnvalue; +package org.janusgraph.diskstorage.cql.query; import org.janusgraph.graphdb.query.BaseQuery; import java.nio.ByteBuffer; import java.util.List; -public class KeyMultiColumnQuery extends BaseQuery { +public class SingleKeyMultiColumnQuery extends BaseQuery { private final ByteBuffer key; private final List columns; - public KeyMultiColumnQuery(ByteBuffer key, List columns, int limit) { + public SingleKeyMultiColumnQuery(ByteBuffer key, List columns, int limit) { super(limit); this.key = key; this.columns = columns; @@ -38,7 +38,7 @@ public List getColumns() { } @Override - public KeyMultiColumnQuery setLimit(final int limit) { + public SingleKeyMultiColumnQuery setLimit(final int limit) { super.setLimit(limit); return this; } diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/service/AsyncQueryExecutionService.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/service/AsyncQueryExecutionService.java new file mode 100644 index 00000000000..095a2d460f1 --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/service/AsyncQueryExecutionService.java @@ -0,0 +1,32 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.service; + +import org.janusgraph.diskstorage.EntryList; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.MultiKeysQueryGroups; +import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public interface AsyncQueryExecutionService { + CompletableFuture executeSingleKeySingleSlice(final KeySliceQuery query, final StoreTransaction txh); + Map> executeMultiKeySingleSlice(final List keys, final SliceQuery query, final StoreTransaction txh); + Map>> executeMultiKeyMultiSlice(final MultiKeysQueryGroups multiSliceQueriesForKeys, StoreTransaction txh); +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/service/GroupingAsyncQueryExecutionService.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/service/GroupingAsyncQueryExecutionService.java new file mode 100644 index 00000000000..f4cfa412a59 --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/service/GroupingAsyncQueryExecutionService.java @@ -0,0 +1,433 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.service; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.querybuilder.relation.Relation; +import com.datastax.oss.driver.api.querybuilder.select.Select; +import org.janusgraph.diskstorage.Entry; +import org.janusgraph.diskstorage.EntryList; +import org.janusgraph.diskstorage.EntryMetaData; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.configuration.Configuration; +import org.janusgraph.diskstorage.cql.CQLColValGetter; +import org.janusgraph.diskstorage.cql.CQLStoreManager; +import org.janusgraph.diskstorage.cql.QueryGroups; +import org.janusgraph.diskstorage.cql.function.slice.AsyncCQLMultiKeyMultiColumnFunction; +import org.janusgraph.diskstorage.cql.function.slice.AsyncCQLMultiKeySliceFunction; +import org.janusgraph.diskstorage.cql.function.slice.AsyncCQLSingleKeyMultiColumnFunction; +import org.janusgraph.diskstorage.cql.function.slice.AsyncCQLSingleKeySliceFunction; +import org.janusgraph.diskstorage.cql.query.MultiKeysMultiColumnQuery; +import org.janusgraph.diskstorage.cql.query.MultiKeysSingleSliceQuery; +import org.janusgraph.diskstorage.cql.query.SingleKeyMultiColumnQuery; +import org.janusgraph.diskstorage.cql.strategy.GroupedExecutionStrategy; +import org.janusgraph.diskstorage.cql.strategy.GroupedExecutionStrategyBuilder; +import org.janusgraph.diskstorage.cql.strategy.ResultFiller; +import org.janusgraph.diskstorage.cql.util.CQLSliceQueryUtil; +import org.janusgraph.diskstorage.cql.util.KeysGroup; +import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.KeysQueriesGroup; +import org.janusgraph.diskstorage.keycolumnvalue.MultiKeysQueryGroups; +import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; +import org.janusgraph.diskstorage.util.EntryArrayList; +import org.janusgraph.diskstorage.util.backpressure.QueryBackPressure; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; +import static org.janusgraph.diskstorage.cql.CQLConfigOptions.KEYS_GROUPING_ALLOWED; +import static org.janusgraph.diskstorage.cql.CQLConfigOptions.KEYS_GROUPING_CLASS; +import static org.janusgraph.diskstorage.cql.CQLConfigOptions.KEYS_GROUPING_LIMIT; +import static org.janusgraph.diskstorage.cql.CQLConfigOptions.KEYS_GROUPING_MIN; +import static org.janusgraph.diskstorage.cql.CQLConfigOptions.SLICE_GROUPING_ALLOWED; +import static org.janusgraph.diskstorage.cql.CQLConfigOptions.SLICE_GROUPING_LIMIT; +import static org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore.COLUMN_BINDING; +import static org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore.COLUMN_COLUMN_NAME; +import static org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore.KEY_BINDING; +import static org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore.KEY_COLUMN_NAME; +import static org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore.LIMIT_BINDING; +import static org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore.SLICE_END_BINDING; +import static org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore.SLICE_START_BINDING; +import static org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore.VALUE_COLUMN_NAME; + +public class GroupingAsyncQueryExecutionService implements AsyncQueryExecutionService { + + private final ResultFiller>, SliceQuery, KeysGroup> SINGLE_QUERY_WITH_KEYS_GROUPING_FILLER; + private final ResultFiller>, SliceQuery, List> SINGLE_QUERY_WITHOUT_KEYS_GROUPING_FILLER; + private final ResultFiller>>, QueryGroups, KeysGroup> MULTI_QUERY_WITH_KEYS_GROUPING_FILLER; + private final ResultFiller>>, QueryGroups, List> MULTI_QUERY_WITHOUT_KEYS_GROUPING_FILLER; + + private final AsyncCQLSingleKeySliceFunction cqlSingleKeySliceFunction; + private final AsyncCQLSingleKeyMultiColumnFunction cqlSingleKeyMultiColumnFunction; + private final AsyncCQLMultiKeySliceFunction cqlMultiKeySliceFunction; + private final AsyncCQLMultiKeyMultiColumnFunction cqlMultiKeyMultiColumnFunction; + private final boolean sliceGroupingAllowed; + private final int sliceGroupingLimit; + private final boolean keysGroupingAllowed; + private final int keysGroupingLimit; + private final int keysGroupingMin; + private final GroupedExecutionStrategy groupedExecutionStrategy; + + public GroupingAsyncQueryExecutionService(Configuration configuration, + final CQLStoreManager storeManager, + String tableName, + Function addTTLFunction, + Function addTimestampFunction, + CQLColValGetter singleKeyGetter, + CQLColValGetter multiKeysGetter) { + + sliceGroupingLimit = Math.max(1, configuration.get(SLICE_GROUPING_LIMIT)); + keysGroupingLimit = Math.max(1, configuration.get(KEYS_GROUPING_LIMIT)); + keysGroupingMin = Math.max(2, configuration.get(KEYS_GROUPING_MIN)); + keysGroupingAllowed = keysGroupingLimit > 1 && configuration.get(KEYS_GROUPING_ALLOWED); + sliceGroupingAllowed = sliceGroupingLimit > 1 && configuration.get(SLICE_GROUPING_ALLOWED); + String keyspaceName = storeManager.getKeyspaceName(); + CqlSession session = storeManager.getSession(); + ExecutorService executorService = storeManager.getExecutorService(); + QueryBackPressure queryBackPressure = storeManager.getQueriesBackPressure(); + + // @formatter:off + final Select getSliceSelect = selectFrom(keyspaceName, tableName) + .column(COLUMN_COLUMN_NAME) + .column(VALUE_COLUMN_NAME) + .where( + Relation.column(KEY_COLUMN_NAME).isEqualTo(bindMarker(KEY_BINDING)), + Relation.column(COLUMN_COLUMN_NAME).isGreaterThanOrEqualTo(bindMarker(SLICE_START_BINDING)), + Relation.column(COLUMN_COLUMN_NAME).isLessThan(bindMarker(SLICE_END_BINDING)) + ) + .limit(bindMarker(LIMIT_BINDING)); + PreparedStatement getSlice = session.prepare(addTTLFunction.apply(addTimestampFunction.apply(getSliceSelect)).build()); + cqlSingleKeySliceFunction = new AsyncCQLSingleKeySliceFunction(session, getSlice, singleKeyGetter, executorService, queryBackPressure); + + if(sliceGroupingAllowed){ + // @formatter:off + final Select getMultiColumnSelect = selectFrom(keyspaceName, tableName) + .column(COLUMN_COLUMN_NAME) + .column(VALUE_COLUMN_NAME) + .where( + Relation.column(KEY_COLUMN_NAME).isEqualTo(bindMarker(KEY_BINDING)), + Relation.column(COLUMN_COLUMN_NAME).in(bindMarker(COLUMN_BINDING)) + ) + .limit(bindMarker(LIMIT_BINDING)); + PreparedStatement getMultiColumn = session.prepare(addTTLFunction.apply(addTimestampFunction.apply(getMultiColumnSelect)).build()); + cqlSingleKeyMultiColumnFunction = new AsyncCQLSingleKeyMultiColumnFunction(session, getMultiColumn, singleKeyGetter, executorService, queryBackPressure); + } else { + cqlSingleKeyMultiColumnFunction = null; + } + + if(keysGroupingAllowed){ + // @formatter:off + final Select getMultiKeySliceSelect = selectFrom(keyspaceName, tableName) + .column(KEY_COLUMN_NAME) + .column(COLUMN_COLUMN_NAME) + .column(VALUE_COLUMN_NAME) + .where( + Relation.column(KEY_COLUMN_NAME).in(bindMarker(KEY_BINDING)), + Relation.column(COLUMN_COLUMN_NAME).isGreaterThanOrEqualTo(bindMarker(SLICE_START_BINDING)), + Relation.column(COLUMN_COLUMN_NAME).isLessThan(bindMarker(SLICE_END_BINDING)) + ) + .perPartitionLimit(bindMarker(LIMIT_BINDING)); + PreparedStatement getMultiKeySlice = session.prepare(addTTLFunction.apply(addTimestampFunction.apply(getMultiKeySliceSelect)).build()); + cqlMultiKeySliceFunction = new AsyncCQLMultiKeySliceFunction(session, getMultiKeySlice, multiKeysGetter, executorService, queryBackPressure); + + if(sliceGroupingAllowed){ + // @formatter:off + final Select getMultiKeyMultiColumnSelect = selectFrom(keyspaceName, tableName) + .column(KEY_COLUMN_NAME) + .column(COLUMN_COLUMN_NAME) + .column(VALUE_COLUMN_NAME) + .where( + Relation.column(KEY_COLUMN_NAME).in(bindMarker(KEY_BINDING)), + Relation.column(COLUMN_COLUMN_NAME).in(bindMarker(COLUMN_BINDING)) + ) + .perPartitionLimit(bindMarker(LIMIT_BINDING)); + PreparedStatement getMultiKeyMultiColumn = session.prepare(addTTLFunction.apply(addTimestampFunction.apply(getMultiKeyMultiColumnSelect)).build()); + cqlMultiKeyMultiColumnFunction = new AsyncCQLMultiKeyMultiColumnFunction(session, getMultiKeyMultiColumn, multiKeysGetter, executorService, queryBackPressure); + } else { + cqlMultiKeyMultiColumnFunction = null; + } + + } else { + cqlMultiKeySliceFunction = null; + cqlMultiKeyMultiColumnFunction = null; + } + + SINGLE_QUERY_WITH_KEYS_GROUPING_FILLER = this::fillSingleQueryWithKeysGrouping; + SINGLE_QUERY_WITHOUT_KEYS_GROUPING_FILLER = this::fillSingleQueryWithoutKeysGrouping; + MULTI_QUERY_WITH_KEYS_GROUPING_FILLER = this::fillMultiQueryWithKeysGrouping; + MULTI_QUERY_WITHOUT_KEYS_GROUPING_FILLER = this::fillMultiQueryWithoutKeysGrouping; + + groupedExecutionStrategy = GroupedExecutionStrategyBuilder.build(configuration, storeManager, configuration.get(KEYS_GROUPING_CLASS)); + } + + @Override + public CompletableFuture executeSingleKeySingleSlice(final KeySliceQuery query, final StoreTransaction txh) { + return cqlSingleKeySliceFunction.execute(query, txh); + } + + @Override + public Map> executeMultiKeySingleSlice(final List keys, final SliceQuery query, final StoreTransaction txh) { + Map> futureResult = new HashMap<>(keys.size()); + if(isKeysGroupingAllowed(keys)){ + groupedExecutionStrategy.execute( + futureResult, + query, + keys, + SINGLE_QUERY_WITH_KEYS_GROUPING_FILLER, + SINGLE_QUERY_WITHOUT_KEYS_GROUPING_FILLER, + txh, + keysGroupingLimit + ); + } else { + fillSingleQueryWithoutKeysGrouping( + futureResult, + query, + keys, + txh + ); + } + return futureResult; + } + + @Override + public Map>> executeMultiKeyMultiSlice(final MultiKeysQueryGroups multiSliceQueriesForKeys, StoreTransaction txh) { + final Map>> futureResult = new HashMap<>(multiSliceQueriesForKeys.getMultiQueryContext().getTotalAmountOfQueries()); + if(sliceGroupingAllowed){ + fillMultiSlicesWithQueryGrouping(futureResult, multiSliceQueriesForKeys, txh); + } else { + fillMultiSlicesWithoutQueryGrouping(futureResult, multiSliceQueriesForKeys, txh); + } + return futureResult; + } + + private void fillMultiSlicesWithoutQueryGrouping(final Map>> futureResult, + final MultiKeysQueryGroups multiSliceQueriesForKeys, + final StoreTransaction txh){ + for(KeysQueriesGroup queryGroup : multiSliceQueriesForKeys.getQueryGroups()){ + List keys = queryGroup.getKeysGroup(); + if(isKeysGroupingAllowed(keys)){ + for(SliceQuery query : queryGroup.getQueries()){ + groupedExecutionStrategy.execute( + futureResult.computeIfAbsent(query, q -> new HashMap<>(keys.size())), + query, + keys, + SINGLE_QUERY_WITH_KEYS_GROUPING_FILLER, + SINGLE_QUERY_WITHOUT_KEYS_GROUPING_FILLER, + txh, + keysGroupingLimit + ); + } + } else { + for(SliceQuery query : queryGroup.getQueries()){ + fillSingleQueryWithoutKeysGrouping( + futureResult.computeIfAbsent(query, q -> new HashMap<>(keys.size())), + query, + keys, + txh + ); + } + } + } + } + + private void fillMultiSlicesWithQueryGrouping(final Map>> futureResult, + final MultiKeysQueryGroups multiSliceQueriesForKeys, + final StoreTransaction txh){ + + for(KeysQueriesGroup queryGroup : multiSliceQueriesForKeys.getQueryGroups()){ + List keys = queryGroup.getKeysGroup(); + QueryGroups queryGroups = CQLSliceQueryUtil.getQueriesGroupedByDirectEqualityQueries(queryGroup, multiSliceQueriesForKeys.getQueryGroups().size(), sliceGroupingLimit); + if(isKeysGroupingAllowed(keys)){ + groupedExecutionStrategy.execute(futureResult, queryGroups, keys, + MULTI_QUERY_WITH_KEYS_GROUPING_FILLER, MULTI_QUERY_WITHOUT_KEYS_GROUPING_FILLER, + txh, keysGroupingLimit); + } else { + fillMultiQueryWithoutKeysGrouping(futureResult, queryGroups, keys, txh); + } + } + } + + private void fillMultiQueryWithKeysGrouping(final Map>> futureResult, + QueryGroups queryGroups, + KeysGroup keysGroup, + final StoreTransaction txh){ + + // execute grouped queries + for(Map.Entry> sliceQueriesGroup : queryGroups.getDirectEqualityGroupedQueriesByLimit().entrySet()){ + int limit = sliceQueriesGroup.getKey(); + List queryStarts = new ArrayList<>(sliceQueriesGroup.getValue().size()); + Map columnToQueryMap = new HashMap<>(sliceQueriesGroup.getValue().size()); + for(SliceQuery sliceQuery : sliceQueriesGroup.getValue()){ + StaticBuffer column = sliceQuery.getSliceStart(); + queryStarts.add(column.asByteBuffer()); + columnToQueryMap.put(column, sliceQuery); + } + + CompletableFuture multiKeyMultiColumnResult = cqlMultiKeyMultiColumnFunction.execute(new MultiKeysMultiColumnQuery(keysGroup.getRoutingToken(), keysGroup.getRawKeys(), queryStarts, limit), txh); + Map>> partialResultToCompute = new HashMap<>(queryStarts.size()); + for(SliceQuery sliceQuery : sliceQueriesGroup.getValue()){ + Map> perKeyQueryPartialResult = new HashMap<>(keysGroup.size()); + partialResultToCompute.put(sliceQuery, perKeyQueryPartialResult); + Map> perKeyQueryFutureResult = futureResult.computeIfAbsent(sliceQuery, q -> new HashMap<>(keysGroup.size())); + for(StaticBuffer key : keysGroup.getKeys()){ + CompletableFuture future = new CompletableFuture<>(); + perKeyQueryFutureResult.put(key, future); + perKeyQueryPartialResult.put(key, future); + } + } + multiKeyMultiColumnResult.whenComplete((entries, throwable) -> { + if (throwable == null){ + + Map> returnedResult = new HashMap<>(partialResultToCompute.size()); + for(Entry entry : entries){ + StaticBuffer column = entry.getColumn(); + StaticBuffer key = (StaticBuffer) entry.getMetaData().get(EntryMetaData.ROW_KEY); + assert key != null; + SliceQuery query = columnToQueryMap.get(column); + returnedResult.computeIfAbsent(query, q -> new HashMap<>(keysGroup.size())).computeIfAbsent(key, k -> new EntryArrayList()).add(entry); + } + + for(Map.Entry>> futureResultEntry : partialResultToCompute.entrySet()){ + SliceQuery query = futureResultEntry.getKey(); + Map> futureKeysResults = futureResultEntry.getValue(); + Map queryResults = returnedResult.get(query); + if(queryResults == null){ + for(CompletableFuture keyResult : futureKeysResults.values()){ + keyResult.complete(EntryList.EMPTY_LIST); + } + } else { + for(Map.Entry> futureKeyResultEntry : futureKeysResults.entrySet()){ + futureKeyResultEntry.getValue().complete(queryResults.getOrDefault(futureKeyResultEntry.getKey(), EntryList.EMPTY_LIST)); + } + } + + } + + } else { + partialResultToCompute.values().forEach(keysMapToFail -> keysMapToFail.values().forEach(futureToFail -> futureToFail.completeExceptionally(throwable))); + } + }); + } + + // execute non-grouped queries + for(SliceQuery separateQuery : queryGroups.getSeparateRangeQueries()){ + Map> perKeyQueryFutureResult = futureResult.computeIfAbsent(separateQuery, q -> new HashMap<>(keysGroup.size())); + fillSingleQueryWithKeysGrouping( + perKeyQueryFutureResult, + separateQuery, + keysGroup, + txh + ); + } + } + + private void fillMultiQueryWithoutKeysGrouping(final Map>> futureResult, + QueryGroups queryGroups, + List keys, + final StoreTransaction txh){ + + // execute grouped queries + for(Map.Entry> sliceQueriesGroup : queryGroups.getDirectEqualityGroupedQueriesByLimit().entrySet()){ + List queryStarts = new ArrayList<>(sliceQueriesGroup.getValue().size()); + for(SliceQuery sliceQuery : sliceQueriesGroup.getValue()){ + queryStarts.add(sliceQuery.getSliceStart().asByteBuffer()); + futureResult.computeIfAbsent(sliceQuery, q -> new HashMap<>(keys.size())); + } + for(StaticBuffer key : keys){ + CompletableFuture multiColumnResult = cqlSingleKeyMultiColumnFunction.execute(new SingleKeyMultiColumnQuery(key.asByteBuffer(), queryStarts, sliceQueriesGroup.getKey()), txh); + Map> queryKeyFutureResult = new HashMap<>(sliceQueriesGroup.getValue().size()); + for(SliceQuery query : sliceQueriesGroup.getValue()){ + CompletableFuture futureQueryKeyResult = new CompletableFuture<>(); + queryKeyFutureResult.put(query, futureQueryKeyResult); + futureResult.get(query).put(key, futureQueryKeyResult); + } + multiColumnResult.whenComplete((entries, throwable) -> { + if (throwable == null){ + Map columnToFilteredResult = new HashMap<>(sliceQueriesGroup.getValue().size()); + entries.forEach(entry -> columnToFilteredResult.computeIfAbsent(entry.getColumn(), c -> new EntryArrayList()).add(entry)); + queryKeyFutureResult.forEach((query, futureQueryResult) -> futureQueryResult.complete(columnToFilteredResult.getOrDefault(query.getSliceStart(), EntryList.EMPTY_LIST))); + } else { + queryKeyFutureResult.values().forEach(futureQueryResult -> futureQueryResult.completeExceptionally(throwable)); + } + }); + } + } + + // execute non-grouped queries + for(SliceQuery separateQuery : queryGroups.getSeparateRangeQueries()){ + Map> perKeyQueryFutureResult = futureResult.computeIfAbsent(separateQuery, q -> new HashMap<>(keys.size())); + fillSingleQueryWithoutKeysGrouping( + perKeyQueryFutureResult, + separateQuery, + keys, + txh + ); + } + } + + private void fillSingleQueryWithKeysGrouping(final Map> futureQueryResult, + final SliceQuery query, + final KeysGroup keysGroup, + final StoreTransaction txh){ + + CompletableFuture multiKeySingleSliceResult = cqlMultiKeySliceFunction.execute(new MultiKeysSingleSliceQuery(keysGroup.getRoutingToken(), keysGroup.getRawKeys(), query, query.getLimit()), txh); + Map> perKeyQueryPartialResult = new HashMap<>(keysGroup.size()); + for(StaticBuffer key : keysGroup.getKeys()){ + CompletableFuture futureKeyResult = new CompletableFuture<>(); + futureQueryResult.put(key, futureKeyResult); + perKeyQueryPartialResult.put(key, futureKeyResult); + } + + multiKeySingleSliceResult.whenComplete((entries, throwable) -> { + if (throwable == null){ + Map returnedResult = new HashMap<>(perKeyQueryPartialResult.size()); + for(Entry entry : entries){ + StaticBuffer key = (StaticBuffer) entry.getMetaData().get(EntryMetaData.ROW_KEY); + assert key != null; + returnedResult.computeIfAbsent(key, k -> new EntryArrayList()).add(entry); + } + for(Map.Entry> futureKeyResultEntry : perKeyQueryPartialResult.entrySet()){ + futureKeyResultEntry.getValue().complete(returnedResult.getOrDefault(futureKeyResultEntry.getKey(), EntryList.EMPTY_LIST)); + } + } else { + perKeyQueryPartialResult.values().forEach(futureToFail -> futureToFail.completeExceptionally(throwable)); + } + }); + } + + private void fillSingleQueryWithoutKeysGrouping(final Map> futureQueryResult, + final SliceQuery query, + final List keys, + final StoreTransaction txh){ + for(StaticBuffer key : keys){ + futureQueryResult.put(key, cqlSingleKeySliceFunction.execute(new KeySliceQuery(key, query), txh)); + } + } + + private boolean isKeysGroupingAllowed(List keys){ + return keysGroupingAllowed && keys.size() >= keysGroupingMin; + } + +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/GroupedExecutionStrategy.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/GroupedExecutionStrategy.java new file mode 100644 index 00000000000..c07cbd9839a --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/GroupedExecutionStrategy.java @@ -0,0 +1,33 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.strategy; + +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.cql.util.KeysGroup; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; + +import java.util.List; + +public interface GroupedExecutionStrategy { + + void execute(R futureResult, + Q queries, + List keys, + ResultFiller withKeysGroupingFiller, + ResultFiller> withoutKeysGroupingFiller, + StoreTransaction txh, + int keysGroupingLimit); + +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/GroupedExecutionStrategyBuilder.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/GroupedExecutionStrategyBuilder.java new file mode 100644 index 00000000000..3363451f7dc --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/GroupedExecutionStrategyBuilder.java @@ -0,0 +1,89 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.strategy; + +import org.janusgraph.diskstorage.configuration.Configuration; +import org.janusgraph.diskstorage.cql.CQLStoreManager; +import org.janusgraph.diskstorage.util.backpressure.builder.QueryBackPressureBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +public class GroupedExecutionStrategyBuilder { + + private static final Logger log = LoggerFactory.getLogger(QueryBackPressureBuilder.class); + + public static final String TOKEN_RANGE_AWARE = "tokenRangeAware"; + public static final String REPLICAS_AWARE = "replicasAware"; + + private GroupedExecutionStrategyBuilder(){} + + public static GroupedExecutionStrategy build(final Configuration configuration, final CQLStoreManager storeManager, final String className){ + + switch (className){ + case TOKEN_RANGE_AWARE: return new TokenRangeAwareGroupedExecutionStrategy(configuration, storeManager); + case REPLICAS_AWARE: return new ReplicasAwareGroupedExecutionStrategy(configuration, storeManager); + default: return buildFromClassName(className, configuration, storeManager); + } + } + + private static GroupedExecutionStrategy buildFromClassName(final String className, final Configuration configuration, final CQLStoreManager storeManager){ + + + Class implementationClass; + try { + implementationClass = Class.forName(className); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("No class found with class name: "+className); + } + + if(!GroupedExecutionStrategy.class.isAssignableFrom(implementationClass)){ + throw new IllegalArgumentException(className + "isn't a subclass of "+GroupedExecutionStrategy.class.getName()); + } + + final GroupedExecutionStrategy result; + Constructor constructorWithConfigurationAndSessionParams = null; + + for (Constructor constructor : implementationClass.getDeclaredConstructors()) { + if (constructor.getParameterCount() == 2 && Configuration.class.isAssignableFrom(constructor.getParameterTypes()[0]) && + CQLStoreManager.class.isAssignableFrom(constructor.getParameterTypes()[1])){ + constructorWithConfigurationAndSessionParams = constructor; + break; + } + } + + try { + + if(constructorWithConfigurationAndSessionParams == null){ + throw new IllegalArgumentException(className + " has no a public constructor which accepts " + +Configuration.class.getName() + " and "+CQLStoreManager.class.getName()+" parameters."); + } + + result = (GroupedExecutionStrategy) constructorWithConfigurationAndSessionParams.newInstance(configuration, storeManager); + + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new IllegalStateException("Couldn't create a new instance of "+className + + ". Please, check that the constructor which accepts "+Configuration.class.getName()+" and "+CQLStoreManager.class.getName() + +" is public. If the necessary public constructor exists, please, check that invocation of this constructor doesn't throw an exception.", e); + } + + log.info("Initiated custom GroupedExecutionStrategy {}", className); + + return result; + } + +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/ReplicasAwareGroupedExecutionStrategy.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/ReplicasAwareGroupedExecutionStrategy.java new file mode 100644 index 00000000000..846f1c1261c --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/ReplicasAwareGroupedExecutionStrategy.java @@ -0,0 +1,110 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.strategy; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.TokenMap; +import com.datastax.oss.driver.api.core.metadata.token.Token; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.configuration.Configuration; +import org.janusgraph.diskstorage.cql.CQLStoreManager; +import org.janusgraph.diskstorage.cql.util.KeysGroup; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +/** + * Strategy which groups partition keys together if all their replicas (Replica set) are the same. + */ +public class ReplicasAwareGroupedExecutionStrategy implements GroupedExecutionStrategy{ + + private final CqlSession session; + private final CqlIdentifier keyspace; + public ReplicasAwareGroupedExecutionStrategy(Configuration configuration, CQLStoreManager storeManager){ + // `configuration` is ignored + this.session = storeManager.getSession(); + this.keyspace = CqlIdentifier.fromCql(storeManager.getKeyspaceName()); + } + + @Override + public void execute(R futureResult, + Q queries, + List keys, + ResultFiller withKeysGroupingFiller, + ResultFiller> withoutKeysGroupingFiller, + StoreTransaction txh, + int keysGroupingLimit){ + + Optional optionalTokenMap = session.getMetadata().getTokenMap(); + if(!optionalTokenMap.isPresent()){ + withoutKeysGroupingFiller.execute(futureResult, queries, keys, txh); + return; + } + + final int groupLimit = Math.min(keys.size(), keysGroupingLimit); + + TokenMap tokenMap = optionalTokenMap.get(); + + Map, KeysGroup> keyGroupBuildersByNodes = new HashMap<>(); + + for(StaticBuffer key : keys){ + ByteBuffer keyByteBuffer = key.asByteBuffer(); + Token token = tokenMap.newToken(keyByteBuffer); + Set replicas = toReplicasUUIDs(tokenMap, token); + if(replicas.isEmpty()){ + withKeysGroupingFiller.execute(futureResult, queries, new KeysGroup(Collections.singletonList(key), Collections.singletonList(keyByteBuffer), token), txh); + } else { + KeysGroup keyGroup = keyGroupBuildersByNodes.get(replicas); + if(keyGroup == null){ + keyGroup = new KeysGroup(groupLimit, token); + keyGroupBuildersByNodes.put(replicas, keyGroup); + } + keyGroup.addKey(key, keyByteBuffer); + if(keyGroup.size() >= groupLimit){ + keyGroupBuildersByNodes.put(replicas, new KeysGroup(groupLimit, token)); + withKeysGroupingFiller.execute(futureResult, queries, keyGroup, txh); + } + } + } + + for(KeysGroup keyGroup : keyGroupBuildersByNodes.values()){ + if(!keyGroup.isEmpty()){ + withKeysGroupingFiller.execute(futureResult, queries, keyGroup, txh); + } + } + } + + private Set toReplicasUUIDs(TokenMap tokenMap, Token token){ + Set replicas = tokenMap.getReplicas(keyspace, token); + if(replicas.isEmpty()){ + return Collections.emptySet(); + } + Set uuids = new HashSet<>(replicas.size()); + for(Node node : replicas){ + uuids.add(node.getHostId()); + } + return uuids; + } +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/ResultFiller.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/ResultFiller.java new file mode 100644 index 00000000000..a7245a44ed2 --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/ResultFiller.java @@ -0,0 +1,24 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.strategy; + +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; + +public interface ResultFiller { + void execute(R futureResult, + Q queries, + K keys, + final StoreTransaction txh); +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/TokenRangeAwareGroupedExecutionStrategy.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/TokenRangeAwareGroupedExecutionStrategy.java new file mode 100644 index 00000000000..18ac7cc112c --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/strategy/TokenRangeAwareGroupedExecutionStrategy.java @@ -0,0 +1,102 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.strategy; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.TokenMap; +import com.datastax.oss.driver.api.core.metadata.token.Token; +import com.datastax.oss.driver.api.core.metadata.token.TokenRange; +import org.apache.commons.collections.CollectionUtils; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.configuration.Configuration; +import org.janusgraph.diskstorage.cql.CQLStoreManager; +import org.janusgraph.diskstorage.cql.util.CQLSliceQueryUtil; +import org.janusgraph.diskstorage.cql.util.KeysGroup; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * Strategy which groups partition keys together if they belong to the same TokenRange. + */ +public class TokenRangeAwareGroupedExecutionStrategy implements GroupedExecutionStrategy{ + + private final CqlSession session; + + public TokenRangeAwareGroupedExecutionStrategy(Configuration configuration, CQLStoreManager storeManager){ + // `configuration` is ignored + this.session = storeManager.getSession(); + } + + @Override + public void execute(R futureResult, + Q queries, + List keys, + ResultFiller withKeysGroupingFiller, + ResultFiller> withoutKeysGroupingFiller, + StoreTransaction txh, + int keysGroupingLimit){ + + Optional optionalTokenMap = session.getMetadata().getTokenMap(); + if(!optionalTokenMap.isPresent()){ + withoutKeysGroupingFiller.execute(futureResult, queries, keys, txh); + return; + } + + TokenMap tokenMap = optionalTokenMap.get(); + Set tokenRanges = tokenMap.getTokenRanges(); + + if(CollectionUtils.isEmpty(tokenRanges)){ + withoutKeysGroupingFiller.execute(futureResult, queries, keys, txh); + return; + } + + Map keyGroupBuildersByTokenRanges = new HashMap<>(tokenRanges.size()); + final int groupLimit = Math.min(keys.size(), keysGroupingLimit); + + for(StaticBuffer key : keys){ + ByteBuffer keyByteBuffer = key.asByteBuffer(); + Token token = tokenMap.newToken(keyByteBuffer); + TokenRange tokenRange = CQLSliceQueryUtil.findTokenRange(token, tokenRanges); + if(tokenRange == null){ + withKeysGroupingFiller.execute(futureResult, queries, new KeysGroup(Collections.singletonList(key), Collections.singletonList(keyByteBuffer), token), txh); + } else { + KeysGroup keyGroup = keyGroupBuildersByTokenRanges.get(tokenRange); + if(keyGroup == null){ + keyGroup = new KeysGroup(groupLimit, token); + keyGroupBuildersByTokenRanges.put(tokenRange, keyGroup); + } + keyGroup.addKey(key, keyByteBuffer); + if(keyGroup.size() >= groupLimit){ + keyGroupBuildersByTokenRanges.put(tokenRange, new KeysGroup(groupLimit, token)); + withKeysGroupingFiller.execute(futureResult, queries, keyGroup, txh); + } + } + } + + for(KeysGroup keyGroup : keyGroupBuildersByTokenRanges.values()){ + if(!keyGroup.isEmpty()){ + withKeysGroupingFiller.execute(futureResult, queries, keyGroup, txh); + } + } + } + +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/CQLSliceQueryUtil.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/CQLSliceQueryUtil.java new file mode 100644 index 00000000000..84d9eb3cf91 --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/CQLSliceQueryUtil.java @@ -0,0 +1,76 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.util; + +import com.datastax.oss.driver.api.core.metadata.token.Token; +import com.datastax.oss.driver.api.core.metadata.token.TokenRange; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.cql.QueryGroups; +import org.janusgraph.diskstorage.keycolumnvalue.KeysQueriesGroup; +import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class CQLSliceQueryUtil { + + private CQLSliceQueryUtil(){} + + public static QueryGroups getQueriesGroupedByDirectEqualityQueries(KeysQueriesGroup queryGroup, int totalQueryGroupsSize, int sliceGroupingLimit){ + Map> directEqualityGroupedQueriesByLimit = new HashMap<>(queryGroup.getQueries().size()); + List separateRangeQueries = new ArrayList<>(queryGroup.getQueries().size()); + for(SliceQuery query : queryGroup.getQueries()){ + if(query.isDirectColumnByStartOnlyAllowed()){ + List directEqualityQueries = directEqualityGroupedQueriesByLimit.get(query.getLimit()); + if(directEqualityQueries == null){ + directEqualityQueries = new ArrayList<>(totalQueryGroupsSize); + directEqualityQueries.add(query); + directEqualityGroupedQueriesByLimit.put(query.getLimit(), directEqualityQueries); + } else if(directEqualityQueries.size() < sliceGroupingLimit && (!query.hasLimit() || directEqualityQueries.size() < query.getLimit())){ + // We cannot group more than `query.getLimit()` queries together. + // Even so it seems that it makes sense to group them together because we don't need + // more column values than limit - we are still obliged to compute the result because + // any separate SliceQuery can be cached into a tx-cache or db-cache with incomplete result + // which may result in the wrong results for future calls of the cached Slice queries. + // Thus, we add a query into the group only if it doesn't have any limit set OR the total + // amount of grouped together direct equality queries is <= than the limit requested. + // I.e. in other words, we must ensure that the limit won't influence the final result. + directEqualityQueries.add(query); + } else { + // In this case we couldn't group a query. Thus, we should execute this query separately. + separateRangeQueries.add(query); + } + } else { + // We cannot group range queries together. Thus, they are executed separately. + separateRangeQueries.add(query); + } + } + + return new QueryGroups(directEqualityGroupedQueriesByLimit, separateRangeQueries); + } + + public static TokenRange findTokenRange(Token token, Collection tokenRanges){ + for(TokenRange tokenRange : tokenRanges){ + if(tokenRange.contains(token)){ + return tokenRange; + } + } + return null; + } + +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/KeysGroup.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/KeysGroup.java new file mode 100644 index 00000000000..771897e7aea --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/KeysGroup.java @@ -0,0 +1,66 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.util; + +import com.datastax.oss.driver.api.core.metadata.token.Token; +import org.janusgraph.diskstorage.StaticBuffer; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class KeysGroup { + + private final Token routingToken; + private final List keys; + private final List rawKeys; + + public KeysGroup(int size, Token routingToken) { + this.keys = new ArrayList<>(size); + this.rawKeys = new ArrayList<>(size); + this.routingToken = routingToken; + } + + public KeysGroup(List keys, List rawKeys, Token routingToken) { + this.keys = keys; + this.rawKeys = rawKeys; + this.routingToken = routingToken; + } + + public List getKeys() { + return keys; + } + + public List getRawKeys() { + return rawKeys; + } + + public void addKey(StaticBuffer key, ByteBuffer rawKey){ + keys.add(key); + rawKeys.add(rawKey); + } + + public boolean isEmpty(){ + return keys.isEmpty(); + } + + public int size(){ + return keys.size(); + } + + public Token getRoutingToken() { + return routingToken; + } +} diff --git a/janusgraph-cql/src/test/java/org/janusgraph/JanusGraphCassandraContainer.java b/janusgraph-cql/src/test/java/org/janusgraph/JanusGraphCassandraContainer.java index 163b7da8ebe..1289875de01 100644 --- a/janusgraph-cql/src/test/java/org/janusgraph/JanusGraphCassandraContainer.java +++ b/janusgraph-cql/src/test/java/org/janusgraph/JanusGraphCassandraContainer.java @@ -102,10 +102,47 @@ private static String storageBackend() { property; } - private boolean isScylla(){ + private static boolean isScylla(){ return getCassandraImage().contains("scylla"); } + public static boolean supportsPerPartitionLimit(){ + if(isScylla()){ + return true; + } + + // Cassandra < 3.11 doesn't support PER PARTITION LIMIT. + + String version = getVersion(); + int majorVersionEnd = version.indexOf("."); + if(majorVersionEnd < 1){ + return false; + } + String majorVersionStr = version.substring(0, majorVersionEnd); + try{ + if(Integer.parseInt(majorVersionStr) > 3){ + return true; + } + } catch (Throwable t){ + return false; + } + int minorVersionEnd = version.indexOf(".", majorVersionEnd+1); + if(minorVersionEnd == -1){ + return false; + } + String minorVersionStr = version.substring(majorVersionEnd+1, minorVersionEnd); + + try{ + if(Integer.parseInt(minorVersionStr) >= 11){ + return true; + } + } catch (Throwable t){ + return false; + } + + return false; + } + private String getConfigPrefix() { if(isScylla()){ return "scylla"; @@ -130,7 +167,7 @@ public JanusGraphCassandraContainer(boolean bindDefaultPort) { if (useDynamicConfig()) { String commandArguments = isScylla() ? - "--skip-wait-for-gossip-to-settle 0 --memory 2G --smp 1 --developer-mode 1" : + "--skip-wait-for-gossip-to-settle 0 --memory 2G --smp 1 --developer-mode 1 --max-partition-key-restrictions-per-query "+Integer.MAX_VALUE : "-Dcassandra.skip_wait_for_gossip_to_settle=0 -Dcassandra.load_ring_state=false"; withCommand(commandArguments); switch (getPartitioner()){ diff --git a/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/strategy/GroupedExecutionStrategyBuilderTest.java b/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/strategy/GroupedExecutionStrategyBuilderTest.java new file mode 100644 index 00000000000..aa64107bd0b --- /dev/null +++ b/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/strategy/GroupedExecutionStrategyBuilderTest.java @@ -0,0 +1,120 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.strategy; + +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.configuration.Configuration; +import org.janusgraph.diskstorage.cql.CQLStoreManager; +import org.janusgraph.diskstorage.cql.util.KeysGroup; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.List; + +public class GroupedExecutionStrategyBuilderTest { + + @Test + public void shouldBuildTokenRangeAwareStrategy(){ + GroupedExecutionStrategy strategy = GroupedExecutionStrategyBuilder.build(null, + Mockito.mock(CQLStoreManager.class), GroupedExecutionStrategyBuilder.TOKEN_RANGE_AWARE); + Assertions.assertEquals(TokenRangeAwareGroupedExecutionStrategy.class, strategy.getClass()); + } + + @Test + public void shouldBuildReplicasAwareStrategy(){ + CQLStoreManager storeManager = Mockito.mock(CQLStoreManager.class); + Mockito.when(storeManager.getKeyspaceName()).thenReturn("testKeyspace"); + GroupedExecutionStrategy strategy = GroupedExecutionStrategyBuilder.build(null, + storeManager, GroupedExecutionStrategyBuilder.REPLICAS_AWARE); + Assertions.assertEquals(ReplicasAwareGroupedExecutionStrategy.class, strategy.getClass()); + } + + @Test + public void shouldBuildSpecificStrategy(){ + GroupedExecutionStrategy strategy = GroupedExecutionStrategyBuilder.build(null, + Mockito.mock(CQLStoreManager.class), TestGroupedExecutionStrategy.class.getName()); + Assertions.assertEquals(TestGroupedExecutionStrategy.class, strategy.getClass()); + } + + @Test + public void shouldFailBuildCustomStrategyWhichThrowsException(){ + Assertions.assertThrows(IllegalStateException.class, () -> GroupedExecutionStrategyBuilder.build(null, + Mockito.mock(CQLStoreManager.class), TestFailingGroupedExecutionStrategy.class.getName())); + } + + @Test + public void shouldFailBuildCustomStrategyWithoutNecessaryConstructor(){ + Assertions.assertThrows(IllegalArgumentException.class, () -> GroupedExecutionStrategyBuilder.build(null, + Mockito.mock(CQLStoreManager.class), TestNoArgsGroupedExecutionStrategy.class.getName())); + } + + @Test + public void shouldFailBuildNonExistingClass(){ + Assertions.assertThrows(IllegalArgumentException.class, () -> GroupedExecutionStrategyBuilder.build(null, + Mockito.mock(CQLStoreManager.class), "NonExistingStrategyImplementation")); + } + + @Test + public void shouldFailBuildIfNotImplementsProperInterface(){ + Assertions.assertThrows(IllegalArgumentException.class, () -> GroupedExecutionStrategyBuilder.build(null, + Mockito.mock(CQLStoreManager.class), Integer.class.getName())); + } + + private static class TestGroupedExecutionStrategy implements GroupedExecutionStrategy{ + + public TestGroupedExecutionStrategy(Configuration configuration, CQLStoreManager session){ + // `configuration` and `session` is ignored + } + + @Override + public void execute(R futureResult, + Q queries, + List keys, + ResultFiller withKeysGroupingFiller, + ResultFiller> withoutKeysGroupingFiller, + StoreTransaction txh, + int keysGroupingLimit){ + // ignored + } + } + + public static class TestNoArgsGroupedExecutionStrategy implements GroupedExecutionStrategy{ + + @Override + public void execute(R futureResult, Q queries, List keys, ResultFiller withKeysGroupingFiller, ResultFiller> withoutKeysGroupingFiller, StoreTransaction txh, int keysGroupingLimit) { + // ignored + } + } + + private static class TestFailingGroupedExecutionStrategy implements GroupedExecutionStrategy{ + + public TestFailingGroupedExecutionStrategy(Configuration configuration, CQLStoreManager storeManager){ + throw new RuntimeException(); + } + + @Override + public void execute(R futureResult, + Q queries, + List keys, + ResultFiller withKeysGroupingFiller, + ResultFiller> withoutKeysGroupingFiller, + StoreTransaction txh, + int keysGroupingLimit){ + // ignored + } + } +} diff --git a/janusgraph-cql/src/test/java/org/janusgraph/graphdb/cql/CQLGraphTest.java b/janusgraph-cql/src/test/java/org/janusgraph/graphdb/cql/CQLGraphTest.java index 93ea06fdffa..0c1f80102f6 100644 --- a/janusgraph-cql/src/test/java/org/janusgraph/graphdb/cql/CQLGraphTest.java +++ b/janusgraph-cql/src/test/java/org/janusgraph/graphdb/cql/CQLGraphTest.java @@ -17,9 +17,11 @@ import io.github.artsok.RepeatedIfExceptionsTest; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMetrics; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.VertexProperty; import org.janusgraph.JanusGraphCassandraContainer; import org.janusgraph.core.Cardinality; import org.janusgraph.core.JanusGraphException; @@ -31,6 +33,7 @@ import org.janusgraph.diskstorage.TemporaryBackendException; import org.janusgraph.diskstorage.configuration.Configuration; import org.janusgraph.diskstorage.configuration.WriteConfiguration; +import org.janusgraph.diskstorage.cql.strategy.GroupedExecutionStrategyBuilder; import org.janusgraph.diskstorage.util.backpressure.SemaphoreQueryBackPressure; import org.janusgraph.diskstorage.util.backpressure.builder.QueryBackPressureBuilder; import org.janusgraph.graphdb.JanusGraphTest; @@ -54,12 +57,17 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; import java.util.stream.Stream; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.ATOMIC_BATCH_MUTATE; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.BACK_PRESSURE_CLASS; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.BACK_PRESSURE_LIMIT; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.BATCH_STATEMENT_SIZE; +import static org.janusgraph.diskstorage.cql.CQLConfigOptions.KEYS_GROUPING_ALLOWED; +import static org.janusgraph.diskstorage.cql.CQLConfigOptions.KEYS_GROUPING_CLASS; +import static org.janusgraph.diskstorage.cql.CQLConfigOptions.KEYS_GROUPING_LIMIT; +import static org.janusgraph.diskstorage.cql.CQLConfigOptions.KEYS_GROUPING_MIN; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.LOCAL_MAX_CONNECTIONS_PER_HOST; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.MAX_REQUESTS_PER_CONNECTION; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.REMOTE_MAX_CONNECTIONS_PER_HOST; @@ -106,6 +114,109 @@ protected static Stream generateSemaphoreBackPressureConfigs() { }); } + protected static Stream generateGroupingConfigs() { + + // We should always disable keys grouping option for cases when CQL storage backend doesn't support PER PARTITION LIMIT. + boolean keysGroupingAllowed = JanusGraphCassandraContainer.supportsPerPartitionLimit(); + List argumentsList = new ArrayList<>(); + if(keysGroupingAllowed){ + argumentsList.add( + arguments(1, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), true, option(KEYS_GROUPING_ALLOWED), true, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 2}) + ); + argumentsList.add( + arguments(100, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), true, option(KEYS_GROUPING_ALLOWED), true, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 2}) + ); + argumentsList.add( + arguments(2000, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), true, option(KEYS_GROUPING_ALLOWED), true, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 2}) + ); + argumentsList.add( + arguments(1, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), true, option(KEYS_GROUPING_ALLOWED), true, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 200}) + ); + argumentsList.add( + arguments(100, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), true, option(KEYS_GROUPING_ALLOWED), true, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 200}) + ); + argumentsList.add( + arguments(2000, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), true, option(KEYS_GROUPING_ALLOWED), true, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 200}) + ); + argumentsList.add( + arguments(1, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), false, option(KEYS_GROUPING_ALLOWED), true, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 2}) + ); + argumentsList.add( + arguments(100, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), false, option(KEYS_GROUPING_ALLOWED), true, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 2}) + ); + argumentsList.add( + arguments(2000, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), false, option(KEYS_GROUPING_ALLOWED), true, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 2}) + ); + } + + argumentsList.add( + arguments(1, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), true, option(KEYS_GROUPING_ALLOWED), false, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 2}) + ); + argumentsList.add( + arguments(100, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), true, option(KEYS_GROUPING_ALLOWED), false, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 2}) + ); + argumentsList.add( + arguments(2000, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), true, option(KEYS_GROUPING_ALLOWED), false, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 2}) + ); + + argumentsList.add( + arguments(1, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), false, option(KEYS_GROUPING_ALLOWED), false, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 2}) + ); + argumentsList.add( + arguments(100, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), false, option(KEYS_GROUPING_ALLOWED), false, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 2}) + ); + argumentsList.add( + arguments(2000, new Object[]{option(USE_MULTIQUERY), true, option(PROPERTY_PREFETCHING), false, option(LIMITED_BATCH), true, + option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), false, option(KEYS_GROUPING_ALLOWED), false, option(KEYS_GROUPING_LIMIT), 100, + option(KEYS_GROUPING_MIN), 2}) + ); + + return argumentsList.stream(); + } + @Override public WriteConfiguration getConfiguration() { return cqlContainer.getConfiguration(getClass().getSimpleName()).getConfiguration(); @@ -188,11 +299,11 @@ public void testQueryLongForPropertyKey() { GraphTraversalSource g = graph.traversal(); JanusGraphException ex = assertThrows(JanusGraphException.class, - () -> g.V().has("name", testName).hasNext()); + () -> g.V().has("name", testName).hasNext()); assertEquals(-1, ExceptionUtils.indexOfType(ex, TemporaryBackendException.class), - "Query should not produce a TemporaryBackendException"); + "Query should not produce a TemporaryBackendException"); assertNotEquals(-1, ExceptionUtils.indexOfType(ex, PermanentBackendException.class), - "Query should produce a PermanentBackendException"); + "Query should produce a PermanentBackendException"); } @ParameterizedTest @@ -232,7 +343,7 @@ public void testCustomBackPressureClassIsSet() { @Override @Test @Disabled("Use Parametrized test instead") public void testLimitBatchSizeForMultiQueryMultiCardinalityProperties(){ - // ignored. Used testLimitBatchSizeForMultiQueryMultiCardinalityProperties(boolean sliceGroupingAllowed) instead + // ignored. Used testLimitBatchSizeForMultiQueryMultiCardinalityProperties(boolean sliceGroupingAllowed) instead } @ParameterizedTest @@ -252,6 +363,75 @@ public void testLimitBatchSizeForMultiQueryMultiCardinalityProperties(boolean sl assertEquals(1, countBackendQueriesOfSize(lastBatch + lastBatch * 4 + lastBatch * 4, profile.getMetrics())); } + @MethodSource("generateGroupingConfigs") + @ParameterizedTest + public void testBatchWithCQLGrouping(int elementsAmount, Object[] configuration){ + + mgmt.makeVertexLabel("testVertex").make(); + mgmt.makePropertyKey("singleProperty1").cardinality(Cardinality.SINGLE).dataType(String.class).make(); + mgmt.makePropertyKey("singleProperty2").cardinality(Cardinality.SINGLE).dataType(String.class).make(); + mgmt.makePropertyKey("singleProperty3").cardinality(Cardinality.SINGLE).dataType(String.class).make(); + mgmt.makePropertyKey("setProperty").cardinality(Cardinality.SET).dataType(String.class).make(); + mgmt.makePropertyKey("listProperty").cardinality(Cardinality.LIST).dataType(String.class).make(); + finishSchema(); + JanusGraphVertex[] cs = new JanusGraphVertex[elementsAmount]; + for (int i = 0; i < elementsAmount; ++i) { + cs[i] = graph.addVertex("testVertex"); + cs[i].property("singleProperty1", "single value1 "+i); + cs[i].property("singleProperty2", "single value1 "+i); + cs[i].property("singleProperty3", "single value1 "+i); + cs[i].property(VertexProperty.Cardinality.set, "setProperty", "setValue1"); + cs[i].property(VertexProperty.Cardinality.set, "setProperty", "setValue2"); + cs[i].property(VertexProperty.Cardinality.set, "setProperty", "setValue3"); + cs[i].property(VertexProperty.Cardinality.set, "setProperty", "setValue4"); + cs[i].property(VertexProperty.Cardinality.list, "listProperty", "listValue1"); + cs[i].property(VertexProperty.Cardinality.list, "listProperty", "listValue2"); + cs[i].property(VertexProperty.Cardinality.list, "listProperty", "listValue3"); + cs[i].property(VertexProperty.Cardinality.list, "listProperty", "listValue4"); + } + + newTx(); + + clopen(configuration); + + // test batching for `values()` + TraversalMetrics profile = testGroupingBatch(() -> graph.traversal().V(cs).barrier(elementsAmount).values("singleProperty1", "singleProperty2", "singleProperty3", "setProperty", "listProperty"), configuration); + assertEquals(1, countBackendQueriesOfSize(elementsAmount * 11, profile.getMetrics())); + } + + protected TraversalMetrics testGroupingBatch(Supplier> traversal, Object... settings){ + clopen(settings); + final List resultWithConfiguredOptions = traversal.get().toList(); + if(JanusGraphCassandraContainer.supportsPerPartitionLimit()){ + clopen(option(USE_MULTIQUERY), true, option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), true, option(KEYS_GROUPING_ALLOWED), true, option(KEYS_GROUPING_CLASS), GroupedExecutionStrategyBuilder.REPLICAS_AWARE); + final List resultWithKeysEnabledSliceEnabledReplicasAware = traversal.get().toList(); + assertEquals(resultWithKeysEnabledSliceEnabledReplicasAware, resultWithConfiguredOptions); + clopen(option(USE_MULTIQUERY), true, option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), false, option(KEYS_GROUPING_ALLOWED), true, option(KEYS_GROUPING_CLASS), GroupedExecutionStrategyBuilder.REPLICAS_AWARE); + final List resultWithKeysEnabledSliceDisabledReplicasAware = traversal.get().toList(); + assertEquals(resultWithKeysEnabledSliceDisabledReplicasAware, resultWithConfiguredOptions); + clopen(option(USE_MULTIQUERY), true, option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), true, option(KEYS_GROUPING_ALLOWED), true, option(KEYS_GROUPING_CLASS), GroupedExecutionStrategyBuilder.TOKEN_RANGE_AWARE); + final List resultWithKeysEnabledSliceEnabledTokenRangeAware = traversal.get().toList(); + assertEquals(resultWithKeysEnabledSliceEnabledTokenRangeAware, resultWithConfiguredOptions); + clopen(option(USE_MULTIQUERY), true, option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), false, option(KEYS_GROUPING_ALLOWED), true, option(KEYS_GROUPING_CLASS), GroupedExecutionStrategyBuilder.TOKEN_RANGE_AWARE); + final List resultWithKeysEnabledSliceDisabledTokenRangeAware = traversal.get().toList(); + assertEquals(resultWithKeysEnabledSliceDisabledTokenRangeAware, resultWithConfiguredOptions); + } + clopen(option(USE_MULTIQUERY), true, option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), true, option(KEYS_GROUPING_ALLOWED), false); + final List resultWithKeysDisabledSliceEnabled = traversal.get().toList(); + assertEquals(resultWithKeysDisabledSliceEnabled, resultWithConfiguredOptions); + clopen(option(USE_MULTIQUERY), true, option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(), + option(SLICE_GROUPING_ALLOWED), false, option(KEYS_GROUPING_ALLOWED), false); + final List resultWithKeysDisabledSliceDisabled = traversal.get().toList(); + assertEquals(resultWithKeysDisabledSliceDisabled, resultWithConfiguredOptions); + clopen(settings); + return traversal.get().profile().next(); + } + @Override @Test @Disabled("Use Parametrized test instead") public void testMultiQueryPropertiesWithLimit() { // ignored. Used testMultiQueryPropertiesWithLimit(boolean sliceGroupingAllowed) instead