Skip to content

Commit

Permalink
OpenCDC integration (#80)
Browse files Browse the repository at this point in the history
* integrate OpenCDC record

* make parquet writer less dumb

* implement detection of updates, optimize cache handling

* fix cache population and tests

* update readme

* update SDK utilities

* update connector sdk version

* update Go version
  • Loading branch information
lovromazgon authored Aug 5, 2022
1 parent 95b827a commit 1955858
Show file tree
Hide file tree
Showing 18 changed files with 279 additions and 288 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.17
go-version: 1.18

- name: Test
env:
Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: 1.18
- name: golangci-lint
uses: golangci/golangci-lint-action@v3.2.0
with:
version: v1.42.1
version: v1.47.3
14 changes: 6 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ Run `make`.

### Testing

Run `make test` to run all the tests. You must set the environment variables (`AWS_ACCESS_KEY_ID`
, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`)
Run `make test` to run all the tests. You must set the environment variables (`AWS_ACCESS_KEY_ID`,
`AWS_SECRET_ACCESS_KEY`, `AWS_REGION`)
before you run all the tests. If not set, the tests that use these variables will be ignored.

## S3 Source
Expand All @@ -26,13 +26,11 @@ occur. After that, the
### Change Data Capture (CDC)

This connector implements CDC features for S3 by scanning the bucket for changes every
`pollingPeriod` and detecting any change that happened after a certain timestamp. These changes (update, delete, insert)
`pollingPeriod` and detecting any change that happened after a certain timestamp. These changes (update, delete, create)
are then inserted into a buffer that is checked on each Read request.

* To capture "delete" actions, the S3 bucket versioning must be enabled, and the output record will have a metadata
of `"action":"delete"`.
* To capture "insert" or "update" actions, the bucket versioning doesn't matter, and no metadata is added for these
actions.
* To capture "delete" and "update" actions, the S3 bucket versioning must be enabled.
* To capture "create" actions, the bucket versioning doesn't matter.

#### Position Handling

Expand Down Expand Up @@ -97,4 +95,4 @@ The config passed to `Configure` can contain the following fields.
| `aws.bucket` | the AWS S3 bucket name | yes | "bucket_name" |
| `format` | the destination format, either "json" or "parquet" | yes | "json" |
| `bufferSize` | the buffer size {when full, the files will be written to destination}, default is "1000", max is "100000" | no | "100" |
| `prefix` | the key prefix for S3 destination | no | "conduit-" |
| `prefix` | the key prefix for S3 destination | no | "conduit-" |
4 changes: 1 addition & 3 deletions cmd/s3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ package main

import (
s3 "github.com/conduitio/conduit-connector-s3"
"github.com/conduitio/conduit-connector-s3/destination"
"github.com/conduitio/conduit-connector-s3/source"
sdk "github.com/conduitio/conduit-connector-sdk"
)

func main() {
sdk.Serve(s3.Specification, source.NewSource, destination.NewDestination)
sdk.Serve(s3.Connector)
}
27 changes: 27 additions & 0 deletions connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright © 2022 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package s3

import (
"github.com/conduitio/conduit-connector-s3/destination"
"github.com/conduitio/conduit-connector-s3/source"
sdk "github.com/conduitio/conduit-connector-sdk"
)

// Connector bundles the connector's specification together with the
// constructors for its source and destination in a single sdk.Connector value.
var Connector = sdk.Connector{
NewSpecification: Specification,
NewSource: source.NewSource,
NewDestination: destination.NewDestination,
}
12 changes: 8 additions & 4 deletions destination/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,15 @@ func generateRecords(count int) []sdk.Record {

for i := 0; i < count; i++ {
result = append(result, sdk.Record{
Operation: sdk.OperationCreate,
Position: []byte(strconv.Itoa(i)),
Payload: sdk.RawData(fmt.Sprintf("this is a message #%d", i+1)),
Key: sdk.RawData(fmt.Sprintf("key-%d", i)),
Metadata: map[string]string{"number": fmt.Sprint(i)},
CreatedAt: time.Date(2020, 1, 1, 1, 0, 0, 0, time.UTC).Add(time.Duration(i) * time.Second),
Payload: sdk.Change{
After: sdk.RawData(fmt.Sprintf("this is a message #%d", i+1)),
},
Key: sdk.RawData(fmt.Sprintf("key-%d", i)),
Metadata: map[string]string{
sdk.MetadataCreatedAt: strconv.FormatInt(time.Date(2020, 1, 1, 1, 0, 0, 0, time.UTC).Add(time.Duration(i)*time.Second).UnixNano(), 10),
},
})
}

Expand Down
50 changes: 25 additions & 25 deletions destination/fixtures/reference-1.json
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
{"Position":"0","Payload":"this is a message #1","Key":"key-0","Metadata":{"number":"0"},"CreatedAt":1577840400000000000}
{"Position":"1","Payload":"this is a message #2","Key":"key-1","Metadata":{"number":"1"},"CreatedAt":1577840401000000000}
{"Position":"2","Payload":"this is a message #3","Key":"key-2","Metadata":{"number":"2"},"CreatedAt":1577840402000000000}
{"Position":"3","Payload":"this is a message #4","Key":"key-3","Metadata":{"number":"3"},"CreatedAt":1577840403000000000}
{"Position":"4","Payload":"this is a message #5","Key":"key-4","Metadata":{"number":"4"},"CreatedAt":1577840404000000000}
{"Position":"5","Payload":"this is a message #6","Key":"key-5","Metadata":{"number":"5"},"CreatedAt":1577840405000000000}
{"Position":"6","Payload":"this is a message #7","Key":"key-6","Metadata":{"number":"6"},"CreatedAt":1577840406000000000}
{"Position":"7","Payload":"this is a message #8","Key":"key-7","Metadata":{"number":"7"},"CreatedAt":1577840407000000000}
{"Position":"8","Payload":"this is a message #9","Key":"key-8","Metadata":{"number":"8"},"CreatedAt":1577840408000000000}
{"Position":"9","Payload":"this is a message #10","Key":"key-9","Metadata":{"number":"9"},"CreatedAt":1577840409000000000}
{"Position":"10","Payload":"this is a message #11","Key":"key-10","Metadata":{"number":"10"},"CreatedAt":1577840410000000000}
{"Position":"11","Payload":"this is a message #12","Key":"key-11","Metadata":{"number":"11"},"CreatedAt":1577840411000000000}
{"Position":"12","Payload":"this is a message #13","Key":"key-12","Metadata":{"number":"12"},"CreatedAt":1577840412000000000}
{"Position":"13","Payload":"this is a message #14","Key":"key-13","Metadata":{"number":"13"},"CreatedAt":1577840413000000000}
{"Position":"14","Payload":"this is a message #15","Key":"key-14","Metadata":{"number":"14"},"CreatedAt":1577840414000000000}
{"Position":"15","Payload":"this is a message #16","Key":"key-15","Metadata":{"number":"15"},"CreatedAt":1577840415000000000}
{"Position":"16","Payload":"this is a message #17","Key":"key-16","Metadata":{"number":"16"},"CreatedAt":1577840416000000000}
{"Position":"17","Payload":"this is a message #18","Key":"key-17","Metadata":{"number":"17"},"CreatedAt":1577840417000000000}
{"Position":"18","Payload":"this is a message #19","Key":"key-18","Metadata":{"number":"18"},"CreatedAt":1577840418000000000}
{"Position":"19","Payload":"this is a message #20","Key":"key-19","Metadata":{"number":"19"},"CreatedAt":1577840419000000000}
{"Position":"20","Payload":"this is a message #21","Key":"key-20","Metadata":{"number":"20"},"CreatedAt":1577840420000000000}
{"Position":"21","Payload":"this is a message #22","Key":"key-21","Metadata":{"number":"21"},"CreatedAt":1577840421000000000}
{"Position":"22","Payload":"this is a message #23","Key":"key-22","Metadata":{"number":"22"},"CreatedAt":1577840422000000000}
{"Position":"23","Payload":"this is a message #24","Key":"key-23","Metadata":{"number":"23"},"CreatedAt":1577840423000000000}
{"Position":"24","Payload":"this is a message #25","Key":"key-24","Metadata":{"number":"24"},"CreatedAt":1577840424000000000}
{"Operation":"Create","Position":"0","Payload":"this is a message #1","Key":"key-0","Metadata":{"opencdc.createdAt":"1577840400000000000"}}
{"Operation":"Create","Position":"1","Payload":"this is a message #2","Key":"key-1","Metadata":{"opencdc.createdAt":"1577840401000000000"}}
{"Operation":"Create","Position":"2","Payload":"this is a message #3","Key":"key-2","Metadata":{"opencdc.createdAt":"1577840402000000000"}}
{"Operation":"Create","Position":"3","Payload":"this is a message #4","Key":"key-3","Metadata":{"opencdc.createdAt":"1577840403000000000"}}
{"Operation":"Create","Position":"4","Payload":"this is a message #5","Key":"key-4","Metadata":{"opencdc.createdAt":"1577840404000000000"}}
{"Operation":"Create","Position":"5","Payload":"this is a message #6","Key":"key-5","Metadata":{"opencdc.createdAt":"1577840405000000000"}}
{"Operation":"Create","Position":"6","Payload":"this is a message #7","Key":"key-6","Metadata":{"opencdc.createdAt":"1577840406000000000"}}
{"Operation":"Create","Position":"7","Payload":"this is a message #8","Key":"key-7","Metadata":{"opencdc.createdAt":"1577840407000000000"}}
{"Operation":"Create","Position":"8","Payload":"this is a message #9","Key":"key-8","Metadata":{"opencdc.createdAt":"1577840408000000000"}}
{"Operation":"Create","Position":"9","Payload":"this is a message #10","Key":"key-9","Metadata":{"opencdc.createdAt":"1577840409000000000"}}
{"Operation":"Create","Position":"10","Payload":"this is a message #11","Key":"key-10","Metadata":{"opencdc.createdAt":"1577840410000000000"}}
{"Operation":"Create","Position":"11","Payload":"this is a message #12","Key":"key-11","Metadata":{"opencdc.createdAt":"1577840411000000000"}}
{"Operation":"Create","Position":"12","Payload":"this is a message #13","Key":"key-12","Metadata":{"opencdc.createdAt":"1577840412000000000"}}
{"Operation":"Create","Position":"13","Payload":"this is a message #14","Key":"key-13","Metadata":{"opencdc.createdAt":"1577840413000000000"}}
{"Operation":"Create","Position":"14","Payload":"this is a message #15","Key":"key-14","Metadata":{"opencdc.createdAt":"1577840414000000000"}}
{"Operation":"Create","Position":"15","Payload":"this is a message #16","Key":"key-15","Metadata":{"opencdc.createdAt":"1577840415000000000"}}
{"Operation":"Create","Position":"16","Payload":"this is a message #17","Key":"key-16","Metadata":{"opencdc.createdAt":"1577840416000000000"}}
{"Operation":"Create","Position":"17","Payload":"this is a message #18","Key":"key-17","Metadata":{"opencdc.createdAt":"1577840417000000000"}}
{"Operation":"Create","Position":"18","Payload":"this is a message #19","Key":"key-18","Metadata":{"opencdc.createdAt":"1577840418000000000"}}
{"Operation":"Create","Position":"19","Payload":"this is a message #20","Key":"key-19","Metadata":{"opencdc.createdAt":"1577840419000000000"}}
{"Operation":"Create","Position":"20","Payload":"this is a message #21","Key":"key-20","Metadata":{"opencdc.createdAt":"1577840420000000000"}}
{"Operation":"Create","Position":"21","Payload":"this is a message #22","Key":"key-21","Metadata":{"opencdc.createdAt":"1577840421000000000"}}
{"Operation":"Create","Position":"22","Payload":"this is a message #23","Key":"key-22","Metadata":{"opencdc.createdAt":"1577840422000000000"}}
{"Operation":"Create","Position":"23","Payload":"this is a message #24","Key":"key-23","Metadata":{"opencdc.createdAt":"1577840423000000000"}}
{"Operation":"Create","Position":"24","Payload":"this is a message #25","Key":"key-24","Metadata":{"opencdc.createdAt":"1577840424000000000"}}
Binary file modified destination/fixtures/reference-1.parquet
Binary file not shown.
50 changes: 25 additions & 25 deletions destination/fixtures/reference-2.json
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
{"Position":"25","Payload":"this is a message #26","Key":"key-25","Metadata":{"number":"25"},"CreatedAt":1577840425000000000}
{"Position":"26","Payload":"this is a message #27","Key":"key-26","Metadata":{"number":"26"},"CreatedAt":1577840426000000000}
{"Position":"27","Payload":"this is a message #28","Key":"key-27","Metadata":{"number":"27"},"CreatedAt":1577840427000000000}
{"Position":"28","Payload":"this is a message #29","Key":"key-28","Metadata":{"number":"28"},"CreatedAt":1577840428000000000}
{"Position":"29","Payload":"this is a message #30","Key":"key-29","Metadata":{"number":"29"},"CreatedAt":1577840429000000000}
{"Position":"30","Payload":"this is a message #31","Key":"key-30","Metadata":{"number":"30"},"CreatedAt":1577840430000000000}
{"Position":"31","Payload":"this is a message #32","Key":"key-31","Metadata":{"number":"31"},"CreatedAt":1577840431000000000}
{"Position":"32","Payload":"this is a message #33","Key":"key-32","Metadata":{"number":"32"},"CreatedAt":1577840432000000000}
{"Position":"33","Payload":"this is a message #34","Key":"key-33","Metadata":{"number":"33"},"CreatedAt":1577840433000000000}
{"Position":"34","Payload":"this is a message #35","Key":"key-34","Metadata":{"number":"34"},"CreatedAt":1577840434000000000}
{"Position":"35","Payload":"this is a message #36","Key":"key-35","Metadata":{"number":"35"},"CreatedAt":1577840435000000000}
{"Position":"36","Payload":"this is a message #37","Key":"key-36","Metadata":{"number":"36"},"CreatedAt":1577840436000000000}
{"Position":"37","Payload":"this is a message #38","Key":"key-37","Metadata":{"number":"37"},"CreatedAt":1577840437000000000}
{"Position":"38","Payload":"this is a message #39","Key":"key-38","Metadata":{"number":"38"},"CreatedAt":1577840438000000000}
{"Position":"39","Payload":"this is a message #40","Key":"key-39","Metadata":{"number":"39"},"CreatedAt":1577840439000000000}
{"Position":"40","Payload":"this is a message #41","Key":"key-40","Metadata":{"number":"40"},"CreatedAt":1577840440000000000}
{"Position":"41","Payload":"this is a message #42","Key":"key-41","Metadata":{"number":"41"},"CreatedAt":1577840441000000000}
{"Position":"42","Payload":"this is a message #43","Key":"key-42","Metadata":{"number":"42"},"CreatedAt":1577840442000000000}
{"Position":"43","Payload":"this is a message #44","Key":"key-43","Metadata":{"number":"43"},"CreatedAt":1577840443000000000}
{"Position":"44","Payload":"this is a message #45","Key":"key-44","Metadata":{"number":"44"},"CreatedAt":1577840444000000000}
{"Position":"45","Payload":"this is a message #46","Key":"key-45","Metadata":{"number":"45"},"CreatedAt":1577840445000000000}
{"Position":"46","Payload":"this is a message #47","Key":"key-46","Metadata":{"number":"46"},"CreatedAt":1577840446000000000}
{"Position":"47","Payload":"this is a message #48","Key":"key-47","Metadata":{"number":"47"},"CreatedAt":1577840447000000000}
{"Position":"48","Payload":"this is a message #49","Key":"key-48","Metadata":{"number":"48"},"CreatedAt":1577840448000000000}
{"Position":"49","Payload":"this is a message #50","Key":"key-49","Metadata":{"number":"49"},"CreatedAt":1577840449000000000}
{"Operation":"Create","Position":"25","Payload":"this is a message #26","Key":"key-25","Metadata":{"opencdc.createdAt":"1577840425000000000"}}
{"Operation":"Create","Position":"26","Payload":"this is a message #27","Key":"key-26","Metadata":{"opencdc.createdAt":"1577840426000000000"}}
{"Operation":"Create","Position":"27","Payload":"this is a message #28","Key":"key-27","Metadata":{"opencdc.createdAt":"1577840427000000000"}}
{"Operation":"Create","Position":"28","Payload":"this is a message #29","Key":"key-28","Metadata":{"opencdc.createdAt":"1577840428000000000"}}
{"Operation":"Create","Position":"29","Payload":"this is a message #30","Key":"key-29","Metadata":{"opencdc.createdAt":"1577840429000000000"}}
{"Operation":"Create","Position":"30","Payload":"this is a message #31","Key":"key-30","Metadata":{"opencdc.createdAt":"1577840430000000000"}}
{"Operation":"Create","Position":"31","Payload":"this is a message #32","Key":"key-31","Metadata":{"opencdc.createdAt":"1577840431000000000"}}
{"Operation":"Create","Position":"32","Payload":"this is a message #33","Key":"key-32","Metadata":{"opencdc.createdAt":"1577840432000000000"}}
{"Operation":"Create","Position":"33","Payload":"this is a message #34","Key":"key-33","Metadata":{"opencdc.createdAt":"1577840433000000000"}}
{"Operation":"Create","Position":"34","Payload":"this is a message #35","Key":"key-34","Metadata":{"opencdc.createdAt":"1577840434000000000"}}
{"Operation":"Create","Position":"35","Payload":"this is a message #36","Key":"key-35","Metadata":{"opencdc.createdAt":"1577840435000000000"}}
{"Operation":"Create","Position":"36","Payload":"this is a message #37","Key":"key-36","Metadata":{"opencdc.createdAt":"1577840436000000000"}}
{"Operation":"Create","Position":"37","Payload":"this is a message #38","Key":"key-37","Metadata":{"opencdc.createdAt":"1577840437000000000"}}
{"Operation":"Create","Position":"38","Payload":"this is a message #39","Key":"key-38","Metadata":{"opencdc.createdAt":"1577840438000000000"}}
{"Operation":"Create","Position":"39","Payload":"this is a message #40","Key":"key-39","Metadata":{"opencdc.createdAt":"1577840439000000000"}}
{"Operation":"Create","Position":"40","Payload":"this is a message #41","Key":"key-40","Metadata":{"opencdc.createdAt":"1577840440000000000"}}
{"Operation":"Create","Position":"41","Payload":"this is a message #42","Key":"key-41","Metadata":{"opencdc.createdAt":"1577840441000000000"}}
{"Operation":"Create","Position":"42","Payload":"this is a message #43","Key":"key-42","Metadata":{"opencdc.createdAt":"1577840442000000000"}}
{"Operation":"Create","Position":"43","Payload":"this is a message #44","Key":"key-43","Metadata":{"opencdc.createdAt":"1577840443000000000"}}
{"Operation":"Create","Position":"44","Payload":"this is a message #45","Key":"key-44","Metadata":{"opencdc.createdAt":"1577840444000000000"}}
{"Operation":"Create","Position":"45","Payload":"this is a message #46","Key":"key-45","Metadata":{"opencdc.createdAt":"1577840445000000000"}}
{"Operation":"Create","Position":"46","Payload":"this is a message #47","Key":"key-46","Metadata":{"opencdc.createdAt":"1577840446000000000"}}
{"Operation":"Create","Position":"47","Payload":"this is a message #48","Key":"key-47","Metadata":{"opencdc.createdAt":"1577840447000000000"}}
{"Operation":"Create","Position":"48","Payload":"this is a message #49","Key":"key-48","Metadata":{"opencdc.createdAt":"1577840448000000000"}}
{"Operation":"Create","Position":"49","Payload":"this is a message #50","Key":"key-49","Metadata":{"opencdc.createdAt":"1577840449000000000"}}
Binary file modified destination/fixtures/reference-2.parquet
Binary file not shown.
6 changes: 3 additions & 3 deletions destination/format/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,23 @@ import (

type jsonRecord struct {
// TODO save schema type
Operation string `json:"Operation"`
Position string `json:"Position"`
Payload string `json:"Payload"`
Key string `json:"Key"`
Metadata map[string]string `json:"Metadata"`
CreatedAt int64 `json:"CreatedAt"`
}

func makeJSONBytes(records []sdk.Record) ([]byte, error) {
buf := bytes.NewBuffer([]byte{})

for _, r := range records {
r := jsonRecord{
Operation: r.Operation.String(),
Position: string(r.Position),
Payload: string(r.Payload.Bytes()),
Payload: string(r.Payload.After.Bytes()),
Key: string(r.Key.Bytes()),
Metadata: r.Metadata,
CreatedAt: r.CreatedAt.UnixNano(),
}

bytes, err := json.Marshal(r)
Expand Down
Loading

0 comments on commit 1955858

Please sign in to comment.