From 1972edf47c3c852607218b9b23d5c5402531ff80 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Tue, 6 Dec 2022 08:24:34 -0700 Subject: [PATCH 1/3] Add $documents stage --- .../com/mongodb/client/model/Aggregates.java | 35 ++++++++++++++++--- .../mongodb/client/model/AggregatesTest.java | 25 +++++++++++++ .../mongodb/client/test/CollectionHelper.java | 15 ++++++-- .../org/mongodb/scala/model/Aggregates.scala | 10 ++++++ .../mongodb/scala/model/AggregatesSpec.scala | 11 ++++++ 5 files changed, 89 insertions(+), 7 deletions(-) diff --git a/driver-core/src/main/com/mongodb/client/model/Aggregates.java b/driver-core/src/main/com/mongodb/client/model/Aggregates.java index 54e8744cc67..c3089cb7a90 100644 --- a/driver-core/src/main/com/mongodb/client/model/Aggregates.java +++ b/driver-core/src/main/com/mongodb/client/model/Aggregates.java @@ -928,7 +928,7 @@ public static Bson searchMeta(final SearchCollector collector, final SearchOptio * * @param fields the fields to exclude. May use dot notation. * @return the $unset pipeline stage - * @mongodb.driver.manual reference/operator/aggregation/project/ $unset + * @mongodb.driver.manual reference/operator/aggregation/unset/ $unset * @mongodb.server.release 4.2 * @since 4.8 */ @@ -941,7 +941,7 @@ public static Bson unset(final String... fields) { * * @param fields the fields to exclude. May use dot notation. * @return the $unset pipeline stage - * @mongodb.driver.manual reference/operator/aggregation/project/ $unset + * @mongodb.driver.manual reference/operator/aggregation/unset/ $unset * @mongodb.server.release 4.2 * @since 4.8 */ @@ -962,7 +962,7 @@ public static Bson unset(final List fields) { * To specify a field within an embedded document, use dot notation. * @param options {@link GeoNearOptions} * @return the $geoNear pipeline stage - * @mongodb.driver.manual reference/operator/aggregation/project/ $geoNear + * @mongodb.driver.manual reference/operator/aggregation/geoNear/ $geoNear * @since 4.8 */ public static Bson geoNear( @@ -1012,7 +1012,7 @@ public String toString() { * @param distanceField The output field that contains the calculated distance. * To specify a field within an embedded document, use dot notation. * @return the $geoNear pipeline stage - * @mongodb.driver.manual reference/operator/aggregation/project/ $geoNear + * @mongodb.driver.manual reference/operator/aggregation/geoNear/ $geoNear * @since 4.8 */ public static Bson geoNear( @@ -1021,6 +1021,33 @@ public static Bson geoNear( return geoNear(near, distanceField, geoNearOptions()); } + /** + * Creates a $documents pipeline stage. + * + * @param documents the documents. + * @return the $documents pipeline stage. + * @mongodb.driver.manual reference/operator/aggregation/documents/ $documents + * @mongodb.server.release 5.1 + * @since 4.9 + */ + public static Bson documents(final List documents) { + notNull("documents", documents); + return new Bson() { + @Override + public BsonDocument toBsonDocument(final Class documentClass, final CodecRegistry codecRegistry) { + BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument()); + writer.writeStartDocument(); + writer.writeStartArray("$documents"); + for (Bson bson : documents) { + BuildersHelper.encodeValue(writer, bson, codecRegistry); + } + writer.writeEndArray(); + writer.writeEndDocument(); + return writer.getDocument(); + } + }; + } + static void writeBucketOutput(final CodecRegistry codecRegistry, final BsonDocumentWriter writer, @Nullable final List output) { if (output != null) { diff --git a/driver-core/src/test/functional/com/mongodb/client/model/AggregatesTest.java b/driver-core/src/test/functional/com/mongodb/client/model/AggregatesTest.java index 0afa1a5488a..c746e00f2a4 100644 --- a/driver-core/src/test/functional/com/mongodb/client/model/AggregatesTest.java +++ b/driver-core/src/test/functional/com/mongodb/client/model/AggregatesTest.java @@ -21,6 +21,7 @@ import org.bson.BsonDocument; import org.bson.Document; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -160,4 +161,28 @@ public void testGeoNear() { + " }\n" + "}]"); } + + @Test + public void testDocuments() { + assumeTrue(serverVersionAtLeast(5, 1)); + Bson stage = Aggregates.documents(asList( + Document.parse("{a: 1, b: {$add: [1, 1]} }"), + BsonDocument.parse("{a: 3, b: 4}"))); + assertPipeline( + "{$documents: [{a: 1, b: {$add: [1, 1]}}, {a: 3, b: 4}]}", + stage); + + List pipeline = Arrays.asList(stage); + getCollectionHelper().aggregateDb(pipeline); + + assertEquals( + parseToList("[{a: 1, b: 2}, {a: 3, b: 4}]"), + getCollectionHelper().aggregateDb(pipeline)); + + // accepts lists of Documents and BsonDocuments + List documents = Arrays.asList(BsonDocument.parse("{a: 1, b: 2}")); + assertPipeline("{$documents: [{a: 1, b: 2}]}", Aggregates.documents(documents)); + List bsonDocuments = Arrays.asList(BsonDocument.parse("{a: 1, b: 2}")); + assertPipeline("{$documents: [{a: 1, b: 2}]}", Aggregates.documents(bsonDocuments)); + } } diff --git a/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java b/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java index f0e55a1e23e..e2216629a7a 100644 --- a/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java +++ b/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java @@ -33,6 +33,7 @@ import com.mongodb.internal.bulk.InsertRequest; import com.mongodb.internal.bulk.UpdateRequest; import com.mongodb.internal.bulk.WriteRequest; +import com.mongodb.internal.client.model.AggregationLevel; import com.mongodb.internal.operation.AggregateOperation; import com.mongodb.internal.operation.BatchCursor; import com.mongodb.internal.operation.CommandReadOperation; @@ -281,12 +282,20 @@ public List aggregate(final List pipeline) { } public List aggregate(final List pipeline, final Decoder decoder) { - List bsonDocumentPipeline = new ArrayList<>(); + return aggregate(pipeline, decoder, AggregationLevel.COLLECTION); + } + + public List aggregateDb(final List pipeline) { + return aggregate(pipeline, codec, AggregationLevel.DATABASE); + } + + private List aggregate(final List pipeline, final Decoder decoder, final AggregationLevel level) { + List bsonDocumentPipeline = new ArrayList(); for (Bson cur : pipeline) { bsonDocumentPipeline.add(cur.toBsonDocument(Document.class, registry)); } - BatchCursor cursor = new AggregateOperation<>(namespace, bsonDocumentPipeline, decoder) - .execute(getBinding()); + BatchCursor cursor = new AggregateOperation(namespace, bsonDocumentPipeline, decoder, level) + .execute(getBinding()); List results = new ArrayList<>(); while (cursor.hasNext()) { results.addAll(cursor.next()); diff --git a/driver-scala/src/main/scala/org/mongodb/scala/model/Aggregates.scala b/driver-scala/src/main/scala/org/mongodb/scala/model/Aggregates.scala index 2d5a71cf960..6b56b73105e 100644 --- a/driver-scala/src/main/scala/org/mongodb/scala/model/Aggregates.scala +++ b/driver-scala/src/main/scala/org/mongodb/scala/model/Aggregates.scala @@ -746,4 +746,14 @@ object Aggregates { */ def geoNear(near: Point, distanceField: String): Bson = JAggregates.geoNear(near, distanceField) + + /** + * Creates a `\$documents` pipeline stage. + * + * @param documents the documents. + * @return the `\$documents` pipeline stage + * @see [[https://www.mongodb.com/docs/manual/reference/operator/aggregation/documents/ \$documents]] + * @since 4.9 + */ + def documents(documents: Bson*): Bson = JAggregates.documents(documents.asJava) } diff --git a/driver-scala/src/test/scala/org/mongodb/scala/model/AggregatesSpec.scala b/driver-scala/src/test/scala/org/mongodb/scala/model/AggregatesSpec.scala index 61e1ed2382b..9e4217bbc86 100644 --- a/driver-scala/src/test/scala/org/mongodb/scala/model/AggregatesSpec.scala +++ b/driver-scala/src/test/scala/org/mongodb/scala/model/AggregatesSpec.scala @@ -805,4 +805,15 @@ class AggregatesSpec extends BaseSpec { |}""".stripMargin) ) } + + it should "render $documents" in { + toBson( + Aggregates.documents( + org.mongodb.scala.bson.BsonDocument("""{a: 1, b: {$add: [1, 1]} }"""), + Document("""{a: 3, b: 4}""") + ) + ) should equal( + Document("""{$documents: [{a: 1, b: {$add: [1, 1]}}, {a: 3, b: 4}]}""") + ) + } } From 298b406e797b394305df400d46d21820e62ff4b7 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Tue, 6 Dec 2022 13:28:52 -0700 Subject: [PATCH 2/3] Specify "from" behaviour in lookup when $documents is first stage --- .../com/mongodb/client/model/Aggregates.java | 10 ++++++++-- .../mongodb/client/model/AggregatesTest.java | 20 +++++++++++++++++++ .../org/mongodb/scala/model/Aggregates.scala | 10 ++++++++-- 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/driver-core/src/main/com/mongodb/client/model/Aggregates.java b/driver-core/src/main/com/mongodb/client/model/Aggregates.java index c3089cb7a90..be409568531 100644 --- a/driver-core/src/main/com/mongodb/client/model/Aggregates.java +++ b/driver-core/src/main/com/mongodb/client/model/Aggregates.java @@ -299,7 +299,10 @@ public static Bson lookup(final String from, final String localField, final Stri } /** - * Creates a $lookup pipeline stage, joining the current collection with the one specified in from using the given pipeline + * Creates a $lookup pipeline stage, joining the current collection with the + * one specified in from using the given pipeline. If the first stage in the + * pipeline is a {@link Aggregates#documents(List) $documents} stage, then + * the {@code from} collection is overridden (and therefore ignored). * * @param from the name of the collection in the same database to perform the join with. * @param pipeline the pipeline to run on the joined collection. @@ -315,7 +318,10 @@ public static Bson lookup(final String from, final List pipeline } /** - * Creates a $lookup pipeline stage, joining the current collection with the one specified in from using the given pipeline + * Creates a $lookup pipeline stage, joining the current collection with the + * one specified in from using the given pipeline. If the first stage in the + * pipeline is a {@link Aggregates#documents(List) $documents} stage, then + * the {@code from} collection is overridden (and therefore ignored). * * @param the Variable value expression type * @param from the name of the collection in the same database to perform the join with. diff --git a/driver-core/src/test/functional/com/mongodb/client/model/AggregatesTest.java b/driver-core/src/test/functional/com/mongodb/client/model/AggregatesTest.java index c746e00f2a4..f78cff8a9c1 100644 --- a/driver-core/src/test/functional/com/mongodb/client/model/AggregatesTest.java +++ b/driver-core/src/test/functional/com/mongodb/client/model/AggregatesTest.java @@ -185,4 +185,24 @@ public void testDocuments() { List bsonDocuments = Arrays.asList(BsonDocument.parse("{a: 1, b: 2}")); assertPipeline("{$documents: [{a: 1, b: 2}]}", Aggregates.documents(bsonDocuments)); } + + @Test + public void testDocumentsLookup() { + assumeTrue(serverVersionAtLeast(5, 1)); + + getCollectionHelper().insertDocuments("[{_id: 1, a: 8}, {_id: 2, a: 9}]"); + + Bson docstage = Aggregates.documents(asList(Document.parse("{a: 5}"))); + Bson stage = Aggregates.lookup("ignored", Arrays.asList(docstage), "added"); + assertPipeline( + "{'$lookup': {'from': 'ignored', 'pipeline': [{'$documents': [{'a': 5}]}], 'as': 'added'}}", + stage); + + List pipeline = Arrays.asList(stage); + getCollectionHelper().aggregate(pipeline); + + assertEquals( + parseToList("[{_id:1, a:8, added: [{a: 5}]}, {_id:2, a:9, added: [{a: 5}]}]"), + getCollectionHelper().aggregate(pipeline)); + } } diff --git a/driver-scala/src/main/scala/org/mongodb/scala/model/Aggregates.scala b/driver-scala/src/main/scala/org/mongodb/scala/model/Aggregates.scala index 6b56b73105e..787cdee4528 100644 --- a/driver-scala/src/main/scala/org/mongodb/scala/model/Aggregates.scala +++ b/driver-scala/src/main/scala/org/mongodb/scala/model/Aggregates.scala @@ -324,7 +324,10 @@ object Aggregates { JAggregates.lookup(from, localField, foreignField, as) /** - * Creates a `\$lookup` pipeline stage, joining the current collection with the one specified in from using the given pipeline + * Creates a `\$lookup` pipeline stage, joining the current collection with + * the one specified in from using the given pipeline. If the first stage in + * the pipeline is a `\$documents` stage, then the "from" collection is + * overridden (and therefore ignored). * * @param from the name of the collection in the same database to perform the join with. * @param pipeline the pipeline to run on the joined collection. @@ -338,7 +341,10 @@ object Aggregates { JAggregates.lookup(from, pipeline.asJava, as) /** - * Creates a `\$lookup` pipeline stage, joining the current collection with the one specified in from using the given pipeline + * Creates a `\$lookup` pipeline stage, joining the current collection with + * the one specified in from using the given pipeline. If the first stage in + * the pipeline is a `\$documents` stage, then the "from" collection is + * overridden (and therefore ignored). * * @param from the name of the collection in the same database to perform the join with. * @param let the variables to use in the pipeline field stages. From 763fb0a15746dd3265ee5b4944df25739fc3dd70 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Thu, 8 Dec 2022 10:13:58 -0700 Subject: [PATCH 3/3] Allow from to be nullable, test --- .../com/mongodb/client/model/Aggregates.java | 27 ++++++++++++------- .../mongodb/client/model/AggregatesTest.java | 19 ++++++++----- .../org/mongodb/scala/model/Aggregates.scala | 12 ++++++--- 3 files changed, 38 insertions(+), 20 deletions(-) diff --git a/driver-core/src/main/com/mongodb/client/model/Aggregates.java b/driver-core/src/main/com/mongodb/client/model/Aggregates.java index be409568531..83c63e2b642 100644 --- a/driver-core/src/main/com/mongodb/client/model/Aggregates.java +++ b/driver-core/src/main/com/mongodb/client/model/Aggregates.java @@ -302,9 +302,11 @@ public static Bson lookup(final String from, final String localField, final Stri * Creates a $lookup pipeline stage, joining the current collection with the * one specified in from using the given pipeline. If the first stage in the * pipeline is a {@link Aggregates#documents(List) $documents} stage, then - * the {@code from} collection is overridden (and therefore ignored). + * the {@code from} collection is ignored. * - * @param from the name of the collection in the same database to perform the join with. + * @param from the name of the collection in the same database to + * perform the join with. May be {$code null} if the + * first pipeline stage is $documents. * @param pipeline the pipeline to run on the joined collection. * @param as the name of the new array field to add to the input documents. * @return the $lookup pipeline stage @@ -313,7 +315,7 @@ public static Bson lookup(final String from, final String localField, final Stri * @since 3.7 * */ - public static Bson lookup(final String from, final List pipeline, final String as) { + public static Bson lookup(@Nullable final String from, final List pipeline, final String as) { return lookup(from, null, pipeline, as); } @@ -321,10 +323,12 @@ public static Bson lookup(final String from, final List pipeline * Creates a $lookup pipeline stage, joining the current collection with the * one specified in from using the given pipeline. If the first stage in the * pipeline is a {@link Aggregates#documents(List) $documents} stage, then - * the {@code from} collection is overridden (and therefore ignored). + * the {@code from} collection is ignored. * * @param the Variable value expression type - * @param from the name of the collection in the same database to perform the join with. + * @param from the name of the collection in the same database to + * perform the join with. May be {$code null} if the + * first pipeline stage is $documents. * @param let the variables to use in the pipeline field stages. * @param pipeline the pipeline to run on the joined collection. * @param as the name of the new array field to add to the input documents. @@ -333,7 +337,7 @@ public static Bson lookup(final String from, final List pipeline * @mongodb.server.release 3.6 * @since 3.7 */ - public static Bson lookup(final String from, @Nullable final List> let, + public static Bson lookup(@Nullable final String from, @Nullable final List> let, final List pipeline, final String as) { return new LookupStage<>(from, let, pipeline, as); } @@ -1275,8 +1279,11 @@ private static final class LookupStage implements Bson { private final List pipeline; private final String as; - private LookupStage(final String from, @Nullable final List> let, final List pipeline, - final String as) { + private LookupStage( + @Nullable final String from, + @Nullable final List> let, + final List pipeline, + final String as) { this.from = from; this.let = let; this.pipeline = pipeline; @@ -1291,7 +1298,9 @@ public BsonDocument toBsonDocument(final Class tDocumentC writer.writeStartDocument("$lookup"); - writer.writeString("from", from); + if (from != null) { + writer.writeString("from", from); + } if (let != null) { writer.writeStartDocument("let"); diff --git a/driver-core/src/test/functional/com/mongodb/client/model/AggregatesTest.java b/driver-core/src/test/functional/com/mongodb/client/model/AggregatesTest.java index f78cff8a9c1..0db3482dfe5 100644 --- a/driver-core/src/test/functional/com/mongodb/client/model/AggregatesTest.java +++ b/driver-core/src/test/functional/com/mongodb/client/model/AggregatesTest.java @@ -191,18 +191,23 @@ public void testDocumentsLookup() { assumeTrue(serverVersionAtLeast(5, 1)); getCollectionHelper().insertDocuments("[{_id: 1, a: 8}, {_id: 2, a: 9}]"); + Bson documentsStage = Aggregates.documents(asList(Document.parse("{a: 5}"))); - Bson docstage = Aggregates.documents(asList(Document.parse("{a: 5}"))); - Bson stage = Aggregates.lookup("ignored", Arrays.asList(docstage), "added"); + Bson lookupStage = Aggregates.lookup("ignored", Arrays.asList(documentsStage), "added"); assertPipeline( "{'$lookup': {'from': 'ignored', 'pipeline': [{'$documents': [{'a': 5}]}], 'as': 'added'}}", - stage); - - List pipeline = Arrays.asList(stage); - getCollectionHelper().aggregate(pipeline); + lookupStage); + assertEquals( + parseToList("[{_id:1, a:8, added: [{a: 5}]}, {_id:2, a:9, added: [{a: 5}]}]"), + getCollectionHelper().aggregate(Arrays.asList(lookupStage))); + // null variant + Bson lookupStageNull = Aggregates.lookup(null, Arrays.asList(documentsStage), "added"); + assertPipeline( + "{'$lookup': {'pipeline': [{'$documents': [{'a': 5}]}], 'as': 'added'}}", + lookupStageNull); assertEquals( parseToList("[{_id:1, a:8, added: [{a: 5}]}, {_id:2, a:9, added: [{a: 5}]}]"), - getCollectionHelper().aggregate(pipeline)); + getCollectionHelper().aggregate(Arrays.asList(lookupStageNull))); } } diff --git a/driver-scala/src/main/scala/org/mongodb/scala/model/Aggregates.scala b/driver-scala/src/main/scala/org/mongodb/scala/model/Aggregates.scala index 787cdee4528..6f39034c1f9 100644 --- a/driver-scala/src/main/scala/org/mongodb/scala/model/Aggregates.scala +++ b/driver-scala/src/main/scala/org/mongodb/scala/model/Aggregates.scala @@ -327,9 +327,11 @@ object Aggregates { * Creates a `\$lookup` pipeline stage, joining the current collection with * the one specified in from using the given pipeline. If the first stage in * the pipeline is a `\$documents` stage, then the "from" collection is - * overridden (and therefore ignored). + * ignored. * - * @param from the name of the collection in the same database to perform the join with. + * @param from the name of the collection in the same database to + * perform the join with. May be null if the + * first pipeline stage is `\$documents`. * @param pipeline the pipeline to run on the joined collection. * @param as the name of the new array field to add to the input documents. * @return the `\$lookup` pipeline stage: @@ -344,9 +346,11 @@ object Aggregates { * Creates a `\$lookup` pipeline stage, joining the current collection with * the one specified in from using the given pipeline. If the first stage in * the pipeline is a `\$documents` stage, then the "from" collection is - * overridden (and therefore ignored). + * ignored. * - * @param from the name of the collection in the same database to perform the join with. + * @param from the name of the collection in the same database to + * perform the join with. May be null if the + * first pipeline stage is `\$documents`. * @param let the variables to use in the pipeline field stages. * @param pipeline the pipeline to run on the joined collection. * @param as the name of the new array field to add to the input documents.