diff --git a/src/main/java/org/apache/flume/sink/rabbitmq/RabbitMQSink.java b/src/main/java/org/apache/flume/sink/rabbitmq/RabbitMQSink.java index d05f5ad..3d330c7 100644 --- a/src/main/java/org/apache/flume/sink/rabbitmq/RabbitMQSink.java +++ b/src/main/java/org/apache/flume/sink/rabbitmq/RabbitMQSink.java @@ -24,6 +24,7 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.impl.AMQBasicProperties; +import org.apache.flume.formatter.output.BucketPath; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; @@ -105,7 +106,9 @@ public Status process() throws EventDeliveryException { } try { - _Channel.basicPublish(_ExchangeName, _QueueName, null, e.getBody()); + String realExchangeName = BucketPath.escapeString(_ExchangeName, e.getHeaders()); + String realQueueName = BucketPath.escapeString(_QueueName, e.getHeaders()); + _Channel.basicPublish(realExchangeName, realQueueName, null, e.getBody()); tx.commit(); _CounterGroup.incrementAndGet(RabbitMQConstants.COUNTER_PUBLISH); } catch(Exception ex){