Skip to content

Commit

Permalink
Directly throw the exceptions without using fault sequence and add bu…
Browse files Browse the repository at this point in the history
…ndled dependencies
  • Loading branch information
RusJaI committed Aug 10, 2023
1 parent 0663071 commit 1d9c9e0
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@
<groupId>org.apache.axis2.transport</groupId>
<artifactId>axis2-transport-rabbitmq-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.orbit.debezium</groupId>
<artifactId>debezium</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
Expand All @@ -181,36 +185,12 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon.mediation</groupId>
<artifactId>org.wso2.carbon.inbound.endpoint.persistence</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.securevault</artifactId>
</dependency>
</dependencies>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,9 @@
import org.apache.synapse.commons.json.JsonUtil;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.inbound.InboundEndpoint;
import org.apache.synapse.mediators.MediatorFaultHandler;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.transport.customlogsetter.CustomLogSetter;
import org.wso2.carbon.inbound.endpoint.protocol.generic.GenericConstants;

import java.util.Map;
import java.util.Properties;

import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_DATABASE_NAME;
Expand All @@ -52,7 +49,6 @@ public class CDCInjectHandler {
private boolean sequential;
private Properties cdcProperties;
private SynapseEnvironment synapseEnvironment;
private Map<String, Object> transportHeaders;

public CDCInjectHandler(String injectingSeq, String onErrorSeq, boolean sequential,
SynapseEnvironment synapseEnvironment, Properties cdcProperties) {
Expand Down Expand Up @@ -98,24 +94,18 @@ public boolean invoke(Object object, String inboundEndpointName) {
cdcEventOutput.getOutputJsonPayload().toString(), true, true);

} catch (AxisFault ex) {
logger.error("Error while creating the OMElement", ex);
msgCtx.setProperty(SynapseConstants.ERROR_CODE, GenericConstants.INBOUND_BUILD_ERROR);
msgCtx.setProperty(SynapseConstants.ERROR_MESSAGE, ex.getMessage());
SequenceMediator faultSequence = getFaultSequence(msgCtx);
faultSequence.mediate(msgCtx);
return true;
handleError("Error while creating the OMElement");
}

// Inject the message to the sequence.
try {
msgCtx.setEnvelope(TransportUtils.createSOAPEnvelope(documentElement));
} catch (AxisFault e) {
throw new RuntimeException("Error while creating the SOAP Envelop", e);
handleError("Error while creating the SOAP Envelop");
}

if (injectingSeq == null || injectingSeq.equals("")) {
logger.error("Sequence name not specified. Sequence : " + injectingSeq);
return false;
handleError("Injecting sequence name not specified");
}
SequenceMediator seq = (SequenceMediator) synapseEnvironment.getSynapseConfiguration()
.getSequence(injectingSeq);
Expand All @@ -126,32 +116,23 @@ public boolean invoke(Object object, String inboundEndpointName) {
if (!seq.isInitialized()) {
seq.init(synapseEnvironment);
}
SequenceMediator faultSequence = getFaultSequence(msgCtx);
MediatorFaultHandler mediatorFaultHandler = new MediatorFaultHandler(faultSequence);
msgCtx.pushFaultHandler(mediatorFaultHandler);

seq.setErrorHandler(onErrorSeq);

if (!synapseEnvironment.injectInbound(msgCtx, seq, sequential)) {
return false;
handleError("Failed to inject the sequence");
}
} else {
logger.error("Sequence: " + injectingSeq + " not found");
handleError("Sequence:" + injectingSeq + " not found");
}
}
return true;
}

private SequenceMediator getFaultSequence(org.apache.synapse.MessageContext synCtx) {
SequenceMediator faultSequence = null;
if (this.onErrorSeq != null) {
faultSequence = (SequenceMediator) synCtx.getSequence(this.onErrorSeq);
}

if (faultSequence == null) {
faultSequence = (SequenceMediator) synCtx.getFaultSequence();
}

return faultSequence;
}
private void handleError(String msg) {
logger.error(msg);
throw new RuntimeException(msg);
}

/**
* Create the initial message context for the file
Expand All @@ -163,7 +144,6 @@ private org.apache.synapse.MessageContext createMessageContext() {
.getAxis2MessageContext();
axis2MsgCtx.setServerSide(true);
axis2MsgCtx.setMessageID(UUIDGenerator.getUUID());
axis2MsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, transportHeaders);
msgCtx.setProperty(MessageContext.CLIENT_API_NON_BLOCKING, true);
return msgCtx;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
<inboundEndpoint name="cdc-inbound-endpoint" onError="fault" protocol="cdc" sequence="cdc_process_seq" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
<parameters>
<parameter name="inbound.cdc.port">9091</parameter>
<parameter name="interval">1000</parameter>
<parameter name="name">engine</parameter>
<parameter name="snapshot.mode">initial</parameter>
<parameter name="offset.storage">org.apache.kafka.connect.storage.FileOffsetBackingStore</parameter>
<!--parameter name="offset.storage.file.filename">~Documents/mytemp1/offsets.dat</parameter-->
<parameter name="offset.storage.file.filename">cdc/offsetStorage/offsets1_.dat</parameter>
<parameter name="connector.class">io.debezium.connector.mysql.MySqlConnector</parameter>
<parameter name="database.hostname">localhost</parameter>
<parameter name="database.port">3306</parameter>
<parameter name="database.user">root</parameter>
<parameter name="database.password">rusiri@wso2</parameter>
<parameter name="database.password">wso2:vault-lookup('mysql_password')</parameter>
<parameter name="database.dbname">students</parameter>
<parameter name="database.server.id">85744</parameter>
<parameter name="database.server.id">8574444</parameter>
<parameter name="database.server.name">server_1</parameter>
<parameter name="topic.prefix">topic2</parameter>
<parameter name="schema.history.internal">io.debezium.storage.file.history.FileSchemaHistory</parameter>
<!--parameter name="schema.history.internal.file.filename">~Documents/mytemp1/schemahistory.dat</parameter-->
<parameter name="schema.history.internal.file.filename">cdc/schemaHistory/schema_history1_.dat</parameter>
<parameter name="table.include.list">students.marks</parameter>
<parameter name="skipped.operations">c</parameter>
<parameter name="allowed.operations">create</parameter>
</parameters>
</inboundEndpoint>
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
<sequence name="cdc_process_seq" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
<log level="full"/>
<log level="custom">
<property expression="get-property('database')" name="database" xmlns:ns="http://org.apache.synapse/xsd"/>
<property expression="get-property('cdc.database')" name="cdc.database" xmlns:ns="http://org.apache.synapse/xsd"/>
</log>
<log level="custom">
<property expression="get-property('tables')" name="tables" xmlns:ns="http://org.apache.synapse/xsd"/>
<property expression="get-property('cdc.tables')" name="cdc.tables" xmlns:ns="http://org.apache.synapse/xsd"/>
</log>
<log level="custom">
<property expression="get-property('operations')" name="operations" xmlns:ns="http://org.apache.synapse/xsd"/>
<property expression="get-property('cdc.operations')" name="cdc.operations" xmlns:ns="http://org.apache.synapse/xsd"/>
</log>
<log level="custom">
<property expression="get-property('ts_ms')" name="ts_ms" xmlns:ns="http://org.apache.synapse/xsd"/>
<property expression="get-property('cdc.ts_ms')" name="cdc.ts_ms" xmlns:ns="http://org.apache.synapse/xsd"/>
</log>
</sequence>
118 changes: 6 additions & 112 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1457,116 +1457,14 @@
<version>${opencensus.orbit.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<groupId>org.wso2.orbit.debezium</groupId>
<artifactId>debezium</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-kafka</artifactId>
<version>${debezium.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-file</artifactId>
<version>${debezium.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-ddl-parser</artifactId>
<version>${debezium.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>${mysql.binlog.connector.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${version.com.google.code.gson}</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down Expand Up @@ -1664,12 +1562,8 @@
<com.sun.jaxb.impl.version>2.3.1</com.sun.jaxb.impl.version>

<!-- Debezium version -->
<debezium.version>2.1.4.Final</debezium.version>
<debezium.version>2.1.4.Final.wso2v1</debezium.version>
<ojdbc8.version>21.3.0.0</ojdbc8.version>
<mysql.binlog.connector.version>0.21.0</mysql.binlog.connector.version>

<!-- Kafka version -->
<kafka.version>3.3.1</kafka.version>

<synapse.version>4.0.0-wso2v40</synapse.version>
<imp.pkg.version.synapse>[4.0.0, 4.0.1)</imp.pkg.version.synapse>
Expand Down

0 comments on commit 1d9c9e0

Please sign in to comment.