-
Notifications
You must be signed in to change notification settings - Fork 37
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduced new view manager that allows having only a single view class
- Loading branch information
1 parent
f7e7281
commit bd0fd23
Showing
29 changed files
with
1,099 additions
and
34 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
{ | ||
"event-id": "5601097d-6e2e-4df1-a7b2-ecc4f443c068", | ||
"event-timestamp": "2024-01-07T10:00:00.000+01:00[Europe/Berlin]", | ||
"entity-id-path": "PERSON 954177c4-aeb7-4d1e-b6d7-3e02fe9432cb", | ||
"name": "Harry Osborn" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
{ | ||
"event-id": "4bf5bb56-4fe8-47a3-8358-25144e15497d", | ||
"event-timestamp": "2024-01-07T09:00:00.000+01:00[Europe/Berlin]", | ||
"entity-id-path": "PERSON 568df38c-fdc3-4f60-81aa-d3cce9ebfd7b", | ||
"name": "Mary Jane Watson" | ||
} |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
for file in ./*.json | ||
do | ||
curl -i \ | ||
-H "Content-Type:application/json" \ | ||
-d "@$file" \ | ||
"http://localhost:8081/persons/create" | ||
done |
File renamed without changes.
181 changes: 181 additions & 0 deletions
181
.../src/main/java/org/fuin/cqrs4j/example/quarkus/query/views/common/QuarkusViewManager.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
package org.fuin.cqrs4j.example.quarkus.query.views.common; | ||
|
||
import io.quarkus.arc.All; | ||
import io.quarkus.narayana.jta.QuarkusTransaction; | ||
import io.quarkus.runtime.Shutdown; | ||
import io.quarkus.runtime.Startup; | ||
import io.quarkus.scheduler.Scheduler; | ||
import jakarta.enterprise.context.ApplicationScoped; | ||
import jakarta.inject.Inject; | ||
import org.fuin.cqrs4j.ProjectionService; | ||
import org.fuin.cqrs4j.example.shared.SharedUtils; | ||
import org.fuin.cqrs4j.example.shared.View; | ||
import org.fuin.ddd4j.ddd.Event; | ||
import org.fuin.ddd4j.ddd.EventType; | ||
import org.fuin.esc.api.*; | ||
import org.fuin.esc.esgrpc.IESGrpcEventStore; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.Semaphore; | ||
|
||
import static org.fuin.cqrs4j.Cqrs4JUtils.tryLocked; | ||
|
||
/** | ||
* Creates scheduler update jobs for all classes implementing the {@link View} interface. | ||
* Avoids boilerplate code: Instead of having a separated "Projector", "EventDispatcher" | ||
* and a "ChunkHandler" class for each view, there is only one simplified "View" class now. | ||
*/ | ||
@ApplicationScoped | ||
public class QuarkusViewManager { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(QuarkusViewManager.class); | ||
|
||
@Inject | ||
Scheduler scheduler; | ||
|
||
@Inject | ||
@All | ||
List<View> rawViews; | ||
|
||
@Inject | ||
IESGrpcEventStore eventstore; | ||
|
||
@Inject | ||
ProjectionAdminEventStore admin; | ||
|
||
@Inject | ||
ProjectionService projectionService; | ||
|
||
private List<ViewExt> views; | ||
|
||
@Startup | ||
void createViews() { | ||
LOG.info("Create views..."); | ||
views = rawViews.stream().map(ViewExt::new).toList(); | ||
for (final ViewExt view : views) { | ||
LOG.info("Create: {}", view.getName()); | ||
scheduler.newJob(view.getName()) | ||
.setCron(view.getCron()) | ||
.setTask(executionContext -> updateView(view)) | ||
.schedule(); | ||
} | ||
} | ||
|
||
@Shutdown | ||
void shutdownViews() { | ||
LOG.info("Shutdown views..."); | ||
for (final ViewExt view : views) { | ||
LOG.info("Shutdown: {}", view.getName()); | ||
scheduler.unscheduleJob(view.getName()); | ||
} | ||
} | ||
|
||
private void updateView(final ViewExt view) { | ||
tryLocked(view.getLock(), () -> { | ||
new Thread(() -> { | ||
try { | ||
readStreamEvents(view); | ||
} catch (final RuntimeException ex) { | ||
LOG.error("Error reading events from stream", ex); | ||
} | ||
} | ||
).start(); | ||
}); | ||
} | ||
|
||
private void readStreamEvents(final ViewExt view) { | ||
|
||
// Create an event store projection if it does not exist. | ||
if (!admin.projectionExists(view.getProjectionStreamId())) { | ||
final List<TypeName> typeNames = asTypeNames(view.getEventTypes()); | ||
LOG.info("Create projection '{}' with events: {}", view.getProjectionStreamId(), typeNames); | ||
admin.createProjection(view.getProjectionStreamId(), true, typeNames); | ||
} | ||
|
||
// Read and dispatch events | ||
final Long nextEventNumber = projectionService.readProjectionPosition(view.getProjectionStreamId()); | ||
eventstore.readAllEventsForward(view.getProjectionStreamId(), nextEventNumber, view.getChunkSize(), | ||
currentSlice -> handleChunk(view, currentSlice)); | ||
|
||
} | ||
|
||
private List<TypeName> asTypeNames(Set<EventType> eventTypes) { | ||
return eventTypes.stream().map(eventType -> new TypeName((eventType.asString()))).toList(); | ||
} | ||
|
||
private void handleChunk(final ViewExt view, final StreamEventsSlice currentSlice) { | ||
QuarkusTransaction.requiringNew() | ||
.timeout(10) | ||
.call(() -> { | ||
LOG.debug("Handle chunk: {}", currentSlice); | ||
view.handleEvents(asEvents(currentSlice.getEvents())); | ||
projectionService.updateProjectionPosition(view.getProjectionStreamId(), currentSlice.getNextEventNumber()); | ||
return 0; | ||
}); | ||
} | ||
|
||
private List<org.fuin.ddd4j.ddd.Event> asEvents(List<CommonEvent> events) { | ||
return events.stream().map(event -> (Event) event.getData()).toList(); | ||
} | ||
|
||
/** | ||
* Extends the view with some necessary values used only by this class. | ||
*/ | ||
private static class ViewExt implements View { | ||
|
||
private final View delegate; | ||
|
||
private final ProjectionStreamId projectionStreamId; | ||
|
||
private final Semaphore lock; | ||
|
||
public ViewExt(final View delegate) { | ||
this.delegate = delegate; | ||
|
||
final Set<EventType> eventTypes = delegate.getEventTypes(); | ||
final String name = delegate.getName() + "-" + SharedUtils.calculateChecksum(eventTypes); | ||
projectionStreamId = new ProjectionStreamId(name); | ||
|
||
this.lock = new Semaphore(1); | ||
|
||
} | ||
|
||
@Override | ||
public String getName() { | ||
return delegate.getName(); | ||
} | ||
|
||
@Override | ||
public String getCron() { | ||
return delegate.getCron(); | ||
} | ||
|
||
@Override | ||
public Set<EventType> getEventTypes() { | ||
return delegate.getEventTypes(); | ||
} | ||
|
||
@Override | ||
public int getChunkSize() { | ||
return delegate.getChunkSize(); | ||
} | ||
|
||
@Override | ||
public void handleEvents(List<Event> events) { | ||
delegate.handleEvents(events); | ||
} | ||
|
||
public ProjectionStreamId getProjectionStreamId() { | ||
return projectionStreamId; | ||
} | ||
|
||
public Semaphore getLock() { | ||
return lock; | ||
} | ||
|
||
} | ||
|
||
} |
5 changes: 3 additions & 2 deletions
5
quarkus/query/src/main/java/org/fuin/cqrs4j/example/quarkus/query/views/package-info.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package org.fuin.cqrs4j.example.quarkus.query.views; | ||
|
||
/** | ||
* Contains the views used in this query application. A view never uses code of another view, means all views are completely independent of | ||
* each other. As an exception, the 'commons' package has some small classes that are not view specific. | ||
* Contains the views used in this query application. A view never uses code of another view, | ||
* means all views are completely independent of each other. As an exception, the 'commons' package | ||
* has some small classes that are not view specific. | ||
*/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
5 changes: 0 additions & 5 deletions
5
...ry/src/main/java/org/fuin/cqrs4j/example/quarkus/query/views/personlist/package-info.java
This file was deleted.
Oops, something went wrong.
47 changes: 47 additions & 0 deletions
47
...query/src/main/java/org/fuin/cqrs4j/example/quarkus/query/views/statistic/EntityType.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package org.fuin.cqrs4j.example.quarkus.query.views.statistic; | ||
|
||
import jakarta.validation.constraints.Max; | ||
import jakarta.validation.constraints.NotEmpty; | ||
|
||
import java.util.Objects; | ||
|
||
/** | ||
* Defines the name of a type of entity. | ||
* | ||
* @param name Unique name. Will be converted to lowercase internally - Minimum 1 character, maximum 30 characters. | ||
*/ | ||
public record EntityType(String name) { | ||
|
||
/** | ||
* Maximum allowed length of the name. | ||
*/ | ||
public static final int MAX_LENGTH = 30; | ||
|
||
public EntityType(@NotEmpty @Max(MAX_LENGTH) String name) { | ||
this.name = Objects.requireNonNull(name, "name==null").toLowerCase(); | ||
if (name.isEmpty()) { | ||
throw new IllegalArgumentException("Name cannot be empty"); | ||
} | ||
if (name.length() > MAX_LENGTH) { | ||
throw new IllegalArgumentException("Name has a length of " + name.length() | ||
+ ", but max allowed is " + MAX_LENGTH + " characters: '" + name + "'"); | ||
} | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return name; | ||
} | ||
|
||
/** | ||
* Determines if the name is valid. | ||
* | ||
* @param name Name to be verified. | ||
* @return {@literal true} if the given name can be converted into an instance of this class. | ||
*/ | ||
public static boolean isValid(String name) { | ||
return name != null && !name.isEmpty() && name.length() <= MAX_LENGTH; | ||
} | ||
|
||
|
||
} |
49 changes: 49 additions & 0 deletions
49
...main/java/org/fuin/cqrs4j/example/quarkus/query/views/statistic/QryStatisticResource.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package org.fuin.cqrs4j.example.quarkus.query.views.statistic; | ||
|
||
import jakarta.inject.Inject; | ||
import jakarta.persistence.EntityManager; | ||
import jakarta.ws.rs.GET; | ||
import jakarta.ws.rs.Path; | ||
import jakarta.ws.rs.PathParam; | ||
import jakarta.ws.rs.Produces; | ||
import jakarta.ws.rs.core.MediaType; | ||
import jakarta.ws.rs.core.Response; | ||
import org.fuin.cqrs4j.example.quarkus.query.views.personlist.PersonListEntry; | ||
import org.fuin.cqrs4j.example.quarkus.query.views.statistic.EntityType; | ||
import org.fuin.cqrs4j.example.quarkus.query.views.statistic.Statistic; | ||
import org.fuin.cqrs4j.example.quarkus.query.views.statistic.StatisticEntity; | ||
import org.fuin.objects4j.vo.UUIDStrValidator; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* REST resource providing the statistics. | ||
*/ | ||
@Path("/statistics") | ||
public class QryStatisticResource { | ||
|
||
@Inject | ||
EntityManager em; | ||
|
||
@GET | ||
@Produces(MediaType.APPLICATION_JSON) | ||
public Response getAll() { | ||
final List<Statistic> statistics = em.createNamedQuery(StatisticEntity.FIND_ALL, Statistic.class).getResultList(); | ||
return Response.ok(statistics).build(); | ||
} | ||
|
||
@GET | ||
@Path("{name}") | ||
@Produces(MediaType.APPLICATION_JSON) | ||
public Response getByName(@PathParam("name") String name) { | ||
if (!EntityType.isValid(name)) { | ||
return Response.status(Response.Status.BAD_REQUEST).entity("Invalid entity type name").build(); | ||
} | ||
final StatisticEntity entity = em.find(StatisticEntity.class, name); | ||
if (entity == null) { | ||
return Response.status(Response.Status.NOT_FOUND).build(); | ||
} | ||
return Response.ok(entity.toDto()).build(); | ||
} | ||
|
||
} |
Oops, something went wrong.