Skip to content

Commit

Permalink
issue #528: work by chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
mrk-vi committed Jul 27, 2023
1 parent 6b2c308 commit 99f11b6
Showing 1 changed file with 66 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashSet;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.stream.Collectors;

Expand All @@ -30,7 +33,8 @@ public sealed interface Command {}
private enum Start implements Command {INSTANCE}
private enum Stop implements Command {INSTANCE}
private enum FetchDataIndexOrphans implements Command {INSTANCE}
private record SetDataIndexOrphan(List<DataIndex> dataIndices) implements Command {}
private record PrepareChunks(List<DataIndex> dataIndices) implements Command {}
private enum WorkNextChunk implements Command {INSTANCE}
private enum DeleteEsIndices implements Command {INSTANCE}
private record EsDeleteError(Throwable error) implements Command {}
private enum DeleteDataIndices implements Command {INSTANCE}
Expand All @@ -51,7 +55,8 @@ private enum DeleteDataIndices implements Command {INSTANCE}
private final RestHighLevelClient esClient;
private final TransactionInvoker txInvoker;
private final Duration maxAge;
private Set<DataIndex> dataIndexOrphans;
private final Deque<List<DataIndex>> chunks = new ArrayDeque<>();
private List<DataIndex> currentChunk;

public DatasourcePurge(
ActorContext<Command> context, String tenantName, long datasourceId,
Expand All @@ -78,7 +83,8 @@ public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessageEquals(Start.INSTANCE, this::onStart)
.onMessageEquals(FetchDataIndexOrphans.INSTANCE, this::onFetchDataIndexOrphans)
.onMessage(SetDataIndexOrphan.class, this::onSetDataIndexOrphanIds)
.onMessage(PrepareChunks.class, this::onPrepareChunks)
.onMessageEquals(WorkNextChunk.INSTANCE, this::onWorkNextChunk)
.onMessageEquals(DeleteEsIndices.INSTANCE, this::onDeleteEsIndices)
.onMessageEquals(DeleteDataIndices.INSTANCE, this::onDeleteDataIndices)
.onMessage(EsDeleteError.class, this::onEsDeleteError)
Expand Down Expand Up @@ -110,25 +116,71 @@ private Behavior<Command> onFetchDataIndexOrphans() {
)
.getResultList()
).invoke(dataIndices ->
getContext().getSelf().tell(new SetDataIndexOrphan(dataIndices))));
getContext().getSelf().tell(new PrepareChunks(dataIndices))));

return Behaviors.same();
}

private Behavior<Command> onSetDataIndexOrphanIds(SetDataIndexOrphan sdioi) {
this.dataIndexOrphans = new HashSet<>(sdioi.dataIndices);
private Behavior<Command> onPrepareChunks(PrepareChunks prepareChunks) {
List<DataIndex> dataIndices = prepareChunks.dataIndices;

getContext().getLog().info(
"DataIndex orphans found for datasource {}-{}: {}",
tenantName, datasourceId, dataIndexOrphans.size());
if (dataIndices != null && !dataIndices.isEmpty()) {
int chunkSize = 10;

int listSize = dataIndices.size();

int lastIndex = listSize - 1;

boolean lastChunk = false;

int i = 0;

while (!lastChunk) {
int fromIndex = chunkSize * i;
int toIndex = chunkSize * ++i;
lastChunk = toIndex >= lastIndex;

chunks.add(new ArrayList<>(
dataIndices.subList(fromIndex, lastChunk ? lastIndex : toIndex))
);
}

getContext().getLog().info(
"DataIndex orphans found for datasource {}-{}: {}",
tenantName, datasourceId, listSize);

getContext().getSelf().tell(DeleteEsIndices.INSTANCE);
getContext().getLog().info("Chunks to work for datasource {}-{}: {}",
tenantName, datasourceId, i);

getContext().getSelf().tell(WorkNextChunk.INSTANCE);
}
else {
getContext().getLog().info(
"No DataIndex orphans found for datasource {}-{}", tenantName, datasourceId);
getContext().getSelf().tell(Stop.INSTANCE);
}

return Behaviors.same();
}

private Behavior<Command> onWorkNextChunk() {
try {
this.currentChunk = chunks.pop();
getContext().getLog().info(
"Working on a chunk for datasource {}-{}", tenantName, datasourceId);
getContext().getSelf().tell(DeleteEsIndices.INSTANCE);
}
catch (NoSuchElementException e) {
getContext().getLog().info(
"No more chunks to work for datasource {}-{}", tenantName, datasourceId);
getContext().getSelf().tell(Stop.INSTANCE);
}

return Behaviors.same();
}

private Behavior<Command> onDeleteEsIndices() {
String[] names = dataIndexOrphans
String[] names = currentChunk
.stream()
.map(DataIndex::getName)
.toArray(String[]::new);
Expand Down Expand Up @@ -164,7 +216,7 @@ private Behavior<Command> onDeleteDataIndices() {
getContext().getLog().info(
"Deleting DataIndex orphans for datasource {}-{}", tenantName, datasourceId);

Set<Long> ids = dataIndexOrphans
Set<Long> ids = currentChunk
.stream()
.map(K9Entity::getId)
.collect(Collectors.toSet());
Expand All @@ -180,7 +232,7 @@ private Behavior<Command> onDeleteDataIndices() {
.executeUpdate()
)
)
.invoke(ignore -> getContext().getSelf().tell(Stop.INSTANCE)));
.invoke(ignore -> getContext().getSelf().tell(WorkNextChunk.INSTANCE)));

return Behaviors.same();
}
Expand Down

0 comments on commit 99f11b6

Please sign in to comment.