Skip to content

Commit

Permalink
Merge pull request #3062 from redpanda-data/pgcdc
Browse files Browse the repository at this point in the history
pgcdc: consolidate mode into operation
  • Loading branch information
Jeffail authored Dec 5, 2024
2 parents a03a02a + 181ad11 commit 65763d9
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file.
### Changed

- The `pg_stream` input has been renamed to `postgres_cdc`. The old name will continue to function as an alias. (@rockwotj)
- The `postgres_cdc` input no longer emits `mode` metadata and instead snapshot reads set `operation` metadata to be `read` instead of `insert`. (@rockwotj)

### Fixed

Expand Down
4 changes: 2 additions & 2 deletions docs/modules/components/pages/inputs/pg_stream.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ Additionally, if `stream_snapshot` is set to true, then the existing data in the
== Metadata
This input adds the following metadata fields to each message:
- mode (Either "streaming" or "snapshot" indicating whether the message is part of a streaming operation or snapshot processing)
- table (Name of the table that the message originated from)
- operation (Type of operation that generated the message: "insert", "update", or "delete". This will also be "begin" and "commit" if `include_transaction_markers` is enabled)
- operation (Type of operation that generated the message: "read", "insert", "update", or "delete". "read" is from messages that are read in the initial snapshot phase. This will also be "begin" and "commit" if `include_transaction_markers` is enabled)
- lsn (the log sequence number in postgres)
== Fields
Expand Down
4 changes: 2 additions & 2 deletions docs/modules/components/pages/inputs/postgres_cdc.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ Additionally, if `stream_snapshot` is set to true, then the existing data in the
== Metadata
This input adds the following metadata fields to each message:
- mode (Either "streaming" or "snapshot" indicating whether the message is part of a streaming operation or snapshot processing)
- table (Name of the table that the message originated from)
- operation (Type of operation that generated the message: "insert", "update", or "delete". This will also be "begin" and "commit" if `include_transaction_markers` is enabled)
- operation (Type of operation that generated the message: "read", "insert", "update", or "delete". "read" is from messages that are read in the initial snapshot phase. This will also be "begin" and "commit" if `include_transaction_markers` is enabled)
- lsn (the log sequence number in postgres)
== Fields
Expand Down
7 changes: 3 additions & 4 deletions internal/impl/postgresql/input_pg_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ Additionally, if ` + "`" + fieldStreamSnapshot + "`" + ` is set to true, then th
== Metadata
This input adds the following metadata fields to each message:
- mode (Either "streaming" or "snapshot" indicating whether the message is part of a streaming operation or snapshot processing)
- table (Name of the table that the message originated from)
- operation (Type of operation that generated the message: "insert", "update", or "delete". This will also be "begin" and "commit" if ` + "`" + fieldIncludeTxnMarkers + "`" + ` is enabled)
- operation (Type of operation that generated the message: "read", "insert", "update", or "delete". "read" is from messages that are read in the initial snapshot phase. This will also be "begin" and "commit" if ` + "`" + fieldIncludeTxnMarkers + "`" + ` is enabled)
- lsn (the log sequence number in postgres)
`).
Field(service.NewStringField(fieldDSN).
Description("The Data Source Name for the PostgreSQL database in the form of `postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]`. Please note that Postgres enforces SSL by default, you can override this with the parameter `sslmode=disable` if required.").
Expand Down Expand Up @@ -259,7 +259,7 @@ func newPgStreamInput(conf *service.ParsedConfig, mgr *service.Resources) (s ser
return nil, err
}

return conf.WrapBatchInputExtractTracingSpanMapping("pg_stream", r)
return conf.WrapBatchInputExtractTracingSpanMapping("postgres_cdc", r)
}

// validateSimpleString ensures we aren't vuln to SQL injection
Expand Down Expand Up @@ -367,7 +367,6 @@ func (p *pgStreamInput) processStream(pgStream *pglogicalstream.Stream, batcher
break
}
batchMsg := service.NewMessage(mb)
batchMsg.MetaSet("mode", string(message.Mode))
batchMsg.MetaSet("table", message.Table)
batchMsg.MetaSet("operation", string(message.Operation))
if message.LSN != nil {
Expand Down
10 changes: 4 additions & 6 deletions internal/impl/postgresql/pglogicalstream/logical_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,12 +580,10 @@ func (s *Stream) processSnapshot() error {

snapshotChangePacket := StreamMessage{
LSN: nil,
Mode: StreamModeSnapshot,
Operation: InsertOpType,

Table: tableWithoutSchema,
Schema: s.schema,
Data: data,
Operation: ReadOpType,
Table: tableWithoutSchema,
Schema: s.schema,
Data: data,
}

if rowsCount%100 == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func isCommitMessage(WALData []byte) (bool, *CommitMessage, error) {
// before the change message.
func decodePgOutput(WALData []byte, relations map[uint32]*RelationMessage, typeMap *pgtype.Map) (*StreamMessage, error) {
logicalMsg, err := Parse(WALData)
message := &StreamMessage{Mode: StreamModeStreaming}
message := &StreamMessage{}

if err != nil {
return nil, err
Expand Down
11 changes: 6 additions & 5 deletions internal/impl/postgresql/pglogicalstream/stream_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const (
type OpType string

const (
// ReadOpType is a snapshot read
ReadOpType OpType = "read"
// InsertOpType is a database insert
InsertOpType OpType = "insert"
// UpdateOpType is a database update
Expand All @@ -36,11 +38,10 @@ const (

// StreamMessage represents a single change from the database
type StreamMessage struct {
LSN *string `json:"lsn"`
Operation OpType `json:"operation"`
Schema string `json:"schema"`
Table string `json:"table"`
Mode StreamMode `json:"mode"`
LSN *string `json:"lsn"`
Operation OpType `json:"operation"`
Schema string `json:"schema"`
Table string `json:"table"`
// For deleted messages - there will be old changes if replica identity set to full or empty changes
Data any `json:"data"`
}

0 comments on commit 65763d9

Please sign in to comment.