Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mongo): Replace UPDATE_UPSERT_AFTER_OPTIONS with ensureIdSegment #619

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cosid-mongo/src/main/java/me/ahoo/cosid/mongo/Documents.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@

public interface Documents {
String ID_FIELD = "_id";

FindOneAndUpdateOptions UPDATE_AFTER_OPTIONS = new FindOneAndUpdateOptions()
.returnDocument(ReturnDocument.AFTER)
.maxTime(BlockingAdapter.DEFAULT_TIME_OUT.toMillis(), java.util.concurrent.TimeUnit.MILLISECONDS);

FindOneAndUpdateOptions UPDATE_UPSERT_AFTER_OPTIONS = new FindOneAndUpdateOptions()
.upsert(true)
.returnDocument(ReturnDocument.AFTER)
.maxTime(BlockingAdapter.DEFAULT_TIME_OUT.toMillis(), java.util.concurrent.TimeUnit.MILLISECONDS);


}
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ public long incrementAndGet(String namespacedName, long step) {
Document afterDoc = cosidCollection.findOneAndUpdate(
Filters.eq(Documents.ID_FIELD, namespacedName),
incrementAndGetUpdates(step),
Documents.UPDATE_AFTER_OPTIONS);
Documents.UPDATE_UPSERT_AFTER_OPTIONS);

assert afterDoc != null;
Preconditions.checkNotNull(afterDoc, "IdSegment[%s] can not be null!", namespacedName);
Long lastMaxId = afterDoc.getLong(IdSegmentOperates.LAST_MAX_ID_FIELD);
return Objects.requireNonNull(lastMaxId);
}


@Deprecated
@Override
public boolean ensureIdSegment(String segmentName, long offset) {
if (log.isInfoEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,15 @@
*/
public class MongoIdSegmentDistributorFactory implements IdSegmentDistributorFactory {
private final MongoDatabase mongoDatabase;
private final boolean enableAutoInitIdSegment;

public MongoIdSegmentDistributorFactory(MongoDatabase mongoDatabase, boolean enableAutoInitIdSegment) {

public MongoIdSegmentDistributorFactory(MongoDatabase mongoDatabase) {
this.mongoDatabase = mongoDatabase;
this.enableAutoInitIdSegment = enableAutoInitIdSegment;
}

@Override
public IdSegmentDistributor create(IdSegmentDistributorDefinition definition) {
MongoIdSegmentCollection cosIdSegmentCollection = new MongoIdSegmentCollection(mongoDatabase.getCollection(COLLECTION_NAME));
if (enableAutoInitIdSegment) {
cosIdSegmentCollection.ensureIdSegment(definition.getNamespacedName(), definition.getOffset());
}


return new MongoIdSegmentDistributor(definition.getNamespace(),
definition.getName(),
definition.getStep(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public long incrementAndGet(String namespacedName, long step) {
Publisher<Document> publisher = cosidCollection.findOneAndUpdate(
Filters.eq(Documents.ID_FIELD, namespacedName),
incrementAndGetUpdates(step),
Documents.UPDATE_AFTER_OPTIONS);
Documents.UPDATE_UPSERT_AFTER_OPTIONS);
Document afterDoc = BlockingAdapter.block(publisher);

assert afterDoc != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,14 @@

public class MongoReactiveIdSegmentDistributorFactory implements IdSegmentDistributorFactory {
private final MongoDatabase mongoDatabase;
private final boolean enableAutoInitIdSegment;

public MongoReactiveIdSegmentDistributorFactory(MongoDatabase mongoDatabase, boolean enableAutoInitIdSegment) {

public MongoReactiveIdSegmentDistributorFactory(MongoDatabase mongoDatabase) {
this.mongoDatabase = mongoDatabase;
this.enableAutoInitIdSegment = enableAutoInitIdSegment;
}

@Override
public IdSegmentDistributor create(IdSegmentDistributorDefinition definition) {
IdSegmentCollection idSegmentCollection = new MongoReactiveIdSegmentCollection(mongoDatabase.getCollection(COLLECTION_NAME));
if (enableAutoInitIdSegment) {
idSegmentCollection.ensureIdSegment(definition.getNamespacedName(), definition.getOffset());
}

return new MongoIdSegmentDistributor(definition.getNamespace(),
definition.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void setup() {
idSegmentInitializer = new MongoReactiveIdSegmentInitializer(mongoDatabase);
idSegmentInitializer.ensureCosIdCollection();
distributorFactory =
new MongoReactiveIdSegmentDistributorFactory(mongoDatabase, true);
new MongoReactiveIdSegmentDistributorFactory(mongoDatabase);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ void setup() {
idSegmentInitializer = new MongoIdSegmentInitializer(mongoDatabase);
idSegmentInitializer.ensureCosIdCollection();
distributorFactory =
new MongoIdSegmentDistributorFactory(mongoDatabase, true);
new MongoIdSegmentDistributorFactory(mongoDatabase);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void setup() {

idSegmentInitializer.ensureCosIdCollection();
distributorFactory =
new MongoReactiveIdSegmentDistributorFactory(mongoDatabase, true);
new MongoReactiveIdSegmentDistributorFactory(mongoDatabase);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public MongoIdSegmentDistributorFactory mongoIdSegmentDistributorFactory(MongoCl
segmentIdProperties.getDistributor().getMongo().getDatabase()
);
idSegmentInitializer.ensureCosIdCollection();
return new MongoIdSegmentDistributorFactory(mongoDatabase, true);
return new MongoIdSegmentDistributorFactory(mongoDatabase);
}
}

Expand All @@ -103,8 +103,7 @@ public MongoReactiveIdSegmentDistributorFactory mongoReactiveIdSegmentDistributo
segmentIdProperties.getDistributor().getMongo().getDatabase()
);
return new MongoReactiveIdSegmentDistributorFactory(
mongoDatabase,
true);
mongoDatabase);
}
}
}
Loading