Skip to content

Commit

Permalink
Merge pull request #2017 from beyonnex-io/bugfix/background-sync-stuck
Browse files Browse the repository at this point in the history
fix search background sync being stuck
  • Loading branch information
thjaeckle authored Sep 13, 2024
2 parents 5d6059b + cb31b34 commit 9baafda
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 30 deletions.
2 changes: 1 addition & 1 deletion deployment/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ version: '3.4'

services:
mongodb:
image: docker.io/mongo:6.0
image: docker.io/mongo:7.0
deploy:
resources:
limits:
Expand Down
2 changes: 1 addition & 1 deletion deployment/docker/sandbox/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ version: '3.4'

services:
mongodb:
image: docker.io/mongo:6.0
image: docker.io/mongo:7.0
deploy:
resources:
limits:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ spec:
terminationGracePeriodSeconds: 30
containers:
- name: mongodb
image: docker.io/mongo:6.0
image: docker.io/mongo:7.0
command:
- mongod
- --bind_ip
Expand Down
2 changes: 1 addition & 1 deletion deployment/kubernetes/deploymentFiles/mongodb/mongodb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ spec:
spec:
containers:
- name: mongodb
image: docker.io/mongo:6.0
image: docker.io/mongo:7.0
command:
- mongod
- --storageEngine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,23 @@

import javax.annotation.Nullable;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.BroadcastHub;
import org.apache.pekko.stream.javadsl.RestartSource;
import org.apache.pekko.stream.javadsl.Source;
import org.bson.Document;
import org.eclipse.ditto.internal.utils.pekko.streaming.TimestampPersistence;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.MongoCommandException;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.BroadcastHub;
import org.apache.pekko.stream.javadsl.RestartSource;
import org.apache.pekko.stream.javadsl.Source;

/**
* MongoDB implementation of {@link TimestampPersistence}.
*/
Expand Down Expand Up @@ -187,21 +183,13 @@ private static Source<Done, NotUsed> repeatableCreateCappedCollectionSource(
.sizeInBytes(cappedCollectionSizeInBytes)
.maxDocuments(1);

return Source.lazySource(
() -> Source.fromPublisher(database.createCollection(collectionName, collectionOptions)))
.mapMaterializedValue(whatever -> NotUsed.getInstance())
.map(nullValue -> Done.done())
.withAttributes(Attributes.inputBuffer(1, 1))
.recoverWithRetries(1, new PFBuilder<Throwable, Source<Done, NotUsed>>()
.match(MongoCommandException.class,
MongoTimestampPersistence::isCollectionAlreadyExistsError,
error -> Source.single(Done.done()))
.build());

}

private static boolean isCollectionAlreadyExistsError(final MongoCommandException error) {
return error.getErrorCode() == COLLECTION_ALREADY_EXISTS_ERROR_CODE;
return Source.fromPublisher(database.listCollectionNames())
.filter(name -> name.equals(collectionName))
.map(name -> Done.done())
.orElse(
Source.fromPublisher(database.createCollection(collectionName, collectionOptions))
.map(aVoid -> Done.done())
);
}

}

0 comments on commit 9baafda

Please sign in to comment.