Skip to content

Commit

Permalink
DBZ-7755 Refactor exporting to CloudEvents: remove RecordParser
Browse files Browse the repository at this point in the history
  • Loading branch information
rkudryashov authored and jpechane committed Apr 8, 2024
1 parent debdf4b commit 9ead9b8
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
*/
package io.debezium.connector.informix.converters;

import java.util.Set;

import io.debezium.connector.AbstractSourceInfo;
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.converters.spi.RecordParser;
import io.debezium.converters.spi.SerializerType;
import io.debezium.data.Envelope;
import io.debezium.util.Collect;

/**
* CloudEvents maker for records produced by the Informix connector.
Expand All @@ -18,14 +22,24 @@
*/
public class InformixCloudEventsMaker extends CloudEventsMaker {

public InformixCloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) {
super(parser, contentType, dataSchemaUriBase, cloudEventsSchemaName);
static final String CHANGE_LSN_KEY = "change_lsn";
static final String COMMIT_LSN_KEY = "commit_lsn";

static final Set<String> IFX_SOURCE_FIELDS = Collect.unmodifiableSet(CHANGE_LSN_KEY, COMMIT_LSN_KEY);

public InformixCloudEventsMaker(RecordAndMetadata recordAndMetadata, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) {
super(recordAndMetadata, contentType, dataSchemaUriBase, cloudEventsSchemaName, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER);
}

@Override
public String ceId() {
return "name:" + recordParser.getMetadata(AbstractSourceInfo.SERVER_NAME_KEY)
+ ";change_lsn:" + recordParser.getMetadata(InformixRecordParser.CHANGE_LSN_KEY)
+ ";commit_lsn:" + recordParser.getMetadata(InformixRecordParser.COMMIT_LSN_KEY);
return "name:" + sourceField(AbstractSourceInfo.SERVER_NAME_KEY)
+ ";change_lsn:" + sourceField(CHANGE_LSN_KEY)
+ ";commit_lsn:" + sourceField(COMMIT_LSN_KEY);
}

@Override
public Set<String> connectorSpecificSourceFields() {
return IFX_SOURCE_FIELDS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.converters.spi.CloudEventsProvider;
import io.debezium.converters.spi.RecordParser;
import io.debezium.converters.spi.SerializerType;

/**
Expand All @@ -25,12 +24,7 @@ public String getName() {
}

@Override
public RecordParser createParser(RecordAndMetadata recordAndMetadata) {
return new InformixRecordParser(recordAndMetadata);
}

@Override
public CloudEventsMaker createMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) {
return new InformixCloudEventsMaker(parser, contentType, dataSchemaUriBase, cloudEventsSchemaName);
public CloudEventsMaker createMaker(RecordAndMetadata recordAndMetadata, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) {
return new InformixCloudEventsMaker(recordAndMetadata, contentType, dataSchemaUriBase, cloudEventsSchemaName);
}
}

This file was deleted.

0 comments on commit 9ead9b8

Please sign in to comment.