Multiple collections support in the destination #131

Merged
hariso merged 12 commits into main from haris/multiple-collections-destination
Apr 12, 2024

Contributor

@hariso hariso commented Apr 12, 2024

Description

Closes ConduitIO/conduit#1474.

The Pg source connector doesn't set the opencdc.collection metadata yet, so I used a processor to insert it:

---
version: 2.2
pipelines:
  - id: pipeline1
    status: running
    name: pipeline1
    description: 'Generator to PostgreSQL'
    connectors:
      - id: generator-source
        type: source
        plugin: "builtin:generator"
        name: generator-source
        settings:
          recordCount: "10"
          # readTime: "1s"
          format.options: "id:int,name:string"
          format.type: "structured"

      - id: pg-destination
        type: destination
        plugin: "standalone:postgres"
        name: pg-destination
        settings:
          url: "postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable"
          # table: "employees"
    
    processors:
      - id: "format-lines"
        plugin: "custom.javascript"
        settings:
          script: |
            function getRandomInt(max) {
              return Math.floor(Math.random() * max);
            }

            function getRandomBoolean() {
              return Math.random() < 0.5;
            }

            function process(rec) {
              // Give each record a random key so writes hit different rows.
              rec.Key = RawData(JSON.stringify({"id": getRandomInt(1000)}));
              // Route the record to one of two tables via opencdc.collection.
              if (getRandomBoolean()) {
                rec.Metadata["opencdc.collection"] = "contractors";
              } else {
                rec.Metadata["opencdc.collection"] = "employees";
              }

              return rec;
            }

Tested with a static table name and also the default Go template.

I've renamed the GitHub job from "build" to "test" because the new name describes what the job does more accurately. "build" is currently a required PR check, but I'll handle the rename of that check once the PR has been approved.

Quick checks:

  • I have followed the Code Guidelines.
  • There is no other pull request for the same update/change.
  • I have written unit tests.
  • I have made sure that the PR is of reasonable size and can be easily reviewed.

@hariso hariso marked this pull request as ready for review April 12, 2024 10:32
@hariso hariso requested a review from a team as a code owner April 12, 2024 10:32
Member

@raulb raulb left a comment

LGTM! Once this ships I'll rebase #129.

.github/workflows/test.yml (review thread resolved)
@hariso hariso enabled auto-merge (squash) April 12, 2024 14:56
@hariso hariso merged commit e7c451c into main Apr 12, 2024
3 checks passed
@hariso hariso deleted the haris/multiple-collections-destination branch April 12, 2024 14:58
raulb added a commit that referenced this pull request Apr 16, 2024
@raulb raulb mentioned this pull request Apr 16, 2024
7 tasks
raulb added a commit that referenced this pull request Apr 19, 2024
lovromazgon added a commit that referenced this pull request Apr 24, 2024
* feat: read from multiple tables

* use sdk constant

* feat(longpoll): use opencdc.collection

* read from all tables

* update paramgen.go

* update readme

* restore readme

* read columns from multiple tables

* fix typo

* restore

* leftovers from #131

* more leftovers

* update readme on destination

* update

* update readme

* update comment

* update comment

* use opencdc constant

* start making changes on readme

* address after rebase

* wip

* wip

* tie snapshot iterator into longpolling mode

* get table key if not supplied manually

* ensure snapshot iterator waits for all acks

* fix snapshot iterator tests

* add test for ensuring iterator waits for acks

* fix cdc tests

* undo changes on readme

* address PR review

* use pg_tables

---------

Co-authored-by: Lovro Mažgon <lovro.mazgon@gmail.com>

Successfully merging this pull request may close these issues.

MC: Postgres Destination - Write to multiple tables
3 participants