Skip to content

Commit

Permalink
fix read aggregate to stop searching if the last event has same seque…
Browse files Browse the repository at this point in the history
…nce number as the last snapshot (#535)

make number of segments to check when a domain event with aggregate number 0 is stored configurable
fix unit for timeout
  • Loading branch information
MGathier authored Aug 22, 2022
1 parent cf30f27 commit 53a80e2
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 24 deletions.
14 changes: 13 additions & 1 deletion axonserver/README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,21 @@ This is the Axon Server Standard Edition, version 4.5
For information about the Axon Framework and Axon Server,
visit https://docs.axoniq.io.

Release Notes for version 4.5.16
--------------------------------
* Fix: reading aggregate events searches for older events when the last event sequence number
is the same as the snapshot sequence number
* New property for index, axoniq.axonserver.event.segments-for-sequence-number-check, defines the number of segments
that Axon Server will check for events on an aggregate when an event with sequence number 0
is stored. The default value for this property is 10.
For performance reasons, if you increase this property to a value higher than 100 it is recommended
to also increase the axoniq.axonserver.event.max-bloom-filters-in-memory property.

Release Notes for version 4.5.15
--------------------------------
* Fix: reading aggregate events hangs on JVM Error
* Fix: reading aggregate events hangs on JVM Error, this includes a new property
"axoniq.axonserver.event.aggregate.timeout" to set the timeout between two events returned by Axon Server.
The default value for this property is 30 seconds.

Release Notes for version 4.5.14
--------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,8 @@ public long waitingTransactions() {
public void cancelPendingTransactions() {
storageTransactionManager.cancelPendingTransactions();
}

public void clearSequenceNumberCache() {
storageTransactionManager.clearSequenceNumberCache();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public abstract class SegmentBasedEventStore implements EventStorageEngine {

public static final byte TRANSACTION_VERSION = 2;
protected static final Logger logger = LoggerFactory.getLogger(SegmentBasedEventStore.class);
protected static final int MAX_SEGMENTS_FOR_SEQUENCE_NUMBER_CHECK = 10;
protected static final int VERSION_BYTES = 1;
protected static final int FILE_OPTIONS_BYTES = 4;
protected static final int TX_CHECKSUM_BYTES = 4;
Expand Down Expand Up @@ -254,7 +253,7 @@ protected EventIterator createEventIterator(EventSource e, long segment, long st
@Override
public Optional<Long> getLastSequenceNumber(String aggregateIdentifier, SearchHint[] hints) {
return getLastSequenceNumber(aggregateIdentifier, contains(hints, SearchHint.RECENT_ONLY) ?
MAX_SEGMENTS_FOR_SEQUENCE_NUMBER_CHECK : Integer.MAX_VALUE, Long.MAX_VALUE);
storageProperties.segmentsForSequenceNumberCheck() : Integer.MAX_VALUE, Long.MAX_VALUE);
}

private <T> boolean contains(T[] values, T value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,8 @@ public SortedMap<Long, IndexEntries> lookupAggregate(String aggregateId, long fi
}
IndexEntries entries = activeIndexes.getOrDefault(segment, Collections.emptyMap()).get(aggregateId);
if (entries != null) {
entries = addToResult(firstSequenceNumber, lastSequenceNumber, results, segment, entries);
maxResults -= entries.size();
int nrOfEntries = addToResult(firstSequenceNumber, lastSequenceNumber, results, segment, entries);
maxResults -= nrOfEntries;
if (allEntriesFound(firstSequenceNumber, maxResults, entries)) {
return results;
}
Expand All @@ -444,8 +444,8 @@ public SortedMap<Long, IndexEntries> lookupAggregate(String aggregateId, long fi
IndexEntries entries = getPositions(index, aggregateId);
logger.debug("{}: lookupAggregate {} in segment {} found {}", context, aggregateId, index, entries);
if (entries != null) {
entries = addToResult(firstSequenceNumber, lastSequenceNumber, results, index, entries);
maxResults -= entries.size();
int nrOfEntries = addToResult(firstSequenceNumber, lastSequenceNumber, results, index, entries);
maxResults -= nrOfEntries;
if (allEntriesFound(firstSequenceNumber, maxResults, entries)) {
return results;
}
Expand All @@ -456,13 +456,13 @@ public SortedMap<Long, IndexEntries> lookupAggregate(String aggregateId, long fi
return results;
}

private IndexEntries addToResult(long firstSequenceNumber, long lastSequenceNumber,
private int addToResult(long firstSequenceNumber, long lastSequenceNumber,
SortedMap<Long, IndexEntries> results, Long segment, IndexEntries entries) {
entries = entries.range(firstSequenceNumber, lastSequenceNumber, EventType.SNAPSHOT.equals(eventType));
if (!entries.isEmpty()) {
results.put(segment, entries);
}
return entries;
return entries.size();
}

private boolean allEntriesFound(long firstSequenceNumber, long maxResults, IndexEntries entries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public class StorageProperties implements Cloneable {
Duration.ofDays(7)
};
private String indexFormat;
private int segmentsForSequenceNumberCheck = 10;

public StorageProperties(SystemInfoProvider systemInfoProvider) {
this.systemInfoProvider = systemInfoProvider;
}
Expand Down Expand Up @@ -346,6 +348,9 @@ public boolean isCleanRequired() {
return systemInfoProvider.javaOnWindows();
}

public void setSegmentsForSequenceNumberCheck(int segmentsForSequenceNumberCheck) {
this.segmentsForSequenceNumberCheck = segmentsForSequenceNumberCheck;
}

public void setFlags(int flags) {
this.flags = flags;
Expand Down Expand Up @@ -413,4 +418,8 @@ public StorageProperties withRetentionTime(Duration[] retentionTime) {
clone.retentionTime = retentionTime;
return clone;
}

public int segmentsForSequenceNumberCheck() {
return segmentsForSequenceNumberCheck;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,9 @@ public CompletableFuture<Long> store(List<Event> eventList) {
public Runnable reserveSequenceNumbers(List<Event> eventList) {
return sequenceNumberCache.reserveSequenceNumbers(eventList, false);
}

@Override
public void clearSequenceNumberCache() {
sequenceNumberCache.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,8 @@ default long waitingTransactions() {
default void cancelPendingTransactions() {

}

default void clearSequenceNumberCache() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public void listAggregateEvents(String context, Authentication principal, GetAgg
.set(signal.get().getAggregateSequenceNumber());
}
})
.timeout(Duration.ofSeconds(listEventsTimeoutMillis))
.timeout(Duration.ofMillis(listEventsTimeoutMillis))
.onErrorMap(TimeoutException.class, e -> new MessagingPlatformException(LIST_AGGREGATE_EVENTS_TIMEOUT,
"Timeout exception: No events were emitted from event store in last " + listEventsTimeoutMillis + "ms. Check the logs for virtual machine errors like OutOfMemoryError."))
.retryWhen(retrySpec
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.axoniq.axonserver.localstorage;

import io.axoniq.axonserver.exception.ErrorCode;
import io.axoniq.axonserver.exception.MessagingPlatformException;
import io.axoniq.axonserver.localstorage.file.PrimaryEventStore;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.junit.Assert.*;

public class EventWriteStorageTest {
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();
private static TestInputStreamStorageContainer container;

@BeforeClass
public static void setUp() throws Exception {
container = new TestInputStreamStorageContainer(tempFolder.getRoot(), embeddedDBProperties -> {
embeddedDBProperties.getEvent().setSegmentsForSequenceNumberCheck(100);
return embeddedDBProperties;
});
container.createDummyEvents(1000, 150, "sample");
container.getEventWriter().clearSequenceNumberCache();
PrimaryEventStore primaryEventStore = (PrimaryEventStore)container.getPrimary();
while( primaryEventStore.activeSegmentCount() > 1 ) {
Thread.sleep(10);
}

}

@Test
public void store() {
try {
container.createDummyEvents(1, 100, "sample");
fail("should not be able to store new event with sequence number 0 for existing aggregate");
} catch (MessagingPlatformException messagingPlatformException) {
assertEquals(ErrorCode.INVALID_SEQUENCE, messagingPlatformException.getErrorCode());
}
}

@AfterClass
public static void close() {
container.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@

import io.axoniq.axonserver.config.FileSystemMonitor;
import io.axoniq.axonserver.config.SystemInfoProvider;
import io.axoniq.axonserver.exception.MessagingPlatformException;
import io.axoniq.axonserver.grpc.SerializedObject;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.localstorage.file.EmbeddedDBProperties;
import io.axoniq.axonserver.localstorage.file.StandardEventStoreFactory;
import io.axoniq.axonserver.localstorage.file.SegmentBasedEventStore;
import io.axoniq.axonserver.localstorage.file.StandardEventStoreFactory;
import io.axoniq.axonserver.localstorage.transaction.DefaultStorageTransactionManagerFactory;
import io.axoniq.axonserver.localstorage.transaction.SingleInstanceTransactionManager;
import io.axoniq.axonserver.localstorage.transaction.StorageTransactionManager;
Expand All @@ -29,6 +30,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.IntStream;

import static org.mockito.ArgumentMatchers.any;
Expand All @@ -46,15 +49,18 @@ public class TestInputStreamStorageContainer {

private FileSystemMonitor fileSystemMonitor = mock(FileSystemMonitor.class);


public TestInputStreamStorageContainer(File location) throws IOException {
this(location, e-> e);
}
public TestInputStreamStorageContainer(File location, Function<EmbeddedDBProperties, EmbeddedDBProperties> propertiesCustomizer) throws IOException {
EmbeddedDBProperties embeddedDBProperties = new EmbeddedDBProperties(new SystemInfoProvider() {
});
embeddedDBProperties.getEvent().setStorage(location.getAbsolutePath());
embeddedDBProperties.getEvent().setSegmentSize(256 * 1024L);
embeddedDBProperties.getEvent().setForceInterval(10000);
embeddedDBProperties.getSnapshot().setStorage(location.getAbsolutePath());
embeddedDBProperties.getSnapshot().setSegmentSize(512 * 1024L);
embeddedDBProperties = propertiesCustomizer.apply(embeddedDBProperties);
MeterFactory meterFactory = new MeterFactory(new SimpleMeterRegistry(), new DefaultMetricCollector());

doNothing().when(fileSystemMonitor).registerPath(any(), any());
Expand All @@ -76,7 +82,9 @@ public void createDummyEvents(int transactions, int transactionSize) {
}
public void createDummyEvents(int transactions, int transactionSize, String prefix) {
CountDownLatch countDownLatch = new CountDownLatch(transactions);
IntStream.range(0, transactions).parallel().forEach(j -> {
AtomicReference<Throwable> error = new AtomicReference<>();

IntStream.range(0, transactions).forEach(j -> {
String aggId = prefix + j;
List<Event> newEvents = new ArrayList<>();
IntStream.range(0, transactionSize).forEach(i -> {
Expand All @@ -85,13 +93,22 @@ public void createDummyEvents(int transactions, int transactionSize, String pref
SerializedObject
.newBuilder().build()).build());
});
eventWriter.store(newEvents).whenComplete((r,t) -> countDownLatch.countDown());
eventWriter.store(newEvents).whenComplete((r,t) -> {
if (t != null) {
error.set(t);
}
countDownLatch.countDown();
});
});
try {
countDownLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

if (error.get() != null) {
throw MessagingPlatformException.create(error.get());
}
}

public EventStorageEngine getDatafileManagerChain() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,18 @@ public void testMetricsInReadingAggregatesEvents() {
//TODO
}

@Test
public void aggregateEventsReusedAggregateIdentifier() throws InterruptedException {
PrimaryEventStore testSubject = primaryEventStore();
setupEvents(testSubject, 10000, 20);
setupEvents(testSubject, 1, 5);

List<SerializedEvent> events = testSubject.eventsPerAggregate("aggregate-0", 5, Long.MAX_VALUE, 0)
.collectList().block();
assertNotNull(events);
assertEquals(0, events.size());
}

@Test
public void transactionsIterator() throws InterruptedException {
PrimaryEventStore testSubject = primaryEventStore();
Expand Down Expand Up @@ -183,7 +195,7 @@ public void readClosedIterator() throws InterruptedException {
private void setupEvents(PrimaryEventStore testSubject, int numOfTransactions, int numOfEvents) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(numOfTransactions);
IntStream.range(0, numOfTransactions).forEach(j -> {
String aggId = UUID.randomUUID().toString();
String aggId = "aggregate-" + j;
List<Event> newEvents = new ArrayList<>();
IntStream.range(0, numOfEvents).forEach(i -> {
newEvents.add(Event.newBuilder().setAggregateIdentifier(aggId)
Expand All @@ -194,14 +206,7 @@ private void setupEvents(PrimaryEventStore testSubject, int numOfTransactions, i
testSubject.store(newEvents).thenAccept(t -> latch.countDown());
});

latch.await(5, TimeUnit.SECONDS);
}

private void storeEvent(PrimaryEventStore testSubject) {
CountDownLatch latch = new CountDownLatch(1);
Event newEvent = Event.newBuilder().setAggregateIdentifier("11111").setAggregateSequenceNumber(0)
.setAggregateType("Demo").setPayload(SerializedObject.newBuilder().build()).build();
testSubject.store(singletonList(newEvent)).thenAccept(t -> latch.countDown());
assertTrue(latch.await(5, TimeUnit.SECONDS));
}

@Test
Expand All @@ -219,7 +224,7 @@ public void testGlobalIterator() throws InterruptedException {
testSubject.store(newEvents).thenAccept(t -> latch.countDown());
});

latch.await(5, TimeUnit.SECONDS);
assertTrue(latch.await(5, TimeUnit.SECONDS));
try (CloseableIterator<SerializedEventWithToken> iterator = testSubject
.getGlobalIterator(0)) {
SerializedEventWithToken serializedEventWithToken = null;
Expand Down

0 comments on commit 53a80e2

Please sign in to comment.