Skip to content

Commit

Permalink
Batch custom scopes activation
Browse files Browse the repository at this point in the history
Added job/step/partition scope management
  • Loading branch information
luca-bassoricci committed Dec 18, 2023
1 parent 2ae6d52 commit 0f976d5
Show file tree
Hide file tree
Showing 7 changed files with 394 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,14 @@
import jakarta.enterprise.inject.AmbiguousResolutionException;
import jakarta.inject.Named;

import org.jberet.cdi.JobScoped;
import org.jberet.cdi.PartitionScoped;
import org.jberet.cdi.StepScoped;
import org.jberet.creation.ArchiveXmlLoader;
import org.jberet.creation.BatchBeanProducer;
import org.jberet.creation.JobScopedContextImpl;
import org.jberet.creation.PartitionScopedContextImpl;
import org.jberet.creation.StepScopedContextImpl;
import org.jberet.job.model.BatchArtifacts;
import org.jberet.job.model.Decision;
import org.jberet.job.model.Flow;
Expand Down Expand Up @@ -62,12 +68,17 @@
import io.quarkiverse.jberet.runtime.JBeretRecorder;
import io.quarkiverse.jberet.runtime.JobsProducer;
import io.quarkiverse.jberet.runtime.QuarkusJobScheduler;
import io.quarkiverse.jberet.runtime.scope.QuarkusJobScopedContextImpl;
import io.quarkiverse.jberet.runtime.scope.QuarkusPartitionScopedContextImpl;
import io.quarkiverse.jberet.runtime.scope.QuarkusStepScopedContextImpl;
import io.quarkus.agroal.spi.JdbcDataSourceBuildItem;
import io.quarkus.arc.Unremovable;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.AnnotationsTransformerBuildItem;
import io.quarkus.arc.deployment.BeanContainerBuildItem;
import io.quarkus.arc.deployment.BeanDiscoveryFinishedBuildItem;
import io.quarkus.arc.deployment.ContextRegistrationPhaseBuildItem;
import io.quarkus.arc.deployment.ContextRegistrationPhaseBuildItem.ContextConfiguratorBuildItem;
import io.quarkus.arc.deployment.ValidationPhaseBuildItem;
import io.quarkus.arc.deployment.ValidationPhaseBuildItem.ValidationErrorBuildItem;
import io.quarkus.arc.processor.AnnotationsTransformer;
Expand Down Expand Up @@ -99,28 +110,37 @@ public class JBeretProcessor {
private static final DotName JOB = DotName.createSimple(Job.class);

@BuildStep
public void registerExtension(BuildProducer<FeatureBuildItem> feature, BuildProducer<CapabilityBuildItem> capability) {
public void registerExtension(BuildProducer<FeatureBuildItem> feature,
BuildProducer<CapabilityBuildItem> capability) {
feature.produce(new FeatureBuildItem("jberet"));
}

/**
* Prevent JobOperatorContext$DefaultHolder from eagerly initializing because it depends on a ServiceLoader
* entry for the BatchRuntime, which we don't use. With this trigger turned off, it won't ever be initialized.
* Prevent JobOperatorContext$DefaultHolder from eagerly initializing because it
* depends on a ServiceLoader entry for the BatchRuntime, which we don't use.
* With this trigger turned off, it won't ever be initialized.
*/
@BuildStep
public RuntimeInitializedClassBuildItem runtimeInitializedDefaultHolder() {
return new RuntimeInitializedClassBuildItem("org.jberet.spi.JobOperatorContext$DefaultHolder");
}

/**
 * Registers the JBeret custom CDI scopes ({@code @JobScoped}, {@code @StepScoped},
 * {@code @PartitionScoped}) with ArC during the context registration phase, binding each
 * scope annotation to its Quarkus-specific context implementation
 * ({@code QuarkusJobScopedContextImpl}, {@code QuarkusStepScopedContextImpl},
 * {@code QuarkusPartitionScopedContextImpl}).
 *
 * @param c the ArC context registration phase used to configure the custom contexts
 * @param v producer for the resulting {@code ContextConfiguratorBuildItem}
 */
@BuildStep
public void batchScopes(ContextRegistrationPhaseBuildItem c, BuildProducer<ContextConfiguratorBuildItem> v) {
// All three scope configurators are bundled into a single ContextConfiguratorBuildItem.
v.produce(new ContextConfiguratorBuildItem(
c.getContext().configure(JobScoped.class).contextClass(QuarkusJobScopedContextImpl.class),
c.getContext().configure(StepScoped.class).contextClass(QuarkusStepScopedContextImpl.class),
c.getContext().configure(PartitionScoped.class).contextClass(QuarkusPartitionScopedContextImpl.class)));
}

@BuildStep
public void config(BuildProducer<RunTimeConfigBuilderBuildItem> runTimeConfigBuilder) {
runTimeConfigBuilder.produce(new RunTimeConfigBuilderBuildItem(JBeretConfigSourceFactoryBuilder.class.getName()));
runTimeConfigBuilder
.produce(new RunTimeConfigBuilderBuildItem(JBeretConfigSourceFactoryBuilder.class.getName()));
}

@BuildStep
public void additionalBeans(
JBeretConfig config,
CombinedIndexBuildItem combinedIndex,
public void additionalBeans(JBeretConfig config, CombinedIndexBuildItem combinedIndex,
BuildProducer<AdditionalBeanBuildItem> additionalBeans,
BuildProducer<AnnotationsTransformerBuildItem> annotationsTransformer,
BuildProducer<BatchArtifactBuildItem> batchArtifact) throws Exception {
Expand Down Expand Up @@ -164,9 +184,7 @@ public void transform(final TransformationContext context) {
String className = context.getTarget().asClass().name().toString();
if (batchArtifacts.containsKey(className)) {
String named = batchArtifacts.get(className);
context.transform()
.add(Unremovable.class)
.add(Named.class, createStringValue("value", named))
context.transform().add(Unremovable.class).add(Named.class, createStringValue("value", named))
.done();
}
}
Expand All @@ -175,46 +193,36 @@ public void transform(final TransformationContext context) {

@BuildStep
@Record(ExecutionTime.STATIC_INIT)
public void validateRepository(
JBeretRecorder recorder,
JBeretConfig config,
BeanDiscoveryFinishedBuildItem beanDiscoveryFinishedBuildItem,
List<JdbcDataSourceBuildItem> datasources) {
public void validateRepository(JBeretRecorder recorder, JBeretConfig config,
BeanDiscoveryFinishedBuildItem beanDiscoveryFinishedBuildItem, List<JdbcDataSourceBuildItem> datasources) {
switch (normalize(config.repository().type())) {
case JBeretJdbcJobRepositoryProducer.TYPE:
final String datasource = config.repository().jdbc().datasource();
if (datasources.stream().noneMatch(item -> item.getName().equals(datasource))) {
throw new ConfigurationException("Datasource name "
+ datasource
throw new ConfigurationException("Datasource name " + datasource
+ " does not exist. Available datasources: "
+ datasources.stream()
.map(JdbcDataSourceBuildItem::getName)
.collect(Collectors.joining(",")));
+ datasources.stream().map(JdbcDataSourceBuildItem::getName).collect(Collectors.joining(",")));
}

break;
case JBeretInMemoryJobRepositoryProducer.TYPE:
break;
default:
final DotName dotName = DotName.createSimple(JobRepository.class);
final List<BeanInfo> beanInfos = beanDiscoveryFinishedBuildItem.beanStream().filter(
beanInfo -> beanInfo.hasType(dotName) && beanInfo.hasDefaultQualifiers()).collect();
final List<BeanInfo> beanInfos = beanDiscoveryFinishedBuildItem.beanStream()
.filter(beanInfo -> beanInfo.hasType(dotName) && beanInfo.hasDefaultQualifiers()).collect();
if (beanInfos.isEmpty()) {
throw new ConfigurationException("There is no injectable and @Default JobRepository bean");
} else if (beanInfos.size() > 1) {
throw new ConfigurationException(
"Multiple injectable and @Default JobRepository beans are not allowed : "
+ beanInfos);
"Multiple injectable and @Default JobRepository beans are not allowed : " + beanInfos);
}
}
}

@BuildStep
public void loadJobs(
JBeretConfig config,
ValidationPhaseBuildItem validationPhase,
BuildProducer<HotDeploymentWatchedFileBuildItem> watchedFiles,
BuildProducer<BatchJobBuildItem> batchJobs,
public void loadJobs(JBeretConfig config, ValidationPhaseBuildItem validationPhase,
BuildProducer<HotDeploymentWatchedFileBuildItem> watchedFiles, BuildProducer<BatchJobBuildItem> batchJobs,
BuildProducer<ValidationErrorBuildItem> validationErrors) throws Exception {

Map<String, BeanInfo> jobBeans = new HashMap<>();
Expand All @@ -238,7 +246,8 @@ public void loadJobs(
Job job = ArchiveXmlLoader.loadJobXml(jobXmlName, contextClassLoader, loadedJobs, jobXmlResolver);
job.setJobXmlName(jobXmlName);
JobConfig jobConfig = config.job().get(jobXmlName);
watchedFiles.produce(new HotDeploymentWatchedFileBuildItem("META-INF/batch-jobs/" + jobXmlName + ".xml"));
watchedFiles
.produce(new HotDeploymentWatchedFileBuildItem("META-INF/batch-jobs/" + jobXmlName + ".xml"));
watchJobScripts(job, watchedFiles);
batchJobs.produce(new BatchJobBuildItem(job, parseCron(job, jobConfig)));
log.debug("Processed job with ID " + job.getId() + " from file " + jobXmlName);
Expand All @@ -248,12 +257,8 @@ public void loadJobs(

@BuildStep
@Record(ExecutionTime.STATIC_INIT)
public void registerJobs(
RecorderContext recorderContext,
JBeretRecorder recorder,
JBeretConfig config,
List<BatchJobBuildItem> batchJobs,
List<BatchArtifactBuildItem> batchArtifacts,
public void registerJobs(RecorderContext recorderContext, JBeretRecorder recorder, JBeretConfig config,
List<BatchJobBuildItem> batchJobs, List<BatchArtifactBuildItem> batchArtifacts,
BeanContainerBuildItem beanContainer) throws Exception {
registerNonDefaultConstructors(recorderContext);

Expand All @@ -274,9 +279,7 @@ public void registerJobs(

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
ServiceStartBuildItem init(JBeretRecorder recorder,
JBeretConfig config,
ThreadPoolConfig threadPoolConfig,
ServiceStartBuildItem init(JBeretRecorder recorder, JBeretConfig config, ThreadPoolConfig threadPoolConfig,
BeanContainerBuildItem beanContainer) {

recorder.initJobOperator(config, threadPoolConfig, beanContainer.getValue());
Expand All @@ -287,21 +290,20 @@ ServiceStartBuildItem init(JBeretRecorder recorder,

@BuildStep
public void nativeImage(BuildProducer<NativeImageResourceBuildItem> resources,
BuildProducer<ReflectiveClassBuildItem> reflectiveClasses,
JBeretConfig config) {
BuildProducer<ReflectiveClassBuildItem> reflectiveClasses, JBeretConfig config) {
resources.produce(new NativeImageResourceBuildItem("sql/jberet-sql.properties"));
resources.produce(new NativeImageResourceBuildItem("sql/jberet.ddl"));
if (JBeretJdbcJobRepositoryProducer.TYPE.equals(normalize(config.repository().type()))) {
config.repository().jdbc().ddlFileName().map(String::trim)
.filter(Predicate.not(String::isEmpty))
config.repository().jdbc().ddlFileName().map(String::trim).filter(Predicate.not(String::isEmpty))
.ifPresent(v -> resources.produce(new NativeImageResourceBuildItem(v)));
config.repository().jdbc().sqlFileName().map(String::trim)
.filter(Predicate.not(String::isEmpty))
config.repository().jdbc().sqlFileName().map(String::trim).filter(Predicate.not(String::isEmpty))
.ifPresent(v -> resources.produce(new NativeImageResourceBuildItem(v)));
}
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(QuarkusJobScheduler.class).methods().build());
reflectiveClasses
.produce(ReflectiveClassBuildItem.builder(JobInstanceImpl.class).constructors().methods().fields().build());
reflectiveClasses.produce(
ReflectiveClassBuildItem.builder(JobInstanceImpl.class).constructors().methods().fields().build());
reflectiveClasses.produce(new ReflectiveClassBuildItem(true, true, false, JobScopedContextImpl.class,
StepScopedContextImpl.class, PartitionScopedContextImpl.class));
}

private static void registerNonDefaultConstructors(RecorderContext recorderContext) throws Exception {
Expand Down Expand Up @@ -334,29 +336,28 @@ private static void registerNonDefaultConstructors(RecorderContext recorderConte
recorderContext.registerNonDefaultConstructor(Transition.Next.class.getConstructor(String.class),
next -> Collections.singletonList(next.getOn()));

recorderContext.registerNonDefaultConstructor(Script.class.getConstructor(String.class, String.class, String.class),
script -> Stream
.of(script.getType(), script.getSrc(),
script.getSrc() != null ? script.getContent(Thread.currentThread().getContextClassLoader())
recorderContext
.registerNonDefaultConstructor(Script.class.getConstructor(String.class, String.class, String.class),
script -> Stream.of(script.getType(), script.getSrc(),
script.getSrc() != null
? script.getContent(Thread.currentThread().getContextClassLoader())
: script.getContent())
.collect(toList()));
.collect(toList()));
}

private static Set<String> findBatchFilesFromPath(Path path, List<Pattern> includes, List<Pattern> excludes) {
try {
Stream<String> filePaths = Files.walk(path)
.filter(Files::isRegularFile)
.map(file -> file.getFileName().toString())
.filter(file -> file.endsWith(".xml"));
Stream<String> filePaths = Files.walk(path).filter(Files::isRegularFile)
.map(file -> file.getFileName().toString()).filter(file -> file.endsWith(".xml"));

if (!includes.isEmpty()) {
filePaths = filePaths
.filter(filePath -> includes.stream().allMatch(pattern -> pattern.matcher(filePath).matches()));
}

if (!excludes.isEmpty()) {
filePaths = filePaths
.filter(filePath -> excludes.stream().noneMatch(pattern -> pattern.matcher(filePath).matches()));
filePaths = filePaths.filter(
filePath -> excludes.stream().noneMatch(pattern -> pattern.matcher(filePath).matches()));
}

return filePaths.map(file -> file.substring(0, file.length() - 4)).collect(Collectors.toSet());
Expand All @@ -366,7 +367,8 @@ private static Set<String> findBatchFilesFromPath(Path path, List<Pattern> inclu
}

private static List<Pattern> toPatterns(Optional<List<String>> pattern) {
return pattern.map(patterns -> patterns.stream().map(GlobUtil::toRegexPattern).map(Pattern::compile).collect(toList()))
return pattern.map(
patterns -> patterns.stream().map(GlobUtil::toRegexPattern).map(Pattern::compile).collect(toList()))
.orElseGet(ArrayList::new);
}

Expand All @@ -382,9 +384,8 @@ private static String parseCron(Job job, JobConfig jobConfig) {
return jobConfig.cron().get();
} catch (Exception e) {
e.printStackTrace();
throw new ConfigurationException(
String.format("The cron expression %s configured in %s is not valid", jobConfig.cron().get(),
"quarkus.jberet.job." + job.getJobXmlName() + ".cron"));
throw new ConfigurationException(String.format("The cron expression %s configured in %s is not valid",
jobConfig.cron().get(), "quarkus.jberet.job." + job.getJobXmlName() + ".cron"));
}
}

Expand Down Expand Up @@ -415,7 +416,8 @@ private static void watchJobScripts(Job job, BuildProducer<HotDeploymentWatchedF
}
}

private static void watchJobScripts(JobElement jobElement, BuildProducer<HotDeploymentWatchedFileBuildItem> watchedFiles) {
private static void watchJobScripts(JobElement jobElement,
BuildProducer<HotDeploymentWatchedFileBuildItem> watchedFiles) {
if (jobElement instanceof Step) {
final Step step = (Step) jobElement;

Expand Down Expand Up @@ -454,9 +456,7 @@ private static void watchJobScripts(JobElement jobElement, BuildProducer<HotDepl

private static void watchJobScripts(RefArtifact refArtifact,
BuildProducer<HotDeploymentWatchedFileBuildItem> watchedFiles) {
if (refArtifact != null &&
refArtifact.getScript() != null &&
refArtifact.getScript().getSrc() != null) {
if (refArtifact != null && refArtifact.getScript() != null && refArtifact.getScript().getSrc() != null) {
watchedFiles.produce(new HotDeploymentWatchedFileBuildItem(refArtifact.getScript().getSrc()));
}
}
Expand Down
Loading

0 comments on commit 0f976d5

Please sign in to comment.