Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add mongodb connector #55

Merged
merged 47 commits into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
7542ecb
feat: add mongodb source
wenfengwang Aug 19, 2022
403507e
feat: add adapter of mongodb
wenfengwang Aug 22, 2022
668ca1a
feat: support update & delete event
wenfengwang Aug 23, 2022
1b16b51
update README and add more tests
wenfengwang Aug 23, 2022
f988fc9
tiny update
wenfengwang Aug 23, 2022
2312b49
update mongodb connector readme
wenfengwang Aug 23, 2022
1f5d610
upadte readme
wenfengwang Aug 23, 2022
85abe45
upadte readme
wenfengwang Aug 23, 2022
32a26b3
update
wenfengwang Aug 23, 2022
4c43025
update
wenfengwang Aug 23, 2022
acd3281
add specification doc
wenfengwang Aug 25, 2022
59f8a8d
update
wenfengwang Aug 25, 2022
a690900
introduce proto
wenfengwang Aug 26, 2022
de9f658
add mongodb-sink
wenfengwang Aug 26, 2022
ccff23c
update
wenfengwang Aug 26, 2022
9345891
update mongodb sink
wenfengwang Aug 28, 2022
01e22ec
update mongo-sink
wenfengwang Aug 28, 2022
5ab494d
update
wenfengwang Aug 28, 2022
80b47b1
update
wenfengwang Aug 28, 2022
a11f9a2
update
wenfengwang Aug 28, 2022
883f625
update
wenfengwang Aug 28, 2022
b476e6c
update
wenfengwang Aug 28, 2022
47b400c
update
wenfengwang Aug 28, 2022
dab2c90
update
wenfengwang Aug 28, 2022
c3df709
update
wenfengwang Aug 28, 2022
6df269a
update
wenfengwang Aug 29, 2022
0e0fe80
update
wenfengwang Aug 29, 2022
46bb56f
update
wenfengwang Aug 29, 2022
631ddf3
update
wenfengwang Aug 29, 2022
150e8fe
update k8s file of mongodb sink
wenfengwang Aug 29, 2022
f460d7e
update secret
wenfengwang Aug 29, 2022
a471556
update readme
wenfengwang Aug 29, 2022
5be7f00
update readme
wenfengwang Aug 29, 2022
409ce38
update readme
wenfengwang Aug 29, 2022
683ec60
refactor mongo-sink
wenfengwang Aug 29, 2022
e947f93
dockerfile
wenfengwang Aug 29, 2022
b3bd832
update
wenfengwang Aug 29, 2022
6e8fdd5
update
wenfengwang Aug 29, 2022
a327102
update
wenfengwang Aug 29, 2022
4efbec3
update
wenfengwang Aug 29, 2022
18f68ad
add java docker file
wenfengwang Aug 29, 2022
f18e5de
add
wenfengwang Aug 29, 2022
d62fd3e
restructure
wenfengwang Aug 29, 2022
fe8708b
restructure
wenfengwang Aug 29, 2022
8c5622b
remove debezium file
wenfengwang Aug 29, 2022
b6bec9b
update readme
wenfengwang Aug 30, 2022
2d700ae
update readme
wenfengwang Aug 30, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,12 @@ testbin/*
target/
# dep
cdk-go

# jetbrains
.idea

# go test resources
**/test/resources/*

# java proto file
**/src/proto
9 changes: 9 additions & 0 deletions connectors/database/mongodb-sink/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
WORKDIR=$(shell pwd)
PROTO_ROOT=${WORKDIR}/../../schemas

generate-pb:
mkdir -p proto
protoc -I=${PROTO_ROOT} \
-I=${PROTO_ROOT}/thirds \
--go_out=paths=source_relative:proto \
${PROTO_ROOT}/database/mongodb.proto
240 changes: 240 additions & 0 deletions connectors/database/mongodb-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
# MongoDB Sink Connector

## Introduction

This connector for interaction with MongoDB, which support `insert/update/delete` operation now.

## Quickstart

### create config file

```shell
cat << EOF > config.yml
# change this hosts to your mongodb's address
db_hosts:
- 44.242.140.28:27017
port: 8080
EOF
```

### run mongodb-sink

it assumes that the mongodb instance doesn't need authentication. For how to use authentication please see
[secret](#secret) section.

```shell
docker run -d \
-p 8080:8080 \
-v ${PWD}:/vance/config \
--name mongodb-sink \
--rm public.ecr.aws/vanus/connector/mongodb-sink:dev
```

### insert document to mongodb
About more details for how to understand, please see [Schema](#examples) and [Examples](#examples) section.

```shell
curl --location --request POST 'http://127.0.0.1:8080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
"specversion": "1.0",
"id": "62ff305f779a73966deb3877",
"source": "mongodb.replicaset-01.test.source",
"type": "test.source",
"datacontenttype": "application/json",
"time": "2022-08-26T18:42:16Z",
"data": {
"op": "INSERT",
"insert": {
"document": {
"a": 1234
}
}
},
"vancemongosinkdatabase":"test",
"vancemongosinkcollection": "sink",
}'
```

### clean resource

```shell
docker stop mongodb-sink
```

## Configuration

the configuration of mongodb-sink based on [Connection String URI Format](https://www.mongodb.com/docs/v6.0/reference/connection-string/)

### config

| Name | Required | Default | Description |
|:---------|:--------:|:-------:|-------------------------------------------------|
| db_hosts | **YES** | - | the mongodb cluster hosts |
| port | **YES** | - | the port the mongodb-sink for listening request |

- example

create a `config.yml` that its content like follow, and mount it to container inside.

```yaml
db_hosts:
- 127.0.0.1:27017
port: 8080
```

```shell
docker run -d \
-p 8080:8080 \
-v ${PWD}:/vance/config \
--name mongodb-sink \
--rm public.ecr.aws/vanus/connector/mongodb-sink:v0.2.0-alpha
```

### secret

| Name | Required | Default | Description |
|:-----------|:--------:|:-------:|----------------------------------|
| username | **YES** | - | the username to connect mongodb |
| password | **YES** | - | the password to connect mongodb |
| authSource | NO | admin | the authSource to authentication |

The `user` and `password` are required only when MongoDB is configured to use authentication. This `authSource` required
only when MongoDB is configured to use authentication with another authentication database than admin.

- example: create a `secert.yml` that its content like follow, and mount it to container inside.

```yaml
username: "test"
password: "123456"
authSource: "admin"
```

```shell
docker run -d \
-p 8080:8080 \
-v ${PWD}:/vance/config \
--env CONNECTOR_SECRET_ENABLE=true
--name mongodb-sink \
--rm public.ecr.aws/vanus/connector/mongodb-sink:v0.2.0-alpha
```

## Deploy

### using k8s(recommended)

```shell
kubectl apply -f https://raw.githubusercontent.com/linkall-labs/vance/main/connectors/database/mongodb-sink/mongodb-sink.yml
```

### using vance Operator

Coming soon, it depends on Vance Operator, the experience of it will be like follow:

```shell
kubectl apply -f https://raw.githubusercontent.com/linkall-labs/vance/main/connectors/database/mongodb-sink/crd.yml
```

or

```shell
vsctl connectors create mongodb --source --config /xxx/config.josn --secret /xxx/secret.json
```

## Schema

The input events' schema is a [CloudEvent](https://github.com/cloudevents/spec) format, and each field are explained
follows.

the original `ChangeEvent` can be found in [official document](https://www.mongodb.com/docs/manual/reference/change-events/)

| Field | Required | Description |
|--------------------------|:--------:|-----------------------------------------------------------------------------------------------------------------------------------------------|
| id | **YES** | the bson`_id` will be set as the id |
| source | **YES** | where the event come from |
| type | **YES** | what's the event's type |
| time | NO | the time of this event generated with RFC3339 encoding |
| data | **YES** | the body of`ChangeEvent`, it's defined as `Event` in [mongodb.proto](../../schemas/database/mongodb.proto) |
| data.metadata | NO | the metadata of this event, it's defined as`Metadata` in [base.proto](../../schemas/base/base.proto) , in the most cases users can be ignored |
| data.op | **YES** | the event operation of this event, it's defined as`Operation` in [database.proto](../../schemas/database/database.proto) |
| data.raw | NO | the raw data of this event, it's defined as "Raw" in [database.proto](../../schemas/database/database.proto) |
| data.insert | NO | it's defined as`InsertEvent` in [mongodb.proto](../../schemas/database/mongodb.proto) |
| data.update | NO | it's defined as`UpdateEvent` in [mongodb.proto](../../schemas/database/mongodb.proto) |
| vancemongosinkdatabase | **YES** | which `database` the event into |
| vancemongosinkcollection | **YES** | which `collection` the event into |

## Examples

### insert document

```shell
curl --location --request POST 'http://127.0.0.1:8080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
"specversion": "1.0",
"id": "62ff305f779a73966deb3877",
"source": "mongodb.replicaset-01.test.source",
"type": "test.source",
"datacontenttype": "application/json",
"time": "2022-08-26T18:42:16Z",
"data": {
"op": "INSERT",
"insert": {
"document": {
"a": 1234
}
}
},
"vancemongosinkdatabase":"test",
"vancemongosinkcollection": "sink",
}'
```

### update document

```shell
curl --location --request POST 'http://127.0.0.1:8080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
"specversion": "1.0",
"id": "62ff305f779a73966deb3877",
"source": "mongodb.replicaset-01.test.source",
"type": "test.source",
"datacontenttype": "application/json",
"time": "2022-08-26T18:42:16Z",
"data": {
"op": "UPDATE",
"update": {
"updateDescription": {
"removedFields": [],
"truncatedArrays": [],
"updatedFields": {
"a": 12314
}
}
}
},
"vancemongosinkdatabase":"test",
"vancemongosinkcollection": "sink",
}'
```

### delete document

```shell
curl --location --request POST 'http://127.0.0.1:8080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
"specversion": "1.0",
"id": "62ff305f779a73966deb3877",
"source": "mongodb.replicaset-01.test.source",
"type": "test.source",
"datacontenttype": "application/json",
"time": "2022-08-26T18:42:16Z",
"data": {
"op": "DELETE"
},
"vancemongosinkdatabase":"test",
"vancemongosinkcollection": "sink",
}'
```
24 changes: 24 additions & 0 deletions connectors/database/mongodb-sink/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 Linkall Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"github.com/linkall-labs/cdk-go"
"github.com/linkall-labs/connector/mongodb-sink/internal"
)

func main() {
cdkgo.RunSink(internal.NewMongoSink())
}
8 changes: 8 additions & 0 deletions connectors/database/mongodb-sink/crd.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: vance.linkall.com/v1alpha
kind: Connector
metadata:
name: connector-mongo-sink
labels:
app: connector-mongo-sink
spec:
# TODO finish
41 changes: 41 additions & 0 deletions connectors/database/mongodb-sink/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
module github.com/linkall-labs/connector/mongodb-sink

go 1.18

require (
github.com/cloudevents/sdk-go/v2 v2.11.0
github.com/golang/protobuf v1.5.2
github.com/linkall-labs/cdk-go v0.0.0
github.com/linkall-labs/connector/proto v0.0.0
go.mongodb.org/mongo-driver v1.10.1
google.golang.org/protobuf v1.28.1
)

replace (
github.com/linkall-labs/cdk-go => ../../../../cdk-go
github.com/linkall-labs/connector/proto => ../../schemas/pkg
)

require (
github.com/golang/snappy v0.0.1 // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/json-iterator/go v1.1.10 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.1 // indirect
github.com/xdg-go/stringprep v1.0.3 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
)
Loading