diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/pom.xml b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/pom.xml index 13bf7523bd..20b678602e 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/pom.xml +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/pom.xml @@ -170,6 +170,10 @@ org.apache.axis2.transport axis2-transport-rabbitmq-amqp + + org.wso2.orbit.debezium + debezium + org.apache.logging.log4j log4j-core @@ -181,36 +185,12 @@ test - org.wso2.carbon.mediation - org.wso2.carbon.inbound.endpoint.persistence - - - io.debezium - debezium-embedded - - - io.debezium - debezium-api - - - io.debezium - debezium-connector-mysql - - - org.apache.kafka - kafka-clients - - - org.apache.kafka - connect-runtime - - - org.apache.kafka - connect-json + com.google.code.gson + gson - com.github.shyiko - mysql-binlog-connector-java + org.wso2.carbon + org.wso2.carbon.securevault diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCInjectHandler.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCInjectHandler.java index 3aa30fd759..334c9b8528 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCInjectHandler.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCInjectHandler.java @@ -30,12 +30,9 @@ import org.apache.synapse.commons.json.JsonUtil; import org.apache.synapse.core.SynapseEnvironment; import org.apache.synapse.inbound.InboundEndpoint; -import org.apache.synapse.mediators.MediatorFaultHandler; import org.apache.synapse.mediators.base.SequenceMediator; import org.apache.synapse.transport.customlogsetter.CustomLogSetter; -import org.wso2.carbon.inbound.endpoint.protocol.generic.GenericConstants; -import java.util.Map; import java.util.Properties; import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_DATABASE_NAME; @@ -52,7 +49,6 @@ public class CDCInjectHandler { private boolean sequential; private Properties cdcProperties; private SynapseEnvironment synapseEnvironment; - private Map transportHeaders; public CDCInjectHandler(String injectingSeq, String onErrorSeq, boolean sequential, SynapseEnvironment synapseEnvironment, Properties cdcProperties) { @@ -98,24 +94,18 @@ public boolean invoke(Object object, String inboundEndpointName) { cdcEventOutput.getOutputJsonPayload().toString(), true, true); } catch (AxisFault ex) { - logger.error("Error while creating the OMElement", ex); - msgCtx.setProperty(SynapseConstants.ERROR_CODE, GenericConstants.INBOUND_BUILD_ERROR); - msgCtx.setProperty(SynapseConstants.ERROR_MESSAGE, ex.getMessage()); - SequenceMediator faultSequence = getFaultSequence(msgCtx); - faultSequence.mediate(msgCtx); - return true; + handleError("Error while creating the OMElement"); } // Inject the message to the sequence. try { msgCtx.setEnvelope(TransportUtils.createSOAPEnvelope(documentElement)); } catch (AxisFault e) { - throw new RuntimeException("Error while creating the SOAP Envelop", e); + handleError("Error while creating the SOAP Envelop"); } if (injectingSeq == null || injectingSeq.equals("")) { - logger.error("Sequence name not specified. Sequence : " + injectingSeq); - return false; + handleError("Injecting sequence name not specified"); } SequenceMediator seq = (SequenceMediator) synapseEnvironment.getSynapseConfiguration() .getSequence(injectingSeq); @@ -126,32 +116,23 @@ public boolean invoke(Object object, String inboundEndpointName) { if (!seq.isInitialized()) { seq.init(synapseEnvironment); } - SequenceMediator faultSequence = getFaultSequence(msgCtx); - MediatorFaultHandler mediatorFaultHandler = new MediatorFaultHandler(faultSequence); - msgCtx.pushFaultHandler(mediatorFaultHandler); + + seq.setErrorHandler(onErrorSeq); if (!synapseEnvironment.injectInbound(msgCtx, seq, sequential)) { - return false; + handleError("Failed to inject the sequence"); } } else { - logger.error("Sequence: " + injectingSeq + " not found"); + handleError("Sequence:" + injectingSeq + " not found"); } } return true; } - private SequenceMediator getFaultSequence(org.apache.synapse.MessageContext synCtx) { - SequenceMediator faultSequence = null; - if (this.onErrorSeq != null) { - faultSequence = (SequenceMediator) synCtx.getSequence(this.onErrorSeq); - } - - if (faultSequence == null) { - faultSequence = (SequenceMediator) synCtx.getFaultSequence(); - } - - return faultSequence; - } + private void handleError(String msg) { + logger.error(msg); + throw new RuntimeException(msg); + } /** * Create the initial message context for the file @@ -163,7 +144,6 @@ private org.apache.synapse.MessageContext createMessageContext() { .getAxis2MessageContext(); axis2MsgCtx.setServerSide(true); axis2MsgCtx.setMessageID(UUIDGenerator.getUUID()); - axis2MsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, transportHeaders); msgCtx.setProperty(MessageContext.CLIENT_API_NON_BLOCKING, true); return msgCtx; } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/test/resources/CDCInbound/cdc-inbound-endpoint.xml b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/test/resources/CDCInbound/cdc-inbound-endpoint.xml index e3caf7bc88..96eb58e48e 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/test/resources/CDCInbound/cdc-inbound-endpoint.xml +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/test/resources/CDCInbound/cdc-inbound-endpoint.xml @@ -1,19 +1,22 @@ - 9091 + 1000 engine + initial org.apache.kafka.connect.storage.FileOffsetBackingStore - + cdc/offsetStorage/offsets1_.dat io.debezium.connector.mysql.MySqlConnector localhost 3306 root - rusiri@wso2 + wso2:vault-lookup('mysql_password') students - 85744 + 8574444 + server_1 + topic2 io.debezium.storage.file.history.FileSchemaHistory - + cdc/schemaHistory/schema_history1_.dat students.marks - c + create \ No newline at end of file diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/test/resources/CDCInbound/cdc_process_seq.xml b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/test/resources/CDCInbound/cdc_process_seq.xml index 16bab87ac4..982d9998d9 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/test/resources/CDCInbound/cdc_process_seq.xml +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/test/resources/CDCInbound/cdc_process_seq.xml @@ -2,15 +2,15 @@ - + - + - + - + diff --git a/pom.xml b/pom.xml index 382605cddd..977053d43f 100644 --- a/pom.xml +++ b/pom.xml @@ -1457,116 +1457,14 @@ ${opencensus.orbit.version} - io.debezium - debezium-embedded + org.wso2.orbit.debezium + debezium ${debezium.version} - io.debezium - debezium-api - ${debezium.version} - - - io.debezium - debezium-connector-mysql - ${debezium.version} - - - io.debezium - debezium-connector-sqlserver - ${debezium.version} - - - io.debezium - debezium-connector-postgres - ${debezium.version} - - - io.debezium - debezium-connector-oracle - ${debezium.version} - - - io.debezium - debezium-core - ${debezium.version} - - - org.slf4j - slf4j-api - - - - - io.debezium - debezium-storage-kafka - ${debezium.version} - - - org.slf4j - slf4j-api - - - - - io.debezium - debezium-storage-file - ${debezium.version} - - - org.slf4j - slf4j-api - - - - - io.debezium - debezium-ddl-parser - ${debezium.version} - - - org.slf4j - slf4j-api - - - - - com.github.shyiko - mysql-binlog-connector-java - ${mysql.binlog.connector.version} - - - org.apache.kafka - kafka-clients - ${kafka.version} - - - org.slf4j - slf4j-api - - - - - org.apache.kafka - connect-runtime - ${kafka.version} - - - org.slf4j - slf4j-api - - - - - org.apache.kafka - connect-json - ${kafka.version} - - - org.slf4j - slf4j-api - - + com.google.code.gson + gson + ${version.com.google.code.gson} @@ -1664,12 +1562,8 @@ 2.3.1 - 2.1.4.Final + 2.1.4.Final.wso2v1 21.3.0.0 - 0.21.0 - - - 3.3.1 4.0.0-wso2v40 [4.0.0, 4.0.1)