diff --git a/CHANGES.md b/CHANGES.md index 5bbfebec6b1d..e856ef18dab3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -59,6 +59,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* MongoDB IO connector added (Go) ([#24575](https://github.com/apache/beam/issues/24575)). ## New Features / Improvements diff --git a/sdks/go.mod b/sdks/go.mod index c7ef57eedbee..e9d206ee49bb 100644 --- a/sdks/go.mod +++ b/sdks/go.mod @@ -48,6 +48,7 @@ require ( github.com/testcontainers/testcontainers-go v0.15.0 github.com/xitongsys/parquet-go v1.6.2 github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c + go.mongodb.org/mongo-driver v1.11.1 golang.org/x/net v0.4.0 golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 golang.org/x/sync v0.1.0 @@ -113,11 +114,12 @@ require ( github.com/googleapis/gax-go/v2 v2.7.0 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/klauspost/compress v1.13.1 // indirect + github.com/klauspost/compress v1.13.6 // indirect github.com/magiconair/properties v1.8.6 // indirect github.com/moby/sys/mount v0.3.3 // indirect github.com/moby/sys/mountinfo v0.6.2 // indirect github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect + github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect @@ -128,7 +130,12 @@ require ( github.com/shabbyrobe/gocovmerge v0.0.0-20180507124511-f6ea450bfb63 // indirect github.com/sirupsen/logrus v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.1 // indirect + github.com/xdg-go/stringprep v1.0.3 // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect 
go.opencensus.io v0.24.0 // indirect + golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect golang.org/x/tools v0.1.12 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/sdks/go.sum b/sdks/go.sum index 3d7c21812c06..49045a6b9370 100644 --- a/sdks/go.sum +++ b/sdks/go.sum @@ -469,6 +469,7 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -580,8 +581,9 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.13.1 h1:wXr2uRxZTJXHLly6qhJabee5JqIhTRoLBhDOA74hDEQ= github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= 
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -639,6 +641,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ= @@ -816,6 +820,8 @@ github.com/testcontainers/testcontainers-go v0.15.0 h1:3Ex7PUGFv0b2bBsdOv6R42+SK github.com/testcontainers/testcontainers-go v0.15.0/go.mod h1:PkohMRH2X8Hib0IWtifVexDfLPVT+tb5E9hsf7cW12w= github.com/tetratelabs/wazero v1.0.0-pre.4 h1:RBJQT5OzmORkSp6MmZDWoFEr0zXjk4pmvMKAdeUnsaI= github.com/tetratelabs/wazero v1.0.0-pre.4/go.mod h1:u8wrFmpdrykiFK0DFPiFm5a4+0RzsdmXYVtijBKqUVo= +github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go 
v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= @@ -831,6 +837,12 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17 github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E= +github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs= +github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs= @@ -843,6 +855,8 @@ github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0/go.mod github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c h1:UDtocVeACpnwauljUbeHD9UOjjcvF5kLUHruww7VT9A= github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c/go.mod h1:qLb2Itmdcp7KPa5KZKvhE9U1q5bYSOmgeOckF/H2rQA= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= +github.com/youmark/pkcs8 
v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= @@ -854,6 +868,8 @@ go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= go.etcd.io/etcd v0.5.0-alpha.5.0.20200910180754-dd1b699fc489/go.mod h1:yVHk9ub3CSBatqGNg7GRmsnfLWtoW60w4eDYfh7vHDg= +go.mongodb.org/mongo-driver v1.11.1 h1:QP0znIRTuL0jf1oBQoAoM0C6ZJfBK4kx0Uumtv1A7w8= +go.mongodb.org/mongo-driver v1.11.1/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8= go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1/go.mod h1:SNgMg+EgDFwmvSmLRTNKC5fegJjB7v23qTQ0XLGUNHk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= @@ -881,6 +897,8 @@ golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod 
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -953,6 +971,7 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU= golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1044,6 +1063,7 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1060,6 +1080,7 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text 
v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/sdks/go/pkg/beam/io/mongodbio/coder.go b/sdks/go/pkg/beam/io/mongodbio/coder.go new file mode 100644 index 000000000000..c140f9a8a257 --- /dev/null +++ b/sdks/go/pkg/beam/io/mongodbio/coder.go @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package mongodbio + +import ( + "fmt" + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +func init() { + beam.RegisterCoder( + reflect.TypeOf((*bson.M)(nil)).Elem(), + encodeBSONMap, + decodeBSONMap, + ) + beam.RegisterCoder( + reflect.TypeOf((*primitive.ObjectID)(nil)).Elem(), + encodeObjectID, + decodeObjectID, + ) +} + +func encodeBSONMap(m bson.M) ([]byte, error) { + bytes, err := bson.Marshal(m) + if err != nil { + return nil, fmt.Errorf("error encoding BSON: %w", err) + } + + return bytes, nil +} + +func decodeBSONMap(bytes []byte) (bson.M, error) { + var out bson.M + if err := bson.Unmarshal(bytes, &out); err != nil { + return nil, fmt.Errorf("error decoding BSON: %w", err) + } + + return out, nil +} + +func encodeObjectID(objectID primitive.ObjectID) []byte { + return objectID[:] +} + +func decodeObjectID(bytes []byte) primitive.ObjectID { + var out primitive.ObjectID + + copy(out[:], bytes[:]) + + return out +} diff --git a/sdks/go/pkg/beam/io/mongodbio/coder_test.go b/sdks/go/pkg/beam/io/mongodbio/coder_test.go new file mode 100644 index 000000000000..d5e3bb2974d1 --- /dev/null +++ b/sdks/go/pkg/beam/io/mongodbio/coder_test.go @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package mongodbio + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +func Test_encodeBSONMap(t *testing.T) { + tests := []struct { + name string + m bson.M + want []byte + wantErr bool + }{ + { + name: "Encode bson.M", + m: bson.M{"key": "val"}, + want: []byte{18, 0, 0, 0, 2, 107, 101, 121, 0, 4, 0, 0, 0, 118, 97, 108, 0, 0}, + wantErr: false, + }, + { + name: "Encode empty bson.M", + m: bson.M{}, + want: []byte{5, 0, 0, 0, 0}, + wantErr: false, + }, + { + name: "Encode nil bson.M", + m: bson.M(nil), + want: []byte{5, 0, 0, 0, 0}, + wantErr: false, + }, + { + name: "Error - invalid bson.M", + m: bson.M{"key": make(chan int)}, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := encodeBSONMap(tt.m) + if (err != nil) != tt.wantErr { + t.Fatalf("encodeBSONMap() error = %v, wantErr %v", err, tt.wantErr) + } + + if !cmp.Equal(got, tt.want) { + t.Errorf("encodeBSONMap() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_decodeBSONMap(t *testing.T) { + tests := []struct { + name string + bytes []byte + want bson.M + wantErr bool + }{ + { + name: "Decode bson.M", + bytes: []byte{18, 0, 0, 0, 2, 107, 101, 121, 0, 4, 0, 0, 0, 118, 97, 108, 0, 0}, + want: bson.M{"key": "val"}, + wantErr: false, + }, + { + name: "Decode empty bson.M", + bytes: []byte{5, 0, 0, 0, 0}, + want: bson.M{}, + wantErr: false, + }, + { + name: "Error - invalid bson.M", + bytes: []byte{}, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := decodeBSONMap(tt.bytes) + if (err != nil) != tt.wantErr { + t.Fatalf("decodeBSONMap() error = %v, wantErr %v", err, tt.wantErr) + } + + if !cmp.Equal(got, tt.want) { + t.Errorf("decodeBSONMap() got = %v, want %v", got, tt.want) + } + }) + } +} + 
+func Test_encodeObjectID(t *testing.T) { + tests := []struct { + name string + objectID primitive.ObjectID + want []byte + }{ + { + name: "Encode object ID", + objectID: objectIDFromHex(t, "5f1b2c3d4e5f60708090a0b0"), + want: []byte{95, 27, 44, 61, 78, 95, 96, 112, 128, 144, 160, 176}, + }, + { + name: "Encode nil object ID", + objectID: primitive.NilObjectID, + want: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := encodeObjectID(tt.objectID); !cmp.Equal(got, tt.want) { + t.Errorf("encodeObjectID() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_decodeObjectID(t *testing.T) { + tests := []struct { + name string + bytes []byte + want primitive.ObjectID + }{ + { + name: "Decode object ID", + bytes: []byte{95, 27, 44, 61, 78, 95, 96, 112, 128, 144, 160, 176}, + want: objectIDFromHex(t, "5f1b2c3d4e5f60708090a0b0"), + }, + { + name: "Decode nil object ID", + bytes: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + want: primitive.NilObjectID, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := decodeObjectID(tt.bytes); !cmp.Equal(got, tt.want) { + t.Errorf("decodeObjectID() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/sdks/go/pkg/beam/io/mongodbio/common.go b/sdks/go/pkg/beam/io/mongodbio/common.go new file mode 100644 index 000000000000..9d6ffbeaa957 --- /dev/null +++ b/sdks/go/pkg/beam/io/mongodbio/common.go @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mongodbio contains transforms for reading from and writing to MongoDB. +package mongodbio + +import ( + "context" + "fmt" + + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" +) + +const ( + bsonTag = "bson" +) + +type mongoDBFn struct { + URI string + Database string + Collection string + client *mongo.Client + collection *mongo.Collection +} + +func (fn *mongoDBFn) Setup(ctx context.Context) error { + client, err := newClient(ctx, fn.URI) + if err != nil { + return err + } + + fn.client = client + fn.collection = client.Database(fn.Database).Collection(fn.Collection) + + return nil +} + +func newClient(ctx context.Context, uri string) (*mongo.Client, error) { + opts := options.Client().ApplyURI(uri) + + client, err := mongo.Connect(ctx, opts) + if err != nil { + return nil, fmt.Errorf("error connecting to MongoDB: %w", err) + } + + if err := client.Ping(ctx, readpref.Primary()); err != nil { + return nil, fmt.Errorf("error pinging MongoDB: %w", err) + } + + return client, nil +} + +func (fn *mongoDBFn) Teardown(ctx context.Context) error { + if err := fn.client.Disconnect(ctx); err != nil { + return fmt.Errorf("error disconnecting from MongoDB: %w", err) + } + + return nil +} diff --git a/sdks/go/pkg/beam/io/mongodbio/example_test.go b/sdks/go/pkg/beam/io/mongodbio/example_test.go new file mode 100644 index 000000000000..3e77303843e5 --- /dev/null +++ b/sdks/go/pkg/beam/io/mongodbio/example_test.go @@ -0,0 +1,180 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mongodbio_test + +import ( + "context" + "log" + "reflect" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/mongodbio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +func ExampleRead_default() { + type Event struct { + ID primitive.ObjectID `bson:"_id"` + Timestamp int64 `bson:"timestamp"` + EventType int32 `bson:"event_type"` + } + + beam.Init() + p, s := beam.NewPipelineWithRoot() + + col := mongodbio.Read( + s, + "mongodb://localhost:27017", + "demo", + "events", + reflect.TypeOf(Event{}), + ) + debug.Print(s, col) + + if err := beamx.Run(context.Background(), p); err != nil { + log.Fatalf("Failed to execute job: %v", err) + } +} + +func ExampleRead_options() { + type Event struct { + ID primitive.ObjectID `bson:"_id"` + Timestamp int64 `bson:"timestamp"` + EventType int32 `bson:"event_type"` + } + + beam.Init() + p, s := beam.NewPipelineWithRoot() + + col := mongodbio.Read( + s, + "mongodb://localhost:27017", + "demo", + "events", + reflect.TypeOf(Event{}), + 
mongodbio.WithReadBucketAuto(true), + mongodbio.WithReadBundleSize(32*1024*1024), + mongodbio.WithReadFilter(bson.M{"timestamp": bson.M{"$gt": 1640995200000}}), + ) + debug.Print(s, col) + + if err := beamx.Run(context.Background(), p); err != nil { + log.Fatalf("Failed to execute job: %v", err) + } +} + +func ExampleWrite_default() { + type Event struct { + ID primitive.ObjectID `bson:"_id"` + Timestamp int64 `bson:"timestamp"` + EventType int32 `bson:"event_type"` + } + + beam.Init() + p, s := beam.NewPipelineWithRoot() + + input := []Event{ + { + ID: primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200001)), + Timestamp: 1640995200001, + EventType: 1, + }, + { + ID: primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200002)), + Timestamp: 1640995200002, + EventType: 2, + }, + } + + col := beam.CreateList(s, input) + mongodbio.Write(s, "mongodb://localhost:27017", "demo", "events", col) + + if err := beamx.Run(context.Background(), p); err != nil { + log.Fatalf("Failed to execute job: %v", err) + } +} + +func ExampleWrite_options() { + type Event struct { + ID primitive.ObjectID `bson:"_id"` + Timestamp int64 `bson:"timestamp"` + EventType int32 `bson:"event_type"` + } + + beam.Init() + p, s := beam.NewPipelineWithRoot() + + input := []Event{ + { + ID: primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200001)), + Timestamp: 1640995200001, + EventType: 1, + }, + { + ID: primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200002)), + Timestamp: 1640995200002, + EventType: 2, + }, + } + + col := beam.CreateList(s, input) + mongodbio.Write( + s, + "mongodb://localhost:27017", + "demo", + "events", + col, + mongodbio.WithWriteBatchSize(500), + mongodbio.WithWriteOrdered(false), + ) + + if err := beamx.Run(context.Background(), p); err != nil { + log.Fatalf("Failed to execute job: %v", err) + } +} + +func ExampleWrite_generateID() { + type Event struct { + Timestamp int64 `bson:"timestamp"` + EventType int32 `bson:"event_type"` + } + + 
beam.Init() + p, s := beam.NewPipelineWithRoot() + + input := []Event{ + { + Timestamp: 1640995200001, + EventType: 1, + }, + { + Timestamp: 1640995200002, + EventType: 1, + }, + } + + col := beam.CreateList(s, input) + ids := mongodbio.Write(s, "mongodb://localhost:27017", "demo", "events", col) + debug.Print(s, ids) + + if err := beamx.Run(context.Background(), p); err != nil { + log.Fatalf("Failed to execute job: %v", err) + } +} diff --git a/sdks/go/pkg/beam/io/mongodbio/helper_test.go b/sdks/go/pkg/beam/io/mongodbio/helper_test.go new file mode 100644 index 000000000000..c5a63c15adb4 --- /dev/null +++ b/sdks/go/pkg/beam/io/mongodbio/helper_test.go @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package mongodbio + +import ( + "testing" + + "go.mongodb.org/mongo-driver/bson/primitive" +) + +func objectIDFromHex(t *testing.T, hex string) primitive.ObjectID { + t.Helper() + + id, err := primitive.ObjectIDFromHex(hex) + if err != nil { + t.Fatalf("error parsing hex string to primitive.ObjectID: %v", err) + } + + return id +} diff --git a/sdks/go/pkg/beam/io/mongodbio/read.go b/sdks/go/pkg/beam/io/mongodbio/read.go new file mode 100644 index 000000000000..e09f2f1e1af8 --- /dev/null +++ b/sdks/go/pkg/beam/io/mongodbio/read.go @@ -0,0 +1,492 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package mongodbio + +import ( + "context" + "fmt" + "math" + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + "github.com/apache/beam/sdks/v2/go/pkg/beam/util/structx" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" +) + +const ( + defaultReadBundleSize = 64 * 1024 * 1024 + + minSplitVectorChunkSize = 1024 * 1024 + maxSplitVectorChunkSize = 1024 * 1024 * 1024 + + maxBucketCount = math.MaxInt32 +) + +func init() { + register.DoFn3x1[context.Context, []byte, func(bson.M), error](&bucketAutoFn{}) + register.DoFn3x1[context.Context, []byte, func(bson.M), error](&splitVectorFn{}) + register.Emitter1[bson.M]() + + register.DoFn3x1[context.Context, bson.M, func(beam.Y), error](&readFn{}) + register.Emitter1[beam.Y]() +} + +// Read reads a MongoDB collection and returns a PCollection for a given type T. T must be a +// struct with exported fields that should have a "bson" tag. By default, the transform uses the +// MongoDB internal splitVector command to split the collection into bundles. The transform can be +// configured to use the $bucketAuto aggregation instead to support reading from MongoDB Atlas +// where the splitVector command is not allowed. This is enabled by passing the ReadOptionFn +// WithReadBucketAuto(true). +// +// The Read transform has the required parameters: +// - s: the scope of the pipeline +// - uri: the MongoDB connection string +// - database: the MongoDB database to read from +// - collection: the MongoDB collection to read from +// - t: the type of the elements in the collection +// +// The Read transform takes a variadic number of ReadOptionFn which can set the ReadOption fields: +// - BucketAuto: whether to use the bucketAuto aggregation to split the collection into bundles. 
+// Defaults to false +// - Filter: a bson.M map that is used to filter the documents in the collection. Defaults to nil, +// which means no filter is applied +// - BundleSize: the size in bytes to bundle the documents into when reading. Defaults to +// 64 * 1024 * 1024 (64 MB) +func Read( + s beam.Scope, + uri string, + database string, + collection string, + t reflect.Type, + opts ...ReadOptionFn, +) beam.PCollection { + s = s.Scope("mongodbio.Read") + + option := &ReadOption{ + BundleSize: defaultReadBundleSize, + } + + for _, opt := range opts { + if err := opt(option); err != nil { + panic(fmt.Sprintf("mongodbio.Read: invalid option: %v", err)) + } + } + + imp := beam.Impulse(s) + + var bundled beam.PCollection + + if option.BucketAuto { + bundled = beam.ParDo(s, newBucketAutoFn(uri, database, collection, option), imp) + } else { + bundled = beam.ParDo(s, newSplitVectorFn(uri, database, collection, option), imp) + } + + return beam.ParDo( + s, + newReadFn(uri, database, collection, t, option), + bundled, + beam.TypeDefinition{Var: beam.YType, T: t}, + ) +} + +type bucketAutoFn struct { + mongoDBFn + BundleSize int64 +} + +func newBucketAutoFn( + uri string, + database string, + collection string, + option *ReadOption, +) *bucketAutoFn { + return &bucketAutoFn{ + mongoDBFn: mongoDBFn{ + URI: uri, + Database: database, + Collection: collection, + }, + BundleSize: option.BundleSize, + } +} + +func (fn *bucketAutoFn) ProcessElement( + ctx context.Context, + _ []byte, + emit func(bson.M), +) error { + collectionSize, err := fn.getCollectionSize(ctx) + if err != nil { + return err + } + + if collectionSize == 0 { + return nil + } + + bucketCount := calculateBucketCount(collectionSize, fn.BundleSize) + + buckets, err := fn.getBuckets(ctx, bucketCount) + if err != nil { + return err + } + + idFilters := idFiltersFromBuckets(buckets) + + for _, filter := range idFilters { + emit(filter) + } + + return nil +} + +type collStats struct { + Size int64 `bson:"size"` +} + 
+func (fn *bucketAutoFn) getCollectionSize(ctx context.Context) (int64, error) { + cmd := bson.M{"collStats": fn.Collection} + opts := options.RunCmd().SetReadPreference(readpref.Primary()) + + var stats collStats + if err := fn.collection.Database().RunCommand(ctx, cmd, opts).Decode(&stats); err != nil { + return 0, fmt.Errorf("error executing collStats command: %w", err) + } + + return stats.Size, nil +} + +func calculateBucketCount(collectionSize int64, bundleSize int64) int32 { + if bundleSize < 0 { + panic("monogdbio.calculateBucketCount: bundle size must be greater than 0") + } + + count := collectionSize / bundleSize + if collectionSize%bundleSize != 0 { + count++ + } + + if count > int64(maxBucketCount) { + count = maxBucketCount + } + + return int32(count) +} + +type bucket struct { + ID minMax `bson:"_id"` +} + +type minMax struct { + Min any `bson:"min"` + Max any `bson:"max"` +} + +func (fn *bucketAutoFn) getBuckets(ctx context.Context, count int32) ([]bucket, error) { + pipeline := mongo.Pipeline{bson.D{{ + Key: "$bucketAuto", + Value: bson.M{ + "groupBy": "$_id", + "buckets": count, + }, + }}} + + cursor, err := fn.collection.Aggregate(ctx, pipeline) + if err != nil { + return nil, fmt.Errorf("error executing bucketAuto aggregation: %w", err) + } + + var buckets []bucket + if err = cursor.All(ctx, &buckets); err != nil { + return nil, fmt.Errorf("error decoding buckets: %w", err) + } + + return buckets, nil +} + +func idFiltersFromBuckets(buckets []bucket) []bson.M { + idFilters := make([]bson.M, len(buckets)) + + for i := 0; i < len(buckets); i++ { + filter := bson.M{} + + if i != 0 { + filter["$gt"] = buckets[i].ID.Min + } + + if i != len(buckets)-1 { + filter["$lte"] = buckets[i].ID.Max + } + + if len(filter) == 0 { + idFilters[i] = filter + } else { + idFilters[i] = bson.M{"_id": filter} + } + } + + return idFilters +} + +type splitVectorFn struct { + mongoDBFn + BundleSize int64 +} + +func newSplitVectorFn( + uri string, + database string, + 
collection string, + option *ReadOption, +) *splitVectorFn { + return &splitVectorFn{ + mongoDBFn: mongoDBFn{ + URI: uri, + Database: database, + Collection: collection, + }, + BundleSize: option.BundleSize, + } +} + +func (fn *splitVectorFn) ProcessElement( + ctx context.Context, + _ []byte, + emit func(bson.M), +) error { + chunkSize := getChunkSize(fn.BundleSize) + + splitKeys, err := fn.getSplitKeys(ctx, chunkSize) + if err != nil { + return err + } + + idFilters := idFiltersFromSplits(splitKeys) + + for _, filter := range idFilters { + emit(filter) + } + + return nil +} + +func getChunkSize(bundleSize int64) int64 { + var chunkSize int64 + + if bundleSize < minSplitVectorChunkSize { + chunkSize = minSplitVectorChunkSize + } else if bundleSize > maxSplitVectorChunkSize { + chunkSize = maxSplitVectorChunkSize + } else { + chunkSize = bundleSize + } + + return chunkSize +} + +type splitVector struct { + SplitKeys []splitKey `bson:"splitKeys"` +} + +type splitKey struct { + ID any `bson:"_id"` +} + +func (fn *splitVectorFn) getSplitKeys(ctx context.Context, chunkSize int64) ([]splitKey, error) { + cmd := bson.D{ + {Key: "splitVector", Value: fmt.Sprintf("%s.%s", fn.Database, fn.Collection)}, + {Key: "keyPattern", Value: bson.D{{Key: "_id", Value: 1}}}, + {Key: "maxChunkSizeBytes", Value: chunkSize}, + } + + opts := options.RunCmd().SetReadPreference(readpref.Primary()) + + var vector splitVector + if err := fn.collection.Database().RunCommand(ctx, cmd, opts).Decode(&vector); err != nil { + return nil, fmt.Errorf("error executing splitVector command: %w", err) + } + + return vector.SplitKeys, nil +} + +func idFiltersFromSplits(splitKeys []splitKey) []bson.M { + idFilters := make([]bson.M, len(splitKeys)+1) + + for i := 0; i < len(splitKeys)+1; i++ { + filter := bson.M{} + + if i > 0 { + filter["$gt"] = splitKeys[i-1].ID + } + + if i < len(splitKeys) { + filter["$lte"] = splitKeys[i].ID + } + + if len(filter) == 0 { + idFilters[i] = filter + } else { + idFilters[i] 
= bson.M{"_id": filter} + } + } + + return idFilters +} + +type readFn struct { + mongoDBFn + Filter []byte + Type beam.EncodedType + projection bson.D + filter bson.M +} + +func newReadFn( + uri string, + database string, + collection string, + t reflect.Type, + option *ReadOption, +) *readFn { + filter, err := encodeBSONMap(option.Filter) + if err != nil { + panic(fmt.Sprintf("mongodbio.newReadFn: %v", err)) + } + + return &readFn{ + mongoDBFn: mongoDBFn{ + URI: uri, + Database: database, + Collection: collection, + }, + Filter: filter, + Type: beam.EncodedType{T: t}, + } +} + +func (fn *readFn) Setup(ctx context.Context) error { + if err := fn.mongoDBFn.Setup(ctx); err != nil { + return err + } + + filter, err := decodeBSONMap(fn.Filter) + if err != nil { + return err + } + + fn.filter = filter + fn.projection = inferProjection(fn.Type.T, bsonTag) + + return nil +} + +func inferProjection(t reflect.Type, tagKey string) bson.D { + names := structx.InferFieldNames(t, tagKey) + if len(names) == 0 { + panic("mongodbio.inferProjection: no names to infer projection from") + } + + projection := make(bson.D, len(names)) + + for i, name := range names { + projection[i] = bson.E{Key: name, Value: 1} + } + + return projection +} + +func (fn *readFn) ProcessElement( + ctx context.Context, + elem bson.M, + emit func(beam.Y), +) (err error) { + mergedFilter := mergeFilters(elem, fn.filter) + + cursor, err := fn.findDocuments(ctx, fn.projection, mergedFilter) + if err != nil { + return err + } + + defer func() { + closeErr := cursor.Close(ctx) + + if err != nil { + if closeErr != nil { + log.Errorf(ctx, "error closing cursor: %v", closeErr) + } + return + } + + err = closeErr + }() + + for cursor.Next(ctx) { + value, err := decodeDocument(cursor, fn.Type.T) + if err != nil { + return err + } + + emit(value) + } + + return cursor.Err() +} + +func mergeFilters(idFilter bson.M, customFilter bson.M) bson.M { + if len(idFilter) == 0 { + return customFilter + } + + if 
len(customFilter) == 0 { + return idFilter + } + + return bson.M{ + "$and": []bson.M{idFilter, customFilter}, + } +} + +func (fn *readFn) findDocuments( + ctx context.Context, + projection bson.D, + filter bson.M, +) (*mongo.Cursor, error) { + opts := options.Find().SetProjection(projection).SetAllowDiskUse(true) + + cursor, err := fn.collection.Find(ctx, filter, opts) + if err != nil { + return nil, fmt.Errorf("error finding documents: %w", err) + } + + return cursor, nil +} + +func decodeDocument(cursor *mongo.Cursor, t reflect.Type) (any, error) { + out := reflect.New(t).Interface() + if err := cursor.Decode(out); err != nil { + return nil, fmt.Errorf("error decoding document: %w", err) + } + + value := reflect.ValueOf(out).Elem().Interface() + + return value, nil +} diff --git a/sdks/go/pkg/beam/io/mongodbio/read_option.go b/sdks/go/pkg/beam/io/mongodbio/read_option.go new file mode 100644 index 000000000000..b724c306a817 --- /dev/null +++ b/sdks/go/pkg/beam/io/mongodbio/read_option.go @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mongodbio + +import ( + "errors" + + "go.mongodb.org/mongo-driver/bson" +) + +// ReadOption represents options for reading from MongoDB. 
+type ReadOption struct { + BucketAuto bool + Filter bson.M + BundleSize int64 +} + +// ReadOptionFn is a function that configures a ReadOption. +type ReadOptionFn func(option *ReadOption) error + +// WithReadBucketAuto configures the ReadOption whether to use the bucketAuto aggregation stage. +func WithReadBucketAuto(bucketAuto bool) ReadOptionFn { + return func(o *ReadOption) error { + o.BucketAuto = bucketAuto + return nil + } +} + +// WithReadFilter configures the ReadOption to use the provided filter. +func WithReadFilter(filter bson.M) ReadOptionFn { + return func(o *ReadOption) error { + o.Filter = filter + return nil + } +} + +// WithReadBundleSize configures the ReadOption to use the provided bundle size in bytes. +func WithReadBundleSize(bundleSize int64) ReadOptionFn { + return func(o *ReadOption) error { + if bundleSize <= 0 { + return errors.New("bundle size must be greater than 0") + } + + o.BundleSize = bundleSize + return nil + } +} diff --git a/sdks/go/pkg/beam/io/mongodbio/read_option_test.go b/sdks/go/pkg/beam/io/mongodbio/read_option_test.go new file mode 100644 index 000000000000..d4fe4dfa63a3 --- /dev/null +++ b/sdks/go/pkg/beam/io/mongodbio/read_option_test.go @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package mongodbio + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "go.mongodb.org/mongo-driver/bson" +) + +func TestWithReadBucketAuto(t *testing.T) { + tests := []struct { + name string + bucketAuto bool + want bool + wantErr bool + }{ + { + name: "Set bucket auto to true", + bucketAuto: true, + want: true, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var option ReadOption + + if err := WithReadBucketAuto(tt.bucketAuto)(&option); (err != nil) != tt.wantErr { + t.Fatalf("WithReadBucketAuto() error = %v, wantErr %v", err, tt.wantErr) + } + + if option.BucketAuto != tt.want { + t.Errorf("option.BucketAuto = %v, want %v", option.BucketAuto, tt.want) + } + }) + } +} + +func TestWithReadFilter(t *testing.T) { + tests := []struct { + name string + filter bson.M + want bson.M + wantErr bool + }{ + { + name: "Set filter to {\"key\": \"value\"}", + filter: bson.M{"key": "value"}, + want: bson.M{"key": "value"}, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var option ReadOption + + if err := WithReadFilter(tt.filter)(&option); (err != nil) != tt.wantErr { + t.Fatalf("WithReadFilter() error = %v, wantErr %v", err, tt.wantErr) + } + + if !cmp.Equal(option.Filter, tt.want) { + t.Errorf("option.Filter = %v, want %v", option.Filter, tt.want) + } + }) + } +} + +func TestWithReadBundleSize(t *testing.T) { + tests := []struct { + name string + bundleSize int64 + want int64 + wantErr bool + }{ + { + name: "Set bundle size to 1024", + bundleSize: 1024, + want: 1024, + wantErr: false, + }, + { + name: "Error - bundle size must be greater than 0", + bundleSize: 0, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var option ReadOption + + if err := WithReadBundleSize(tt.bundleSize)(&option); (err != nil) != tt.wantErr { + 
t.Fatalf("WithReadBundleSize() error = %v, wantErr %v", err, tt.wantErr) + } + + if option.BundleSize != tt.want { + t.Errorf("option.BundleSize = %v, want %v", option.BundleSize, tt.want) + } + }) + } +} diff --git a/sdks/go/pkg/beam/io/mongodbio/read_test.go b/sdks/go/pkg/beam/io/mongodbio/read_test.go new file mode 100644 index 000000000000..5899457d5a86 --- /dev/null +++ b/sdks/go/pkg/beam/io/mongodbio/read_test.go @@ -0,0 +1,393 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package mongodbio + +import ( + "math" + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" + "go.mongodb.org/mongo-driver/bson" +) + +func Test_calculateBucketCount(t *testing.T) { + tests := []struct { + name string + collectionSize int64 + bundleSize int64 + want int32 + }{ + { + name: "Return ceiling of collection size / bundle size", + collectionSize: 3 * 1024 * 1024, + bundleSize: 2 * 1024 * 1024, + want: 2, + }, + { + name: "Return max int32 when calculated count is greater than max int32", + collectionSize: 1024 * 1024 * 1024 * 1024, + bundleSize: 1, + want: math.MaxInt32, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := calculateBucketCount(tt.collectionSize, tt.bundleSize); got != tt.want { + t.Errorf("calculateBucketCount() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_calculateBucketCountPanic(t *testing.T) { + t.Run("Panic when bundleSize is not greater than 0", func(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("calculateBucketCount() does not panic") + } + }() + + calculateBucketCount(1024, 0) + }) +} + +func Test_idFiltersFromBuckets(t *testing.T) { + tests := []struct { + name string + buckets []bucket + want []bson.M + }{ + { + name: "Create one $lte filter for start range, one $gt filter for end range, and filters with both " + + "$lte and $gt for ranges in between when there are three or more bucket elements", + buckets: []bucket{ + { + ID: minMax{ + Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5378"), + Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + }, + }, + { + ID: minMax{ + Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5382"), + }, + }, + { + ID: minMax{ + Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5382"), + Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5384"), + }, + }, + }, + want: []bson.M{ + { + "_id": bson.M{ + "$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + }, + }, + { + 
"_id": bson.M{ + "$gt": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + "$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5382"), + }, + }, + { + "_id": bson.M{ + "$gt": objectIDFromHex(t, "6384e03f24f854c1a8ce5382"), + }, + }, + }, + }, + { + name: "Create one $lte filter for start range and one $gt filter for end range when there are two " + + "bucket elements", + buckets: []bucket{ + { + ID: minMax{ + Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5378"), + Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + }, + }, + { + ID: minMax{ + Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5382"), + }, + }, + }, + want: []bson.M{ + { + "_id": bson.M{ + "$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + }, + }, + { + "_id": bson.M{ + "$gt": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + }, + }, + }, + }, + { + name: "Create an empty filter when there is one bucket element", + buckets: []bucket{ + { + ID: minMax{ + Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5378"), + Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + }, + }, + }, + want: []bson.M{{}}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := idFiltersFromBuckets(tt.buckets); !cmp.Equal(got, tt.want) { + t.Errorf("idFiltersFromBuckets() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_getChunkSize(t *testing.T) { + tests := []struct { + name string + bundleSize int64 + want int64 + }{ + { + name: "Return 1 MB if bundle size is less than 1 MB", + bundleSize: 1024, + want: 1024 * 1024, + }, + { + name: "Return 1 GB if bundle size is greater than 1 GB", + bundleSize: 2 * 1024 * 1024 * 1024, + want: 1024 * 1024 * 1024, + }, + { + name: "Return bundle size if bundle size is between 1 MB and 1 GB", + bundleSize: 4 * 1024 * 1024, + want: 4 * 1024 * 1024, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := getChunkSize(tt.bundleSize); got != tt.want { + 
t.Errorf("getChunkSize() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_idFiltersFromSplits(t *testing.T) { + tests := []struct { + name string + splitKeys []splitKey + want []bson.M + }{ + { + name: "Create one $lte filter for start range, one $gt filter for end range, and filters with both " + + "$lte and $gt for ranges in between when there are two or more splitKey elements", + splitKeys: []splitKey{ + { + ID: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + }, + { + ID: objectIDFromHex(t, "6384e03f24f854c1a8ce5382"), + }, + }, + want: []bson.M{ + { + "_id": bson.M{ + "$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + }, + }, + { + "_id": bson.M{ + "$gt": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + "$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5382"), + }, + }, + { + "_id": bson.M{ + "$gt": objectIDFromHex(t, "6384e03f24f854c1a8ce5382"), + }, + }, + }, + }, + { + name: "Create one $lte filter for start range and one $gt filter for end range when there is one " + + "splitKey element", + splitKeys: []splitKey{ + { + ID: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + }, + }, + want: []bson.M{ + { + "_id": bson.M{ + "$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + }, + }, + { + "_id": bson.M{ + "$gt": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"), + }, + }, + }, + }, + { + name: "Create an empty filter when there are no splitKey elements", + splitKeys: nil, + want: []bson.M{{}}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := idFiltersFromSplits(tt.splitKeys); !cmp.Equal(got, tt.want) { + t.Errorf("idFiltersFromSplits() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_inferProjection(t *testing.T) { + type doc struct { + Field1 string `bson:"field1"` + Field2 string `bson:"field2"` + Field3 string `bson:"-"` + } + + tests := []struct { + name string + t reflect.Type + tagKey string + want bson.D + }{ + { + name: "Infer projection from struct bson tags", + t: 
reflect.TypeOf(doc{}), + tagKey: "bson", + want: bson.D{ + {Key: "field1", Value: 1}, + {Key: "field2", Value: 1}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := inferProjection(tt.t, tt.tagKey); !cmp.Equal(got, tt.want) { + t.Errorf("inferProjection() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_inferProjectionPanic(t *testing.T) { + type doc struct{} + + t.Run("Panic when type has no fields to infer", func(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("inferProjection() does not panic") + } + }() + + inferProjection(reflect.TypeOf(doc{}), "bson") + }) +} + +func Test_mergeFilters(t *testing.T) { + tests := []struct { + name string + idFilter bson.M + filter bson.M + want bson.M + }{ + { + name: "Returned merged ID filter and custom filter in an $and filter", + idFilter: bson.M{ + "_id": bson.M{ + "$gte": 10, + }, + }, + filter: bson.M{ + "key": bson.M{ + "$ne": "value", + }, + }, + want: bson.M{ + "$and": []bson.M{ + { + "_id": bson.M{ + "$gte": 10, + }, + }, + { + "key": bson.M{ + "$ne": "value", + }, + }, + }, + }, + }, + { + name: "Return only ID filter when custom filter is empty", + idFilter: bson.M{ + "_id": bson.M{ + "$gte": 10, + }, + }, + filter: bson.M{}, + want: bson.M{ + "_id": bson.M{ + "$gte": 10, + }, + }, + }, + { + name: "Return only custom filter when ID filter is empty", + idFilter: bson.M{}, + filter: bson.M{ + "key": bson.M{ + "$ne": "value", + }, + }, + want: bson.M{ + "key": bson.M{ + "$ne": "value", + }, + }, + }, + { + name: "Return empty filter when both ID filter and custom filter are empty", + idFilter: bson.M{}, + filter: bson.M{}, + want: bson.M{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := mergeFilters(tt.idFilter, tt.filter); !cmp.Equal(got, tt.want) { + t.Errorf("mergeFilters() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/sdks/go/pkg/beam/io/mongodbio/write.go 
b/sdks/go/pkg/beam/io/mongodbio/write.go
new file mode 100644
index 000000000000..2332e3ba9813
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/write.go
@@ -0,0 +1,204 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mongodbio

import (
	"context"
	"fmt"
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/structx"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

const (
	// defaultWriteBatchSize is the default number of documents per bulk write (1000).
	defaultWriteBatchSize = 1000
	// defaultWriteOrdered is the default for ordered bulk-write execution.
	defaultWriteOrdered = true
)

// init registers the DoFns and emitted element types with the Beam runtime so they can
// be serialized and executed on distributed runners.
func init() {
	register.Function1x2(createIDFn)
	register.Emitter2[primitive.ObjectID, beam.Y]()

	register.DoFn3x0[context.Context, beam.Y, func(beam.X, beam.Y)](
		&extractIDFn{},
	)
	register.Emitter2[beam.X, beam.Y]()

	register.DoFn4x1[context.Context, beam.X, beam.Y, func(beam.X), error](
		&writeFn{},
	)
	register.Emitter1[primitive.ObjectID]()
}

// Write writes a PCollection of a type T to MongoDB. T must be a struct with exported fields
// that should have a "bson" tag. If the struct has a field with the bson tag "_id", the value of
// that field will be used as the id of the document. Otherwise, a new id field of type
// primitive.ObjectID will be generated for each document. Write returns a PCollection of the
// inserted id values with type K.
//
// The Write transform has the required parameters:
//   - s: the scope of the pipeline
//   - uri: the MongoDB connection string
//   - database: the MongoDB database to write to
//   - collection: the MongoDB collection to write to
//   - col: the PCollection to write to MongoDB
//
// The Write transform takes a variadic number of WriteOptionFn which can set the WriteOption
// fields:
//   - BatchSize: the number of documents to write in a single batch. Defaults to 1000
//   - Ordered: whether to execute the writes in order. Defaults to true
func Write(
	s beam.Scope,
	uri string,
	database string,
	collection string,
	col beam.PCollection,
	opts ...WriteOptionFn,
) beam.PCollection {
	s = s.Scope("mongodbio.Write")

	option := &WriteOption{
		BatchSize: defaultWriteBatchSize,
		Ordered:   defaultWriteOrdered,
	}

	// Invalid options are surfaced at pipeline-construction time via panic, matching the
	// Beam Go SDK convention for construction-time errors.
	for _, opt := range opts {
		if err := opt(option); err != nil {
			panic(fmt.Sprintf("mongodbio.Write: invalid option: %v", err))
		}
	}

	t := col.Type().Type()
	// Locate a field tagged bson:"_id"; -1 means the element type declares no ID field.
	idIndex := structx.FieldIndexByTag(t, bsonTag, "_id")

	var keyed beam.PCollection

	if idIndex == -1 {
		// No _id field: generate a fresh ObjectID key per element. The Reshuffle prevents
		// the ID generation from being re-run on retries downstream of a fusion break.
		pre := beam.ParDo(s, createIDFn, col)
		keyed = beam.Reshuffle(s, pre)
	} else {
		// The element carries its own ID: key each element by that field's value.
		keyed = beam.ParDo(
			s,
			newExtractIDFn(idIndex),
			col,
			beam.TypeDefinition{Var: beam.XType, T: t.Field(idIndex).Type},
		)
	}

	return beam.ParDo(
		s,
		newWriteFn(uri, database, collection, option),
		keyed,
	)
}

// createIDFn pairs an element with a freshly generated ObjectID to use as its _id.
func createIDFn(elem beam.Y) (primitive.ObjectID, beam.Y) {
	id := primitive.NewObjectID()
	return id, elem
}

// extractIDFn keys each element by the value of its struct field at IDIndex.
type extractIDFn struct {
	IDIndex int
}

// newExtractIDFn creates an extractIDFn reading the field at idIndex.
func newExtractIDFn(idIndex int) *extractIDFn {
	return &extractIDFn{
		IDIndex: idIndex,
	}
}

// ProcessElement emits (id, elem) where id is the value of elem's IDIndex-th field.
// NOTE(review): reflect.ValueOf(elem).Field assumes elem is a struct value (not a
// pointer) — confirm Write's element-type contract guarantees this.
func (fn *extractIDFn) ProcessElement(
	_ context.Context,
	elem beam.Y,
	emit func(beam.X, beam.Y),
) {
	id := reflect.ValueOf(elem).Field(fn.IDIndex).Interface()
	emit(id, elem)
}

// writeFn buffers keyed elements into replace-upsert models and bulk-writes them to
// MongoDB in batches of BatchSize.
type writeFn struct {
	mongoDBFn
	BatchSize int64
	Ordered   bool
	// models is the per-bundle buffer of pending writes; flushed when full and at
	// FinishBundle. Not serialized.
	models []mongo.WriteModel
}

// newWriteFn creates a writeFn for the given connection parameters and options.
func newWriteFn(
	uri string,
	database string,
	collection string,
	option *WriteOption,
) *writeFn {
	return &writeFn{
		mongoDBFn: mongoDBFn{
			URI:        uri,
			Database:   database,
			Collection: collection,
		},
		BatchSize: option.BatchSize,
		Ordered:   option.Ordered,
	}
}

// ProcessElement buffers an upsert (replace-by-_id) for the element and flushes the
// buffer once it reaches BatchSize.
// NOTE(review): the key is emitted as soon as the model is buffered, before the batch
// is written; a later flush failure can occur after keys were already emitted — confirm
// this ordering is intended.
func (fn *writeFn) ProcessElement(
	ctx context.Context,
	key beam.X,
	value beam.Y,
	emit func(beam.X),
) error {
	model := mongo.NewReplaceOneModel().
		SetFilter(bson.M{"_id": key}).
		SetUpsert(true).
		SetReplacement(value)

	fn.models = append(fn.models, model)

	if len(fn.models) >= int(fn.BatchSize) {
		if err := fn.flush(ctx); err != nil {
			return err
		}
	}

	emit(key)

	return nil
}

// FinishBundle flushes any models still buffered at the end of the bundle.
func (fn *writeFn) FinishBundle(ctx context.Context, _ func(beam.X)) error {
	if len(fn.models) > 0 {
		return fn.flush(ctx)
	}

	return nil
}

// flush bulk-writes the buffered models and clears the buffer on success.
func (fn *writeFn) flush(ctx context.Context) error {
	opts := options.BulkWrite().SetOrdered(fn.Ordered)

	if _, err := fn.collection.BulkWrite(ctx, fn.models, opts); err != nil {
		return fmt.Errorf("error bulk writing to MongoDB: %w", err)
	}

	fn.models = nil

	return nil
}
diff --git a/sdks/go/pkg/beam/io/mongodbio/write_option.go b/sdks/go/pkg/beam/io/mongodbio/write_option.go
new file mode 100644
index 000000000000..8d54b6052b82
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/write_option.go
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mongodbio + +import ( + "errors" +) + +// WriteOption represents options for writing to MongoDB. +type WriteOption struct { + BatchSize int64 + Ordered bool +} + +// WriteOptionFn is a function that configures a WriteOption. +type WriteOptionFn func(option *WriteOption) error + +// WithWriteBatchSize configures the WriteOption to use the provided batch size when writing +// documents. +func WithWriteBatchSize(batchSize int64) WriteOptionFn { + return func(o *WriteOption) error { + if batchSize <= 0 { + return errors.New("batch size must be greater than 0") + } + + o.BatchSize = batchSize + return nil + } +} + +// WithWriteOrdered configures the WriteOption whether to apply an ordered bulk write. +func WithWriteOrdered(ordered bool) WriteOptionFn { + return func(o *WriteOption) error { + o.Ordered = ordered + return nil + } +} diff --git a/sdks/go/pkg/beam/io/mongodbio/write_option_test.go b/sdks/go/pkg/beam/io/mongodbio/write_option_test.go new file mode 100644 index 000000000000..1e4e66bfbc41 --- /dev/null +++ b/sdks/go/pkg/beam/io/mongodbio/write_option_test.go @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mongodbio + +import ( + "testing" +) + +func TestWithWriteBatchSize(t *testing.T) { + tests := []struct { + name string + batchSize int64 + want int64 + wantErr bool + }{ + { + name: "Set batch size to 500", + batchSize: 500, + want: 500, + wantErr: false, + }, + { + name: "Error - batch size must be greater than 0", + batchSize: 0, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var option WriteOption + + if err := WithWriteBatchSize(tt.batchSize)(&option); (err != nil) != tt.wantErr { + t.Fatalf("WithWriteBatchSize() error = %v, wantErr %v", err, tt.wantErr) + } + + if option.BatchSize != tt.want { + t.Errorf("option.BatchSize = %v, want %v", option.BatchSize, tt.want) + } + }) + } +} + +func TestWithWriteOrdered(t *testing.T) { + tests := []struct { + name string + ordered bool + want bool + wantErr bool + }{ + { + name: "Set ordered to true", + ordered: true, + want: true, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var option WriteOption + + if err := WithWriteOrdered(tt.ordered)(&option); (err != nil) != tt.wantErr { + t.Fatalf("WithWriteOrdered() err = %v, wantErr %v", err, tt.wantErr) + } + + if option.Ordered != tt.want { + t.Errorf("option.Ordered = %v, want %v", option.Ordered, tt.want) + } + }) + } +} diff --git a/sdks/go/pkg/beam/io/mongodbio/write_test.go 
b/sdks/go/pkg/beam/io/mongodbio/write_test.go new file mode 100644 index 000000000000..6608df8362b6 --- /dev/null +++ b/sdks/go/pkg/beam/io/mongodbio/write_test.go @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mongodbio + +import ( + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/google/go-cmp/cmp" +) + +func Test_createIDFn(t *testing.T) { + type doc struct { + Field1 int32 `bson:"field1"` + } + + tests := []struct { + name string + elem beam.Y + want beam.Y + }{ + { + name: "Create key-value pair of a new object ID and element", + elem: doc{Field1: 1}, + want: doc{Field1: 1}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotKey, gotValue := createIDFn(tt.elem) + + if gotKey.IsZero() { + t.Error("createIDFn() gotKey is zero") + } + + if !cmp.Equal(gotValue, tt.want) { + t.Errorf("createIDFn() gotValue = %v, want %v", gotValue, tt.want) + } + }) + } +} diff --git a/sdks/go/pkg/beam/util/structx/struct.go b/sdks/go/pkg/beam/util/structx/struct.go index 2659191d38d4..aec8a63652d9 100644 --- a/sdks/go/pkg/beam/util/structx/struct.go +++ b/sdks/go/pkg/beam/util/structx/struct.go @@ -56,3 +56,22 @@ func InferFieldNames(t reflect.Type, key 
string) []string { return names } + +// FieldIndexByTag returns the index of the field with the given tag key and value. Returns -1 if +// the field is not found. Panics if the type's kind is not a struct. +func FieldIndexByTag(t reflect.Type, key string, value string) int { + if t.Kind() != reflect.Struct { + panic(fmt.Sprintf("structx: FieldIndexByTag of non-struct type %s", t)) + } + + for i := 0; i < t.NumField(); i++ { + values := t.Field(i).Tag.Get(key) + name := strings.Split(values, ",")[0] + + if name == value { + return i + } + } + + return -1 +} diff --git a/sdks/go/pkg/beam/util/structx/struct_test.go b/sdks/go/pkg/beam/util/structx/struct_test.go index 6aac5869604e..ab6c7278f628 100644 --- a/sdks/go/pkg/beam/util/structx/struct_test.go +++ b/sdks/go/pkg/beam/util/structx/struct_test.go @@ -142,3 +142,63 @@ func TestInferFieldNamesPanic(t *testing.T) { InferFieldNames(reflect.TypeOf(""), "key") }) } + +func TestFieldIndexByTag(t *testing.T) { + tests := []struct { + name string + t reflect.Type + key string + value string + want int + }{ + { + name: "Return index of field with matching tag key and value", + t: reflect.TypeOf(struct { + Field1 string `key:"field1"` + Field2 string `key:"field2"` + }{}), + key: "key", + value: "field2", + want: 1, + }, + { + name: "Return -1 for non-existent tag key", + t: reflect.TypeOf(struct { + Field1 string `key:"field1"` + Field2 string `key:"field2"` + }{}), + key: "other", + value: "field1", + want: -1, + }, + { + name: "Return -1 for non-existent tag value", + t: reflect.TypeOf(struct { + Field1 string `key:"field1"` + Field2 string `key:"field2"` + }{}), + key: "key", + value: "field3", + want: -1, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := FieldIndexByTag(tt.t, tt.key, tt.value); got != tt.want { + t.Errorf("FieldIndexByTag() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestFieldIndexByTagPanic(t *testing.T) { + t.Run("Panic for non-struct type", func(t 
*testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("FieldIndexByTag() does not panic") + } + }() + + FieldIndexByTag(reflect.TypeOf(""), "key", "field1") + }) +} diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 2253df597bb4..c13d8b166923 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -223,6 +223,7 @@ var dataflowFilters = []string{ "TestJDBCIO_BasicReadWrite", "TestJDBCIO_PostgresReadWrite", "TestDebeziumIO_BasicRead", + "TestMongoDBIO.*", // TODO(BEAM-11576): TestFlattenDup failing on this runner. "TestFlattenDup", // The Dataflow runner does not support the TestStream primitive diff --git a/sdks/go/test/integration/internal/containers/containers.go b/sdks/go/test/integration/internal/containers/containers.go new file mode 100644 index 000000000000..5987897fd412 --- /dev/null +++ b/sdks/go/test/integration/internal/containers/containers.go @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package containers contains utilities for running test containers in integration tests. 
+package containers + +import ( + "context" + "testing" + + "github.com/docker/go-connections/nat" + "github.com/testcontainers/testcontainers-go" +) + +type ContainerOptionFn func(*testcontainers.ContainerRequest) + +func WithPorts(ports []string) ContainerOptionFn { + return func(option *testcontainers.ContainerRequest) { + option.ExposedPorts = ports + } +} + +func NewContainer( + ctx context.Context, + t *testing.T, + image string, + opts ...ContainerOptionFn, +) testcontainers.Container { + t.Helper() + + request := testcontainers.ContainerRequest{Image: image} + + for _, opt := range opts { + opt(&request) + } + + genericRequest := testcontainers.GenericContainerRequest{ + ContainerRequest: request, + Started: true, + } + + container, err := testcontainers.GenericContainer(ctx, genericRequest) + if err != nil { + t.Fatalf("error creating container: %v", err) + } + + t.Cleanup(func() { + if err := container.Terminate(ctx); err != nil { + t.Fatalf("error terminating container: %v", err) + } + }) + + return container +} + +func Port( + ctx context.Context, + t *testing.T, + container testcontainers.Container, + port nat.Port, +) string { + t.Helper() + + mappedPort, err := container.MappedPort(ctx, port) + if err != nil { + t.Fatalf("error getting mapped port: %v", err) + } + + return mappedPort.Port() +} diff --git a/sdks/go/test/integration/io/mongodbio/helper_test.go b/sdks/go/test/integration/io/mongodbio/helper_test.go new file mode 100644 index 000000000000..751f9e56787e --- /dev/null +++ b/sdks/go/test/integration/io/mongodbio/helper_test.go @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mongodbio + +import ( + "context" + "testing" + + "github.com/apache/beam/sdks/v2/go/test/integration/internal/containers" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" +) + +const ( + mongoImage = "mongo:6.0.3" + mongoPort = "27017" +) + +func setUpTestContainer(ctx context.Context, t *testing.T) string { + t.Helper() + + container := containers.NewContainer( + ctx, + t, + mongoImage, + containers.WithPorts([]string{mongoPort + "/tcp"}), + ) + + return containers.Port(ctx, t, container, mongoPort) +} + +func objectIDFromHex(t *testing.T, hex string) primitive.ObjectID { + t.Helper() + + id, err := primitive.ObjectIDFromHex(hex) + if err != nil { + t.Fatalf("error parsing hex string to primitive.ObjectID: %v", err) + } + + return id +} + +func newClient(ctx context.Context, t *testing.T, uri string) *mongo.Client { + t.Helper() + + opts := options.Client().ApplyURI(uri) + + client, err := mongo.Connect(ctx, opts) + if err != nil { + t.Fatalf("error connecting to MongoDB: %v", err) + } + + t.Cleanup(func() { + if err := client.Disconnect(ctx); err != nil { + t.Fatalf("error disconnecting from MongoDB: %v", err) + } + }) + + if err := client.Ping(ctx, readpref.Primary()); err != nil { + t.Fatalf("error pinging MongoDB: %v", err) + } + + return client +} + +func dropCollection(ctx context.Context, t *testing.T, collection *mongo.Collection) { + t.Helper() + + if err := 
collection.Drop(ctx); err != nil { + t.Fatalf("error dropping collection: %v", err) + } +} + +func readDocuments( + ctx context.Context, + t *testing.T, + collection *mongo.Collection, +) []bson.M { + t.Helper() + + cursor, err := collection.Find(ctx, bson.M{}) + if err != nil { + t.Fatalf("error finding documents: %v", err) + } + + var documents []bson.M + if err = cursor.All(ctx, &documents); err != nil { + t.Fatalf("error decoding documents: %v", err) + } + + return documents +} + +func writeDocuments( + ctx context.Context, + t *testing.T, + collection *mongo.Collection, + documents []any, +) { + t.Helper() + + if _, err := collection.InsertMany(ctx, documents); err != nil { + t.Fatalf("error inserting documents: %v", err) + } +} diff --git a/sdks/go/test/integration/io/mongodbio/mongodbio_test.go b/sdks/go/test/integration/io/mongodbio/mongodbio_test.go new file mode 100644 index 000000000000..b8885e7c728d --- /dev/null +++ b/sdks/go/test/integration/io/mongodbio/mongodbio_test.go @@ -0,0 +1,237 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package mongodbio + +import ( + "context" + "flag" + "fmt" + "reflect" + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/mongodbio" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" + "github.com/apache/beam/sdks/v2/go/test/integration" + "github.com/google/go-cmp/cmp" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +func init() { + beam.RegisterType(reflect.TypeOf((*docWithObjectID)(nil)).Elem()) + beam.RegisterType(reflect.TypeOf((*docWithStringID)(nil)).Elem()) +} + +type docWithObjectID struct { + ID primitive.ObjectID `bson:"_id"` + Field1 int32 `bson:"field1"` +} + +type docWithStringID struct { + ID string `bson:"_id"` + Field1 int32 `bson:"field1"` +} + +func TestMongoDBIO_Read(t *testing.T) { + integration.CheckFilters(t) + + ctx := context.Background() + port := setUpTestContainer(ctx, t) + uri := fmt.Sprintf("mongodb://%s:%s", "localhost", port) + + tests := []struct { + name string + input []any + t reflect.Type + options []mongodbio.ReadOptionFn + want []any + }{ + { + name: "Read documents from MongoDB with id of type primitive.ObjectID", + input: []any{ + bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), "field1": int32(0)}, + bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), "field1": int32(1)}, + bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), "field1": int32(2)}, + }, + t: reflect.TypeOf(docWithObjectID{}), + want: []any{ + docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), Field1: 0}, + docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), Field1: 1}, + 
docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), Field1: 2}, + }, + }, + { + name: "Read documents from MongoDB with id of type string", + input: []any{ + bson.M{"_id": "id01", "field1": int32(0)}, + bson.M{"_id": "id02", "field1": int32(1)}, + bson.M{"_id": "id03", "field1": int32(2)}, + }, + t: reflect.TypeOf(docWithStringID{}), + want: []any{ + docWithStringID{ID: "id01", Field1: 0}, + docWithStringID{ID: "id02", Field1: 1}, + docWithStringID{ID: "id03", Field1: 2}, + }, + }, + { + name: "Read documents from MongoDB where filter matches", + input: []any{ + bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), "field1": int32(0)}, + bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), "field1": int32(1)}, + bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), "field1": int32(2)}, + }, + t: reflect.TypeOf(docWithObjectID{}), + options: []mongodbio.ReadOptionFn{ + mongodbio.WithReadFilter(bson.M{"field1": bson.M{"$gt": 0}}), + }, + want: []any{ + docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), Field1: 1}, + docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), Field1: 2}, + }, + }, + { + name: "Read documents from MongoDB with bucketAuto aggregation", + input: []any{ + bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), "field1": int32(0)}, + bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), "field1": int32(1)}, + bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), "field1": int32(2)}, + }, + t: reflect.TypeOf(docWithObjectID{}), + options: []mongodbio.ReadOptionFn{ + mongodbio.WithReadBucketAuto(true), + }, + want: []any{ + docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), Field1: 0}, + docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), Field1: 1}, + docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), Field1: 2}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + database 
:= "db" + collection := "coll" + + client := newClient(ctx, t, uri) + mongoCollection := client.Database(database).Collection(collection) + + t.Cleanup(func() { + dropCollection(ctx, t, mongoCollection) + }) + + writeDocuments(ctx, t, mongoCollection, tt.input) + + p, s := beam.NewPipelineWithRoot() + + got := mongodbio.Read(s, uri, database, collection, tt.t, tt.options...) + + passert.Equals(s, got, tt.want...) + ptest.RunAndValidate(t, p) + }) + } +} + +func TestMongoDBIO_Write(t *testing.T) { + integration.CheckFilters(t) + + ctx := context.Background() + port := setUpTestContainer(ctx, t) + uri := fmt.Sprintf("mongodb://%s:%s", "localhost", port) + + tests := []struct { + name string + input []any + options []mongodbio.WriteOptionFn + wantIDs []any + wantDocs []bson.M + }{ + { + name: "Write documents to MongoDB with id of type primitive.ObjectID", + input: []any{ + docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), Field1: 0}, + docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), Field1: 1}, + docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), Field1: 2}, + }, + wantIDs: []any{ + objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), + objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), + objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), + }, + wantDocs: []bson.M{ + {"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), "field1": int32(0)}, + {"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), "field1": int32(1)}, + {"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), "field1": int32(2)}, + }, + }, + { + name: "Write documents to MongoDB with id of type string", + input: []any{ + docWithStringID{ID: "id01", Field1: 0}, + docWithStringID{ID: "id02", Field1: 1}, + docWithStringID{ID: "id03", Field1: 2}, + }, + wantIDs: []any{ + "id01", + "id02", + "id03", + }, + wantDocs: []bson.M{ + {"_id": "id01", "field1": int32(0)}, + {"_id": "id02", "field1": int32(1)}, + {"_id": "id03", "field1": int32(2)}, + }, + }, + } + for 
_, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + database := "db" + collection := "coll" + + client := newClient(ctx, t, uri) + mongoCollection := client.Database(database).Collection(collection) + + t.Cleanup(func() { + dropCollection(ctx, t, mongoCollection) + }) + + p, s := beam.NewPipelineWithRoot() + + col := beam.CreateList(s, tt.input) + gotIDs := mongodbio.Write(s, uri, database, collection, col, tt.options...) + + passert.Equals(s, gotIDs, tt.wantIDs...) + ptest.RunAndValidate(t, p) + + if gotDocs := readDocuments(ctx, t, mongoCollection); !cmp.Equal(gotDocs, tt.wantDocs) { + t.Errorf("readDocuments() = %v, want %v", gotDocs, tt.wantDocs) + } + }) + } +} + +func TestMain(m *testing.M) { + flag.Parse() + beam.Init() + + ptest.MainRet(m) +}