From 99f11b6bf58ff0221934505757530d72ce82aa92 Mon Sep 17 00:00:00 2001 From: Mirko Zizzari Date: Thu, 27 Jul 2023 10:46:27 +0200 Subject: [PATCH] issue #528: work by chunks --- .../datasource/listener/DatasourcePurge.java | 80 +++++++++++++++---- 1 file changed, 66 insertions(+), 14 deletions(-) diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/listener/DatasourcePurge.java b/core/app/datasource/src/main/java/io/openk9/datasource/listener/DatasourcePurge.java index 3e3b64b9d..3661174cc 100644 --- a/core/app/datasource/src/main/java/io/openk9/datasource/listener/DatasourcePurge.java +++ b/core/app/datasource/src/main/java/io/openk9/datasource/listener/DatasourcePurge.java @@ -19,8 +19,11 @@ import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.HashSet; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; import java.util.List; +import java.util.NoSuchElementException; import java.util.Set; import java.util.stream.Collectors; @@ -30,7 +33,8 @@ public sealed interface Command {} private enum Start implements Command {INSTANCE} private enum Stop implements Command {INSTANCE} private enum FetchDataIndexOrphans implements Command {INSTANCE} - private record SetDataIndexOrphan(List dataIndices) implements Command {} + private record PrepareChunks(List dataIndices) implements Command {} + private enum WorkNextChunk implements Command {INSTANCE} private enum DeleteEsIndices implements Command {INSTANCE} private record EsDeleteError(Throwable error) implements Command {} private enum DeleteDataIndices implements Command {INSTANCE} @@ -51,7 +55,8 @@ private enum DeleteDataIndices implements Command {INSTANCE} private final RestHighLevelClient esClient; private final TransactionInvoker txInvoker; private final Duration maxAge; - private Set dataIndexOrphans; + private final Deque> chunks = new ArrayDeque<>(); + private List currentChunk; public DatasourcePurge( ActorContext context, String tenantName, long datasourceId, @@ -78,7 +83,8 @@ public Receive createReceive() { return newReceiveBuilder() .onMessageEquals(Start.INSTANCE, this::onStart) .onMessageEquals(FetchDataIndexOrphans.INSTANCE, this::onFetchDataIndexOrphans) - .onMessage(SetDataIndexOrphan.class, this::onSetDataIndexOrphanIds) + .onMessage(PrepareChunks.class, this::onPrepareChunks) + .onMessageEquals(WorkNextChunk.INSTANCE, this::onWorkNextChunk) .onMessageEquals(DeleteEsIndices.INSTANCE, this::onDeleteEsIndices) .onMessageEquals(DeleteDataIndices.INSTANCE, this::onDeleteDataIndices) .onMessage(EsDeleteError.class, this::onEsDeleteError) @@ -110,25 +116,71 @@ private Behavior onFetchDataIndexOrphans() { ) .getResultList() ).invoke(dataIndices -> - getContext().getSelf().tell(new SetDataIndexOrphan(dataIndices)))); + getContext().getSelf().tell(new PrepareChunks(dataIndices)))); return Behaviors.same(); } - private Behavior onSetDataIndexOrphanIds(SetDataIndexOrphan sdioi) { - this.dataIndexOrphans = new HashSet<>(sdioi.dataIndices); + private Behavior onPrepareChunks(PrepareChunks prepareChunks) { + List dataIndices = prepareChunks.dataIndices; - getContext().getLog().info( - "DataIndex orphans found for datasource {}-{}: {}", - tenantName, datasourceId, dataIndexOrphans.size()); + if (dataIndices != null && !dataIndices.isEmpty()) { + int chunkSize = 10; + + int listSize = dataIndices.size(); + + int lastIndex = listSize - 1; + + boolean lastChunk = false; + + int i = 0; + + while (!lastChunk) { + int fromIndex = chunkSize * i; + int toIndex = chunkSize * ++i; + lastChunk = toIndex >= lastIndex; + + chunks.add(new ArrayList<>( + dataIndices.subList(fromIndex, lastChunk ? lastIndex : toIndex)) + ); + } + + getContext().getLog().info( + "DataIndex orphans found for datasource {}-{}: {}", + tenantName, datasourceId, listSize); - getContext().getSelf().tell(DeleteEsIndices.INSTANCE); + getContext().getLog().info("Chunks to work for datasource {}-{}: {}", + tenantName, datasourceId, i); + + getContext().getSelf().tell(WorkNextChunk.INSTANCE); + } + else { + getContext().getLog().info( + "No DataIndex orphans found for datasource {}-{}", tenantName, datasourceId); + getContext().getSelf().tell(Stop.INSTANCE); + } + + return Behaviors.same(); + } + + private Behavior onWorkNextChunk() { + try { + this.currentChunk = chunks.pop(); + getContext().getLog().info( + "Working on a chunk for datasource {}-{}", tenantName, datasourceId); + getContext().getSelf().tell(DeleteEsIndices.INSTANCE); + } + catch (NoSuchElementException e) { + getContext().getLog().info( + "No more chunks to work for datasource {}-{}", tenantName, datasourceId); + getContext().getSelf().tell(Stop.INSTANCE); + } return Behaviors.same(); } private Behavior onDeleteEsIndices() { - String[] names = dataIndexOrphans + String[] names = currentChunk .stream() .map(DataIndex::getName) .toArray(String[]::new); @@ -164,7 +216,7 @@ private Behavior onDeleteDataIndices() { getContext().getLog().info( "Deleting DataIndex orphans for datasource {}-{}", tenantName, datasourceId); - Set ids = dataIndexOrphans + Set ids = currentChunk .stream() .map(K9Entity::getId) .collect(Collectors.toSet()); @@ -180,7 +232,7 @@ private Behavior onDeleteDataIndices() { .executeUpdate() ) ) - .invoke(ignore -> getContext().getSelf().tell(Stop.INSTANCE))); + .invoke(ignore -> getContext().getSelf().tell(WorkNextChunk.INSTANCE))); return Behaviors.same(); }