Camel is an open source integration framework that empowers you to quickly and easily integrate various systems consuming or producing data.
This library adds a thin layer on top of Java Apache Camel and provides a more idiomatic experience of using Apache Camel in the Clojure ecosystem, without changing the original functionality.
Include in your project.clj
(require [clj-camel.core :as c]
[clj-camel.util :as cu])
Simple route
(c/route-builder (c/from "direct:test")
(c/route-id "test-route")
(c/set-body (c/constant "test-body"))
(c/log "log: ${body}")
(c/to "direct:result"))
Filter (original doc)
(c/route-builder (c/from "direct:test")
(c/route-id "test-route")
(c/to "http://test-http")
(c/filter (c/predicate (comp pos? :body))
(c/log "Filtered ... ${body}")
(c/to "direct:result"))
(c/process (fn [_] {:body "after filter"})))
Choice (original doc)
(c/route-builder (c/choice (c/when (c/predicate (comp pos? :body))
(c/log "when 1")
(c/process some-processor))
(c/when (c/predicate (comp neg? :body))
(c/log "when 2")
(c/process some-processor))
(c/otherwise
(c/log "otherwise")
(c/process some-processor)))
(c/log "after choice"))
Split (original doc)
(c/route-builder (c/from "direct:test")
(c/route-id "test-route")
(c/process processor1)
(c/to "http://test-http")
(c/split (c/json-path "$.data.*") {:agg-strategy c/grouped-exchange-strategy
:streaming true
:parallel-processing true}
(c/process (fn [_] {}))
(c/filter (c/predicate (comp pos? :reserved-today :body))
(c/log "Filtered ... ${body}")
(c/to "direct:result")))
(c/process (fn [_] {:body "after"})))
Aggregate (original doc)
(c/route-builder (c/from "direct:test")
(c/set-body (c/constant "test"))
(c/aggregate (c/constant 1) c/grouped-body-strategy
{:completion-size 1000
:completion-timeout 1000
:completion-predicate (c/predicate (fn [_] true))}
(c/to "direct:result"))
(c/log "after aggregating"))
Caching (original doc)
(c/route-builder (c/from "direct:test")
(c/route-id "test-route")
(c/set-body (c/constant "key"))
(c/log "key requested: ${body}")
(c/memoize (cu/create-jcache-expiration-policy "cache-name" 60)
(c/set-body (c/constant "value"))
(c/log "Populate cache with ${body}"))
(c/log "key value result: ${body}")
(c/to "direct:result"))
Throttling (original doc)
(c/route-builder (c/from "direct:test")
(c/set-body (c/constant "test"))
(c/throttle 20 {:async-delayed false
:reject-execution false
:time-period-millis 10000})
(c/log "after throttling")
(c/to "direct:result"))
Try/Catch/Finally (original doc)
(c/route-builder (c/from "direct:test")
(c/route-id "test-route")
(c/do-try (c/to "http://test-http")
(c/do-catch Exception
(c/log "handle exception")
(c/log "handle exception2"))
(c/do-finally
(c/log "finally")
(c/log "finally2")))
(c/log "after do-try"))
MDC from headers UOW
The following will populate MDC context with name-of-mdc-field
with value of incoming exchange header
field name-of-header-field
(if incoming exchange has a respective header)
Example of log:
{
"message": "example message",
"mdc": {
"name-of-mdc-field": "test-value",
"camel.breadcrumbId": "xxx",
"camel.routeId": "yyy"
}
}
(c/set-customer-uow-with-mdc-from-headers context {"name-of-header-filed" "name-of-mdc-field"})
GCP Pub-sub attributes propagation
Specified pub-sub message attributes will be added to exchange header if exist
(c/set-pubsub-attributes-propagation context {"pubsub-attribute-name" "name-of-header-field"})
When configuring your logging appenders, contrary to the Camel documentation, you will need to add clj-camel.core
as your logging namespace.
- authentication mechanism in google-pubsub-component was changed.
serviceAccountKey
become mandatory parameter in google pubsub endpoint string: https://camel.apache.org/components/3.11.x/google-pubsub-component.html#_authentication_configuration - changed interface of
Exchange
class:getAllProperties
should be used instead ofgetProperties
method - apm-opentracing dependencies may need to be updated
- fasterxml/jackson dependencies may need to be updated