Skip to content

Commit

Permalink
add mongodb-sink
Browse files Browse the repository at this point in the history
  • Loading branch information
wenfengwang committed Aug 26, 2022
1 parent a690900 commit de9f658
Show file tree
Hide file tree
Showing 24 changed files with 1,837 additions and 29 deletions.
1 change: 1 addition & 0 deletions connectors/database/mongodb-sink/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# TODO finish
9 changes: 9 additions & 0 deletions connectors/database/mongodb-sink/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
WORKDIR=$(shell pwd)
PROTO_ROOT=${WORKDIR}/../../schemas

generate-pb:
mkdir -p proto
protoc -I=${PROTO_ROOT} \
-I=${PROTO_ROOT}/thirds \
--go_out=paths=:proto \
${PROTO_ROOT}/database/mongodb.proto
189 changes: 189 additions & 0 deletions connectors/database/mongodb-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# MongoDB Connector

## Introduction

This is the template README.md of the connector, a real example can be found in [mongo](../mongodb/README.md)

## How to use

Copy this template to your connector directory, and finish all section.

### quickstart

```bash
docker run -it --rm public.ecr.aws/vanus/connector/xxx:latest /etc/vance/xxx/start.sh \
--volume /xxx/config.json /etc/vance/xxx/config.json \
--volume /xxx/secret.json /etc/vance/xxx/secret.json \
--env XXX_CONNECTOR_HOME=/etc/vance/xxx
```

### vance

Coming soon, it depends on Vance Operator, the experience of it will be like follow:

```bash
kubectl apply -f https://raw.githubusercontent.com/linkall-labs/vance/main/connectors/xxx/xxx.yml
# or
vsctl connectors create mongodb --source --config /xxx/config.josn --secret /xxx/secret.json
```

### k8s

```bash
kubectl apply -f https://raw.githubusercontent.com/linkall-labs/vance/main/connectors/xxx/xxx-bare.yml
```

## Configuration

### config.json

```json
{
"v_target": "http://localhost:8080",
"xxx": "xxx",
"yyy": "yyy"
}
```

| Name | Required | Default | Description |
| :--------- | :--------: | :-------: | ------------------------------------- |
| v_target | **YES** | - | Target URL will send CloudEvents to |
| xxx | **YES** | - | xxxxx |
| yyy | NO | empty | |

xxxxxx

### secret.json

```json
{
"xxx": "xxx",
"yyy": "xxx",
"zzz": "xxx"
}
```


| Name | Required | Default | Description |
| :----- | :--------: | :-------: | ------------- |
| xxx | **YES** | babala | xxx |
| yyy | NO | empty | yyy |
| zzz | NO | empty | zzz |

The `user` and `password` are required only when MongoDB is configured to use authentication. This `authsoure` required
only when MongoDB is configured to use authentication with another authentication database than admin.

## Schema

The output events' schema is a [CloudEvent](https://github.com/cloudevents/spec) format, and each field are explained
follows.


| field | description |
| -------------------------- | ------------------------------------- |
| id | the bson`_id` will be set as the id |
| source | xxx |
| type | xxx |
| time | xxx |
| data | the body of`ChangeEvent` |
| data.xxx | ... |
| data.yyy | ... |
| data.aaa.bbb | ... |
| vance{conenctorname}xxx} | ... |
| vance{conenctorname}yyy | ... |

### Different Data Body explanation1(if it's needed)

```json
{
"specversion": "1.0",
"id": "6304855bccaea8fcf8a159f2",
"source": "mongodb.replicaset-01.test.source",
"type": "test.source",
"datacontenttype": "application/json",
"time": "2022-08-23T07:44:27Z",
"data": {
"full": {
"download": "1234",
"connector": "mongodb",
"_id": "6304855bccaea8fcf8a159f2",
"version": "v0.3.0"
}
},
"vancemongodbrecognized": true,
"vancemongodbversion": "1.9.4.Final",
"vancemongodbsnapshot": "false",
"vancemongodbname": "test",
"vancemongodbord": "1",
"vancemongodboperation": "insert"
}
```

### Different Data Body explanation2(if it's needed)

```json
{
"specversion": "1.0",
"id": "6304855bccaea8fcf8a159f2",
"source": "mongodb.replicaset-01.test.source",
"type": "test.source",
"datacontenttype": "application/json",
"time": "2022-08-23T08:08:05Z",
"data": {
"full": {
"download": "1240",
"connector": "mongodb",
"_id": "6304855bccaea8fcf8a159f2",
"version": "v0.3.0"
},
"changed": {
"updated": {
"download": 1240
}
}
},
"vancemongodbrecognized": true,
"vancemongodbversion": "1.9.4.Final",
"vancemongodbsnapshot": "false",
"vancemongodbname": "test",
"vancemongodbord": "1",
"vancemongodboperation": "update"
}
```

...

### Unrecognized Event(if it's needed)

Although we do our best to deal with different events, but it's not easy to make sure that all raw data are parsed to a
structured format because we just only can deal the raw we knew. So, if there is error happened when we try to parse raw
data, we will see the event is an unrecognized event and set `vancemongodbrecognized` to false instead of discard in
order to guaranty no data loss.

User should put event with `vancemongodbrecognized=false` to the `deadLetter` in the further step. we're appreciated
that you can create an issue to feedback us about the unrecognized event, we will fix it as soon as possible.

```json
{
"specversion": "1.0",
"id": "unknown",
"source": "unknown",
"type": "unknown",
"datacontenttype": "application/json",
"time": "unknown",
"data": {
"rawKey": "xxxxx",
"rawValue": "xxxx"
},
"vancemongodbrecognized": false,
"vancemongodboperation": "unknown"
}
```

## example

xxxx

## Acknowledgement

The MongoDB Connector built on [debezium](https://github.com/debezium/debezium)
26 changes: 26 additions & 0 deletions connectors/database/mongodb-sink/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package main

import (
"context"
"github.com/linkall-labs/connector/mongodb-sink/internal"
"os"
)

func main() {
ga := internal.NewMongoSink(internal.Config{Port: 8081})
err := ga.StartReceive(context.Background())
if err != nil {
//log.Error(context.Background(), "start controller proxy failed", map[string]interface{}{
// log.KeyError: err,
//})
os.Exit(-1)
}

err = ga.StartReceive(context.Background())
if err != nil {
//log.Error(context.Background(), "start CloudEvents gateway failed", map[string]interface{}{
// log.KeyError: err,
//})
os.Exit(-1)
}
}
52 changes: 52 additions & 0 deletions connectors/database/mongodb-sink/crd.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# TODO finish
apiVersion: apps/v1
kind: Deployment
metadata:
name: mongodb-connector
labels:
app: mongodb-connector
spec:
selector:
matchLabels:
app: mongodb-connector
replicas: 1
template:
metadata:
labels:
app: mongodb-connector
spec:
containers:
- name: mongodb-connector
image: public.ecr.aws/vanus/connector/mongodb:latest
imagePullPolicy: Always
command: [ "sh", "-c", "/var/mongodb/run.sh" ]
resources:
requests:
cpu: 100m
memory: 1000Mi
env:
- name: MONGODB_HOSTS
value: "localhost:27017"
- name: MONGODB_NAME
value: "admin"
- name: MONGODB_AUTHSOURCE
value: "admin"
- name: DB_INCLUDE_LIST
value: "test"
volumeMounts:
- name: secret
mountPath: "/var/mongodb/secret.json"
readOnly: true
volumes:
- name: secret
secret:
secretName: mongodb-secret
---
apiVersion: v1
kind: Secret
metadata:
name: mongodb-secret
type: Opaque
data:
user: admin
password: admin
Loading

0 comments on commit de9f658

Please sign in to comment.