Skip to content

Commit

Permalink
Merge pull request voxpupuli#97 from SegFaultAX/kafka-mirrormaker-abo…
Browse files Browse the repository at this point in the history
…rtonfailure

Add support for MirrorMaker abort.on.send.failure
  • Loading branch information
fessyfoo authored Aug 26, 2016
2 parents 4942117 + 18b4ad4 commit 5ea991f
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 9 deletions.
5 changes: 5 additions & 0 deletions manifests/mirror.pp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
# [*num_producers*]
# Number of producer threads to start.
#
# [*abort_on_send_failure*]
# Abort immediately if MirrorMaker fails to send to receiving cluster
#
# [*install_java*]
# Install java if it's not already installed.
#
Expand Down Expand Up @@ -68,6 +71,7 @@
$producer_config_defaults = $kafka::params::producer_config_defaults,
$num_streams = $kafka::params::num_streams,
$num_producers = $kafka::params::num_producers,
$abort_on_send_failure = $kafka::params::abort_on_send_failure,
$install_java = $kafka::params::install_java,
$whitelist = $kafka::params::whitelist,
$blacklist = $kafka::params::blacklist,
Expand All @@ -82,6 +86,7 @@
validate_re($mirror_url, '^(https?:\/\/)?([\da-z\.-]+)\.([a-z\.]{2,6})([\/\w \.-]*)*\/?$', "${mirror_url} is not a valid url")
validate_integer($num_streams)
validate_integer($num_producers)
validate_bool($abort_on_send_failure)
validate_bool($install_java)
validate_re($max_heap, '\d+[g|G|m|M|k|K]', "${max_heap} is not a valid heap size")
validate_absolute_path($package_dir)
Expand Down
21 changes: 14 additions & 7 deletions manifests/mirror/service.pp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
# It manages the kafka-mirror service
#
class kafka::mirror::service(
$consumer_config = $kafka::params::consumer_config,
$producer_config = $kafka::params::producer_config,
$num_streams = $kafka::mirror::num_streams,
$num_producers = $kafka::mirror::num_producers,
$whitelist = $kafka::mirror::whitelist,
$blacklist = $kafka::mirror::blacklist,
$max_heap = $kafka::mirror::max_heap
$consumer_config = $kafka::params::consumer_config,
$producer_config = $kafka::params::producer_config,
$num_streams = $kafka::mirror::num_streams,
$num_producers = $kafka::mirror::num_producers,
$abort_on_send_failure = $kafka::mirror::abort_on_send_failure,
$whitelist = $kafka::mirror::whitelist,
$blacklist = $kafka::mirror::blacklist,
$max_heap = $kafka::mirror::max_heap
) inherits ::kafka::params {

if $caller_module_name != $module_name {
Expand All @@ -23,6 +24,12 @@

$service_name = 'kafka-mirror'

if versioncmp($kafka::mirror::version, '0.9.0') >= 0 {
$abort_on_send_failure_opt = "--abort.on.send.failure=${abort_on_send_failure}"
} else {
$abort_on_send_failure_opt = ''
}

if $::service_provider == 'systemd' {
include ::systemd

Expand Down
1 change: 1 addition & 0 deletions manifests/params.pp
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@
$producer_config = '/opt/kafka/config/producer.properties'
$num_streams = 2
$num_producers = 1
$abort_on_send_failure = true
$mirror_max_heap = '256M'
$whitelist = '.*'
$blacklist = ''
Expand Down
2 changes: 1 addition & 1 deletion templates/init.erb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export KAFKA_LOG4J_OPTS="<%= @consumer_log4j_opts %>"
PGREP_PATTERN=kafka.tools.MirrorMaker

DAEMON="/opt/kafka/bin/kafka-run-class.sh"
DAEMON_OPTS="kafka.tools.MirrorMaker --consumer.config <%= @consumer_config -%> --num.streams <%= @num_streams -%> --producer.config <%= @producer_config -%><%- if (scope.function_versioncmp([scope.lookupvar('kafka::version'), '0.9.0.0']) < 0) -%> --num.producers <%= @num_producers -%><%- end -%><%- if !@whitelist.eql?('') -%> --whitelist='<%= @whitelist -%>'<%- end %><%- if !@blacklist.eql?('') -%> --blacklist='<%= @blacklist -%>'<%- end -%>"
DAEMON_OPTS="kafka.tools.MirrorMaker --consumer.config <%= @consumer_config -%> --num.streams <%= @num_streams -%> --producer.config <%= @producer_config -%><%- if (scope.function_versioncmp([scope.lookupvar('kafka::version'), '0.9.0.0']) < 0) -%> --num.producers <%= @num_producers -%><%- end -%><%- if !@whitelist.eql?('') -%> --whitelist='<%= @whitelist -%>'<%- end %><%- if !@blacklist.eql?('') -%> --blacklist='<%= @blacklist -%>'<%- end -%> <%= @abort_on_send_failure_opt %>"

export KAFKA_HEAP_OPTS="-Xmx<%= @max_heap -%> -Xms<%= @max_heap -%>"
export KAFKA_JMX_OPTS="<%= @mirror_jmx_opts %>"
Expand Down
2 changes: 1 addition & 1 deletion templates/unit.erb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ ExecStart=/opt/kafka/bin/kafka-console-consumer.sh <% @consumer_service_config.s
Environment='KAFKA_LOG4J_OPTS=<%= @mirror_log4j_opts %>'
Environment='KAFKA_JMX_OPTS=<%= @mirror_jmx_opts %>'
Environment='KAFKA_HEAP_OPTS=-Xmx<%= @max_heap -%>'
ExecStart=/opt/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config <%= @consumer_config -%> --num.streams <%= @num_streams -%> --producer.config <%= @producer_config -%><%- if (scope.function_versioncmp([scope.lookupvar('kafka::version'), '0.9.0.0']) < 0) -%> --num.producers <%= @num_producers -%><%- end -%><%- if !@whitelist.eql?('') -%> --whitelist='<%= @whitelist -%>'<%- end %><%- if !@blacklist.eql?('') -%> --blacklist='<%= @blacklist -%>'<%- end -%>
ExecStart=/opt/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config <%= @consumer_config -%> --num.streams <%= @num_streams -%> --producer.config <%= @producer_config -%><%- if (scope.function_versioncmp([scope.lookupvar('kafka::version'), '0.9.0.0']) < 0) -%> --num.producers <%= @num_producers -%><%- end -%><%- if !@whitelist.eql?('') -%> --whitelist='<%= @whitelist -%>'<%- end %><%- if !@blacklist.eql?('') -%> --blacklist='<%= @blacklist -%>'<%- end -%> <%= @abort_on_send_failure_opt %>
<%- when 'kafka-producer' -%>
Environment='KAFKA_LOG4J_OPTS=<%= @producer_log4j_opts %>'
Environment='KAFKA_JMX_OPTS=<%= @producer_jmx_opts %>'
Expand Down

0 comments on commit 5ea991f

Please sign in to comment.