Skip to content

Commit dedda29

Browse files
authored
NoSQL persistence: add Java/Vert.X executor abstraction layer (#2527)
Provides an abstraction to submit asynchronous tasks, optionally with a delay or delay + repetition and implementations based on Java's `ThreadPoolExecutor` and Vert.X.
1 parent b8f956a commit dedda29

File tree

29 files changed

+1941
-0
lines changed

29 files changed

+1941
-0
lines changed

bom/build.gradle.kts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ dependencies {
3636
api(project(":polaris-version"))
3737
api(project(":polaris-persistence-nosql-varint"))
3838

39+
api(project(":polaris-async-api"))
40+
api(project(":polaris-async-java"))
41+
api(project(":polaris-async-vertx"))
42+
43+
api(project(":polaris-idgen-api"))
44+
api(project(":polaris-idgen-impl"))
45+
api(project(":polaris-idgen-spi"))
46+
3947
api(project(":polaris-config-docs-annotations"))
4048
api(project(":polaris-config-docs-generator"))
4149

gradle/libs.versions.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,16 @@ s3mock-testcontainers = { module = "com.adobe.testing:s3mock-testcontainers", ve
9999
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
100100
smallrye-common-annotation = { module = "io.smallrye.common:smallrye-common-annotation", version = "2.13.9" }
101101
smallrye-config-core = { module = "io.smallrye.config:smallrye-config-core", version = "3.14.0" }
102+
smallrye-jandex = { module = "io.smallrye:jandex", version = "3.4.0" }
102103
spark35-sql-scala212 = { module = "org.apache.spark:spark-sql_2.12", version.ref = "spark35" }
103104
swagger-annotations = { module = "io.swagger:swagger-annotations", version.ref = "swagger" }
104105
swagger-jaxrs = { module = "io.swagger:swagger-jaxrs", version.ref = "swagger" }
105106
testcontainers-bom = { module = "org.testcontainers:testcontainers-bom", version = "1.21.3" }
106107
testcontainers-keycloak = { module = "com.github.dasniko:testcontainers-keycloak", version = "3.8.0" }
107108
threeten-extra = { module = "org.threeten:threeten-extra", version = "1.8.0" }
109+
vertx-core = { module = "io.vertx:vertx-core", version = "5.0.4" }
110+
weld-se-core = { module = "org.jboss.weld.se:weld-se-core", version = "6.0.3.Final" }
111+
weld-junit5 = { module = "org.jboss.weld:weld-junit5", version = "5.0.1.Final" }
108112

109113
[plugins]
110114
jcstress = { id = "io.github.reyerizo.gradle.jcstress", version = "0.8.15" }

gradle/projects.main.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ polaris-config-docs-annotations=tools/config-docs/annotations
4848
polaris-config-docs-generator=tools/config-docs/generator
4949
polaris-config-docs-site=tools/config-docs/site
5050

51+
# executor abstraction
52+
polaris-async-api=persistence/nosql/async/api
53+
polaris-async-java=persistence/nosql/async/java
54+
polaris-async-vertx=persistence/nosql/async/vertx
5155
# id generation
5256
polaris-idgen-api=persistence/nosql/idgen/api
5357
polaris-idgen-impl=persistence/nosql/idgen/impl

persistence/nosql/async/README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# Async execution API
21+
22+
Provides an abstraction to submit asynchronous tasks, optionally with a delay or delay + repetition and implementations
23+
based on Java's `ThreadPoolExecutor` and Vert.X.
24+
25+
## Code structure
26+
27+
The code is structured into multiple modules. Consuming code should almost always pull in only the API module.
28+
29+
* `polaris-async-api` provides the necessary Java interfaces and immutable types.
30+
* `polaris-async-java` implementation leveraging `CompletableFuture.delayedExecutor` for delayed/scheduled invocations.
31+
* `polaris-async-vertx` implementation leveraging Vert.X for delayed/scheduled invocations.
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
plugins {
21+
id("org.kordamp.gradle.jandex")
22+
id("polaris-server")
23+
}
24+
25+
description = "Polaris async execution API"
26+
27+
dependencies {
28+
compileOnly(libs.jakarta.annotation.api)
29+
compileOnly(libs.jakarta.validation.api)
30+
compileOnly(libs.jakarta.inject.api)
31+
compileOnly(libs.jakarta.enterprise.cdi.api)
32+
33+
compileOnly(libs.smallrye.config.core)
34+
compileOnly(platform(libs.quarkus.bom))
35+
compileOnly("io.quarkus:quarkus-core")
36+
37+
compileOnly(project(":polaris-immutables"))
38+
annotationProcessor(project(":polaris-immutables", configuration = "processor"))
39+
40+
implementation(platform(libs.jackson.bom))
41+
implementation("com.fasterxml.jackson.core:jackson-databind")
42+
43+
testFixturesCompileOnly(platform(libs.jackson.bom))
44+
testFixturesCompileOnly("com.fasterxml.jackson.core:jackson-databind")
45+
46+
testFixturesApi(libs.jakarta.annotation.api)
47+
testFixturesApi(libs.jakarta.validation.api)
48+
testFixturesApi(libs.jakarta.inject.api)
49+
testFixturesApi(libs.jakarta.enterprise.cdi.api)
50+
51+
testFixturesApi(project(":polaris-idgen-api"))
52+
53+
testFixturesImplementation(libs.weld.se.core)
54+
testFixturesImplementation(libs.weld.junit5)
55+
testFixturesImplementation(libs.guava)
56+
testFixturesRuntimeOnly(libs.smallrye.jandex)
57+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.nosql.async;
20+
21+
import com.fasterxml.jackson.annotation.JsonFormat;
22+
import com.fasterxml.jackson.annotation.JsonInclude;
23+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
24+
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
25+
import io.smallrye.config.ConfigMapping;
26+
import io.smallrye.config.WithDefault;
27+
import java.time.Duration;
28+
import java.util.Optional;
29+
import java.util.OptionalInt;
30+
import org.apache.polaris.immutables.PolarisImmutable;
31+
32+
/** Advanced configuration options to tune async activities. */
33+
@PolarisImmutable
34+
@ConfigMapping(prefix = "polaris.async")
35+
@JsonSerialize(as = ImmutableAsyncConfiguration.class)
36+
@JsonDeserialize(as = ImmutableAsyncConfiguration.class)
37+
public interface AsyncConfiguration {
38+
39+
String DEFAULT_THREAD_KEEP_ALIVE_STRING = "PT1S";
40+
Duration DEFAULT_THREAD_KEEP_ALIVE = Duration.parse(DEFAULT_THREAD_KEEP_ALIVE_STRING);
41+
42+
String DEFAULT_MAX_THREADS_STRING = "256";
43+
int DEFAULT_MAX_THREADS = Integer.parseInt(DEFAULT_MAX_THREADS_STRING);
44+
45+
/** Duration to keep idle threads alive. */
46+
@WithDefault(DEFAULT_THREAD_KEEP_ALIVE_STRING)
47+
@JsonInclude(JsonInclude.Include.NON_ABSENT)
48+
@JsonFormat(shape = JsonFormat.Shape.STRING)
49+
Optional<Duration> threadKeepAlive();
50+
51+
/** Maximum number of threads available for asynchronous execution. Default is 256. */
52+
@JsonInclude(JsonInclude.Include.NON_ABSENT)
53+
OptionalInt maxThreads();
54+
55+
static ImmutableAsyncConfiguration.Builder builder() {
56+
return ImmutableAsyncConfiguration.builder();
57+
}
58+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.nosql.async;
20+
21+
import static java.util.concurrent.Executors.callable;
22+
23+
import jakarta.enterprise.context.ApplicationScoped;
24+
import java.time.Duration;
25+
import java.util.concurrent.Callable;
26+
27+
/**
28+
* Abstraction for platform/environment-specific scheduler implementations for delayed and
29+
* optionally repeated executions.
30+
*
31+
* <p>Quarkus production systems use Vert.x, tests usually use Java executors.
32+
*
33+
* <p>Implementations, like Java executors or Vert.X, are usually {@link
34+
* ApplicationScoped @ApplicationScoped} in CDI environments.
35+
*/
36+
public interface AsyncExec {
37+
38+
default <R> Cancelable<R> submit(Callable<R> callable) {
39+
return schedule(callable, Duration.ZERO);
40+
}
41+
42+
/**
43+
* Asynchronously run the given {@linkplain Callable callable} after the provided {@linkplain
44+
* Duration delay}. If the delay is not positive, the function is scheduled for immediate
45+
* execution.
46+
*
47+
* @param callable the callable to execute
48+
* @param delay the execution delay, zero and negative values mean immediate scheduling
49+
* @param <R> return type of the callable propagated through the returned cancelable
50+
* @return the cancelable for the scheduled task
51+
*/
52+
<R> Cancelable<R> schedule(Callable<R> callable, Duration delay);
53+
54+
/**
55+
* This is a convenience function for {@link #schedule(Callable, Duration)} with a void result,
56+
* using a {@link Runnable}.
57+
*/
58+
default Cancelable<Void> schedule(Runnable runnable, Duration delay) {
59+
return schedule(callable(runnable, null), delay);
60+
}
61+
62+
/**
63+
* Schedules a runnable to be executed repeatedly using the given initial delay. This is
64+
* equivalent to calling {@link #schedulePeriodic(Runnable, Duration, Duration)} with the {@code
65+
* initialDelay} and {@code delay} having the same values.
66+
*
67+
* <p>There is intentionally no variant of {@code schedulePeriodic()} that takes a {@link Callable
68+
* Callable<R>} because there are multiple invocations of the runnable.
69+
*/
70+
default Cancelable<Void> schedulePeriodic(Runnable runnable, Duration delay) {
71+
return schedulePeriodic(runnable, delay, delay);
72+
}
73+
74+
/**
75+
* Schedules a runnable to be executed repeatedly, starting after the given initial delay.
76+
*
77+
* <p>There is intentionally no variant of {@code schedulePeriodic()} that takes a {@link Callable
78+
* Callable<R>} because there are multiple invocations of the runnable.
79+
*
80+
* @param runnable the runnable to execute
81+
* @param initialDelay initial delay, zero and negative values mean immediate scheduling
82+
* @param delay repetition delay, zero and negative cause an {@link IllegalArgumentException}
83+
* @return cancelable instance
84+
*/
85+
Cancelable<Void> schedulePeriodic(Runnable runnable, Duration initialDelay, Duration delay);
86+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.nosql.async;
20+
21+
import java.util.concurrent.CompletionStage;
22+
23+
/**
24+
* Implementation agnostic interface for asynchronous delayed and optionally repeated executions.
25+
*
26+
* <p>Implementations may use JVM-local backends, like Java executors or Vert.X. Vert.X's API is not
27+
* based on Java's {@code (Completable)Future} or {@code CompletionStage}. The cancellation
28+
* semantics/guarantees are different for Vert.X and Java.
29+
*
30+
* @param <R>
31+
*/
32+
public interface Cancelable<R> {
33+
/**
34+
* Attempt to cancel the delayed execution of a callable. Already running callables are not
35+
* interrupted. A callable may still be invoked after calling this function, because of side
36+
* effects and race conditions.
37+
*
38+
* <p>After cancellation, the result of this instance's might be either in state "completed
39+
* exceptionally" ({@link java.util.concurrent.CancellationException}) or successfully completed.
40+
*/
41+
void cancel();
42+
43+
/**
44+
* Retrieve the {@link CompletionStage} associated with this {@link Cancelable} for the submitted
45+
* async and potentially periodic execution.
46+
*/
47+
CompletionStage<R> completionStage();
48+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<!--
2+
~ Licensed to the Apache Software Foundation (ASF) under one
3+
~ or more contributor license agreements. See the NOTICE file
4+
~ distributed with this work for additional information
5+
~ regarding copyright ownership. The ASF licenses this file
6+
~ to you under the Apache License, Version 2.0 (the
7+
~ "License"); you may not use this file except in compliance
8+
~ with the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing,
13+
~ software distributed under the License is distributed on an
14+
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
~ KIND, either express or implied. See the License for the
16+
~ specific language governing permissions and limitations
17+
~ under the License.
18+
-->
19+
20+
<beans xmlns="https://jakarta.ee/xml/ns/jakartaee"
21+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd">
23+
<!-- File required by Weld (used for testing), not by Quarkus -->
24+
</beans>
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.nosql.async;
20+
21+
import jakarta.enterprise.context.ApplicationScoped;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
24+
/**
25+
* Async-API-specific tests check utility, only used to verify that invocations into CDI beans work,
26+
* <em>not</em> for "general reuse".
27+
*/
28+
@ApplicationScoped
29+
class AppScopedChecker {
30+
static final AtomicInteger COUNTER = new AtomicInteger();
31+
32+
int getAndIncrement() {
33+
return COUNTER.getAndIncrement();
34+
}
35+
}

0 commit comments

Comments
 (0)