diff --git a/bin/hbase-connectors b/bin/hbase-connectors new file mode 100755 index 00000000..cb31ec5b --- /dev/null +++ b/bin/hbase-connectors @@ -0,0 +1,292 @@ +#! /usr/bin/env bash +# +#/** +# * Licensed to the Apache Software Foundation (ASF) under one +# * or more contributor license agreements. See the NOTICE file +# * distributed with this work for additional information +# * regarding copyright ownership. The ASF licenses this file +# * to you under the Apache License, Version 2.0 (the +# * "License"); you may not use this file except in compliance +# * with the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ +# +# The hbase command script. Based on the hadoop command script putting +# in hbase classes, libs and configurations ahead of hadoop's. +# +# TODO: Narrow the amount of duplicated code. +# +# Environment Variables: +# +# JAVA_HOME The java implementation to use. Overrides JAVA_HOME. +# HBASE_CONNECTOR_CLASSPATH_PREFIX Extra Java CLASSPATH entries that should be +# prefixed to the system classpath. +# +# HBASE_CONNECTOR_HEAPSIZE The maximum amount of heap to use. +# Default is unset and uses the JVMs default setting +# (usually 1/4th of the available memory). +# +# HBASE_CONNECTOR_LIBRARY_PATH HBase additions to JAVA_LIBRARY_PATH for adding +# native libraries. +# +# HBASE_CONNECTOR_OPTS Extra Java runtime options. +# +# HBASE_CONNECTOR_CONF_DIR Alternate conf dir. Default is ${HBASE_CONNECTOR_HOME}/conf. +# +# HBASE_CONNECTOR_ROOT_LOGGER The root appender. 
Default is INFO,console +# + + +bin=`dirname "$0"` +bin=`cd "$bin">/dev/null; pwd` + +# This will set HBASE_CONNECTOR_HOME etc. +. "$bin"/hbase-connectors-config.sh + + +cygwin=false +case "`uname`" in +CYGWIN*) cygwin=true;; +esac + +# Detect if we are in hbase sources dir +in_dev_env=false +if [ -d "${HBASE_CONNECTOR_HOME}/target" ]; then + in_dev_env=true +fi + +# Detect if we are in the omnibus tarball +in_omnibus_tarball="false" +if [ -f "${HBASE_CONNECTOR_HOME}/bin/hbase-connectors-daemon.sh" ]; then + in_omnibus_tarball="true" +fi + +# if no args specified, show usage +if [ $# = 0 ]; then + echo "Usage: hbase-connectors [] []" + echo "" + echo "Commands:" + + if [ "${in_omnibus_tarball}" = "true" ]; then + echo " kafkaproxy Run the HBase Kafka Proxy server" + echo " kafkaproxytest Run the HBase Kafka Proxy sample kafka listener" + fi + + echo " CLASSNAME Run the class named CLASSNAME" + exit 1 +fi + +# get arguments +COMMAND=$1 +shift + +JAVA=$JAVA_HOME/bin/java + +# override default settings for this command, if applicable +if [ -f "$HBASE_CONNECTOR_HOME/conf/hbase-connector-env-$COMMAND.sh" ]; then + . 
"$HBASE_CONNECTOR_HOME/conf/hbase-connector-env-$COMMAND.sh" +fi + +add_size_suffix() { + # add an 'm' suffix if the argument is missing one, otherwise use whats there + local val="$1" + local lastchar=${val: -1} + if [[ "mMgG" == *$lastchar* ]]; then + echo $val + else + echo ${val}m + fi +} + + + +#if [[ -n "$HBASE_CONNECTOR_HEAPSIZE" ]]; then +# JAVA_HEAP_MAX="-Xmx$(add_size_suffix $HBASE_CONNECTOR_HEAPSIZE)" +#fi +# +#if [[ -n "$HBASE_CONNECTOR_OFFHEAPSIZE" ]]; then +# JAVA_OFFHEAP_MAX="-XX:MaxDirectMemorySize=$(add_size_suffix $HBASE_OFFHEAPSIZE)" +#fi + + + + + +# so that filenames w/ spaces are handled correctly in loops below +ORIG_IFS=$IFS +IFS= + +# CLASSPATH initially contains $HBASE_CONNECTOR_CONF_DIR +PASS_CLASSPATH="${HBASE_CONNECTOR_CONF_DIR}" + +#CLASSPATH=${PASS_CLASSPATH}:$JAVA_HOME/lib/tools.jar + + +HBASE_IN_PATH=$(which hbase 2>/dev/null) + +# default log directory & file +if [ "$HBASE_CONNECTOR_LOG_DIR" = "" ]; then + HBASE_CONNECTOR_LOG_DIR="$HBASE_CONNECTOR_HOME/logs" +fi +if [ "$HBASE_CONNECTOR_LOGFILE" = "" ]; then + HBASE_CONNECTOR_LOGFILE='hbase-connector.log' +fi + +function append_path() { + if [ -z "$1" ]; then + echo "$2" + else + echo "$1:$2" + fi +} + +JAVA_PLATFORM="" + +# if HBASE_CONNECTOR_LIBRARY_PATH is defined lets use it as first or second option +if [ "$HBASE_CONNECTOR_LIBRARY_PATH" != "" ]; then + JAVA_LIBRARY_PATH=$(append_path "$JAVA_LIBRARY_PATH" "$HBASE_CONNECTOR_LIBRARY_PATH") +fi + + +# Add user-specified CLASSPATH last +if [ "$HBASE_CONNECTOR_CLASSPATH" != "" ]; then + PASS_CLASSPATH=${PASS_CLASSPATH}:${HBASE_CONNECTOR_CLASSPATH} +fi + +# Add user-specified CLASSPATH prefix first +if [ "$HBASE_CONNECTOR_CLASSPATH_PREFIX" != "" ]; then + PASS_CLASSPATH=${HBASE_CONNECTOR_CLASSPATH_PREFIX}:${PASS_CLASSPATH} +fi + +# cygwin path translation +if $cygwin; then + PASS_CLASSPATH=`cygpath -p -w "$PASS_CLASSPATH"` + HBASE_CONNECTOR_HOME=`cygpath -d "$HBASE_CONNECTOR_HOME"` + HBASE_CONNECTOR_LOG_DIR=`cygpath -d 
"$HBASE_CONNECTOR_LOG_DIR"` +fi + +# cygwin path translation +if $cygwin; then + JAVA_LIBRARY_PATH=`cygpath -p "$JAVA_LIBRARY_PATH"` +fi + +# restore ordinary behaviour +unset IFS + +#Set the right GC options based on the what we are running +declare -a server_cmds=("kafkaproxy") +for cmd in ${server_cmds[@]}; do + if [[ $cmd == $COMMAND ]]; then + server=true + break + fi +done + +if [[ $server ]]; then + HBASE_CONNECTOR_OPTS="$HBASE_CONNECTOR_OPTS $SERVER_GC_OPTS" +else + HBASE_CONNECTOR_OPTS="$HBASE_CONNECTOR_OPTS $CLIENT_GC_OPTS" +fi + +if [ "$AUTH_AS_SERVER" == "true" ]; then + if [ -n "$HBASE_CONNECTOR_SERVER_JAAS_OPTS" ]; then + HBASE_CONNECTOR_OPTS="$HBASE_CONNECTOR_OPTS $HBASE_CONNECTOR_SERVER_JAAS_OPTS" + fi +fi + + +add_maven_deps_to_classpath() { + f="${HBASE_CONNECTOR_HOME}/target/$1" + + if [ ! -f "${f}" ]; then + echo "As this is a development environment, we need ${f} to be generated from maven (command: mvn install -DskipTests)" + exit 1 + fi + PASS_CLASSPATH=${PASS_CLASSPATH}:$(cat "${f}") +} + + +add_connector_jars_to_classpath() { + connector_dir="${HBASE_CONNECTOR_HOME}/$1" + + if [ -d "${connector_dir}" ]; then + for f in $connector_dir/*.jar; do + PASS_CLASSPATH="${PASS_CLASSPATH}:${f}" + done + fi +} + + +#Add the development env class path stuff +if $in_dev_env; then + add_maven_deps_to_classpath "cached_classpath.txt" +fi + +# figure out which class to run +if [ "$COMMAND" = "kafkaproxy" ] ; then + CLASS='org.apache.hadoop.hbase.kafka.KafkaProxy' + if [ "$1" != "stop" ] ; then + HBASE_CONNECTOR_OPTS="$HBASE_CONNECTOR_OPTS $HBASE_KAFKA_OPTS" + fi + + # add the kafka proxy jars + add_connector_jars_to_classpath "hbase-kafka-proxy" + +elif [ "$COMMAND" = "kafkaproxytest" ] ; then + CLASS='org.apache.hadoop.hbase.kafka.DumpToStringListener' + if [ "$1" != "stop" ] ; then + HBASE_CONNECTOR_OPTS="$HBASE_CONNECTOR_OPTS $HBASE_KAFKA_TEST_OPTS" + fi + + # add the kafka proxy jars + add_connector_jars_to_classpath "hbase-kafka-proxy" + +else + 
CLASS=$COMMAND +fi + +HBASE_CONNECTOR_OPTS="$HBASE_CONNECTOR_OPTS -Dhbase.connector.log.dir=$HBASE_CONNECTOR_LOG_DIR" +HBASE_CONNECTOR_OPTS="$HBASE_CONNECTOR_OPTS -Dhbase.connector.log.file=$HBASE_CONNECTOR_LOGFILE" +HBASE_CONNECTOR_OPTS="$HBASE_CONNECTOR_OPTS -Dhbase.connector.home.dir=$HBASE_CONNECTOR_HOME" +HBASE_CONNECTOR_OPTS="$HBASE_CONNECTOR_OPTS -Dhbase.connector.id.str=$HBASE_CONNECTOR_IDENT_STRING" +HBASE_CONNECTOR_OPTS="$HBASE_CONNECTOR_OPTS -Dhbase.connector.root.logger=${HBASE_CONNECTOR_ROOT_LOGGER:-INFO,console}" + + +if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then + HBASE_CONNECTOR_OPTS="$HBASE_CONNECTOR_OPTS -Djava.library.path=$JAVA_LIBRARY_PATH" + export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$JAVA_LIBRARY_PATH" +fi + +#HEAP_SETTINGS="$JAVA_HEAP_MAX $JAVA_OFFHEAP_MAX" + + + + + +# by now if we're running a command it means we need logging +for f in ${HBASE_CONNECTOR_HOME}/lib/client-facing-thirdparty/slf4j-*.jar; do + if [ -f "${f}" ]; then + PASS_CLASSPATH="${PASS_CLASSPATH}:${f}" + break + fi +done + + + +CLASSPATH=$PASS_CLASSPATH:`$HBASE_IN_PATH classpath` + +export CLASSPATH + + +if [ "${HBASE_CONNECTOR_NOEXEC}" != "" ]; then + "$JAVA" -Dproc_$COMMAND -XX:OnOutOfMemoryError="kill -9 %p" $HEAP_SETTINGS $HBASE_CONNECTOR_OPTS $CLASS "$@" +else + exec "$JAVA" -Dproc_$COMMAND -XX:OnOutOfMemoryError="kill -9 %p" $HEAP_SETTINGS $HBASE_CONNECTOR_OPTS $CLASS "$@" +fi \ No newline at end of file diff --git a/bin/hbase-connectors-config.sh b/bin/hbase-connectors-config.sh new file mode 100755 index 00000000..fdaa7e65 --- /dev/null +++ b/bin/hbase-connectors-config.sh @@ -0,0 +1,127 @@ +# +#/** +# * Licensed to the Apache Software Foundation (ASF) under one +# * or more contributor license agreements. See the NOTICE file +# * distributed with this work for additional information +# * regarding copyright ownership. 
The ASF licenses this file +# * to you under the Apache License, Version 2.0 (the +# * "License"); you may not use this file except in compliance +# * with the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ + +# included in all the hbase connector scripts with source command +# should not be executable directly +# also should not be passed any arguments, since we need original $* +# Modelled after $HADOOP_HOME/bin/hadoop-env.sh. + +# resolve links - "${BASH_SOURCE-$0}" may be a softlink + +this="${BASH_SOURCE-$0}" +while [ -h "$this" ]; do + ls=`ls -ld "$this"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '.*/.*' > /dev/null; then + this="$link" + else + this=`dirname "$this"`/"$link" + fi +done + +# convert relative path to absolute path +bin=`dirname "$this"` +script=`basename "$this"` +bin=`cd "$bin">/dev/null; pwd` +this="$bin/$script" + +# the root of the hbase connector installation +if [ -z "$HBASE_CONNECTOR_HOME" ]; then + export HBASE_CONNECTOR_HOME=`dirname "$this"`/.. 
+fi + +#check to see if the conf dir or hbase home are given as an optional arguments +while [ $# -gt 1 ] +do + if [ "--config" = "$1" ] + then + shift + confdir=$1 + shift + HBASE_CONF_DIR=$confdir + elif [ "--autostart-window-size" = "$1" ] + then + shift + AUTOSTART_WINDOW_SIZE=$(( $1 + 0 )) + if [ $AUTOSTART_WINDOW_SIZE -lt 0 ]; then + echo "Invalid value for --autostart-window-size, should be a positive integer" + exit 1 + fi + shift + elif [ "--autostart-window-retry-limit" = "$1" ] + then + shift + AUTOSTART_WINDOW_RETRY_LIMIT=$(( $1 + 0 )) + if [ $AUTOSTART_WINDOW_RETRY_LIMIT -lt 0 ]; then + echo "Invalid value for --autostart-window-retry-limit, should be a positive integer" + exit 1 + fi + shift + elif [ "--internal-classpath" = "$1" ] + then + shift + # shellcheck disable=SC2034 + INTERNAL_CLASSPATH="true" + elif [ "--debug" = "$1" ] + then + shift + # shellcheck disable=SC2034 + DEBUG="true" + else + # Presume we are at end of options and break + break + fi +done + + + +# Allow alternate hbase connector conf dir location. +HBASE_CONNECTOR_CONF_DIR="${HBASE_CONNECTOR_CONF_DIR:-$HBASE_CONNECTOR_HOME/conf}" + + +if [ -n "$HBASE_CONNECTOR_JMX_BASE" ] && [ -z "$HBASE_CONNECTOR_JMX_OPTS" ]; then + HBASE_CONNECTOR_JMX_OPTS="$HBASE_CONNECTOR_JMX_BASE" +fi + + +# Source the hbase-connector-env.sh only if it has not already been done. HBASE_CONNECTOR_ENV_INIT keeps track of it. +if [ -z "$HBASE_CONNECTOR_ENV_INIT" ] && [ -f "${HBASE_CONNECTOR_CONF_DIR}/hbase-connector-env.sh" ]; then + . "${HBASE_CONNECTOR_CONF_DIR}/hbase-connector-env.sh" + export HBASE_CONNECTOR_ENV_INIT="true" +fi + +# Newer versions of glibc use an arena memory allocator that causes virtual +# memory usage to explode. Tune the variable down to prevent vmem explosion. 
+export MALLOC_ARENA_MAX=${MALLOC_ARENA_MAX:-4} + + +# Now having JAVA_HOME defined is required +if [ -z "$JAVA_HOME" ]; then + cat 1>&2 < http://www.oracle.com/technetwork/java/javase/downloads | +| | +| HBase Connectors requires Java 1.8 or later. | ++======================================================================+ +EOF + exit 1 +fi diff --git a/bin/hbase-connectors-daemon.sh b/bin/hbase-connectors-daemon.sh new file mode 100755 index 00000000..fea5633f --- /dev/null +++ b/bin/hbase-connectors-daemon.sh @@ -0,0 +1,370 @@ +#!/usr/bin/env bash +# +#/** +# * Licensed to the Apache Software Foundation (ASF) under one +# * or more contributor license agreements. See the NOTICE file +# * distributed with this work for additional information +# * regarding copyright ownership. The ASF licenses this file +# * to you under the Apache License, Version 2.0 (the +# * "License"); you may not use this file except in compliance +# * with the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ +# +# Runs a Hadoop hbase connector command as a daemon. +# +# Environment Variables +# +# HBASE_CONNECTOR_CONF_DIR Alternate hbase conf dir. Default is ${HBASE_HOME}/conf. +# HBASE_CONNECTOR_LOG_DIR Where log files are stored. PWD by default. +# HBASE_CONNECTOR_PID_DIR The pid files are stored. /tmp by default. +# HBASE_CONNECTOR_IDENT_STRING A string representing this instance of hadoop. $USER by default +# HBASE_CONNECTOR_NICENESS The scheduling priority for daemons. Defaults to 0. +# HBASE_CONNECTOR_STOP_TIMEOUT Time, in seconds, after which we kill -9 the server if it has not stopped. 
+# Default 1200 seconds. +# +# Modelled after $HBASE_HOME/bin/hbase-daemon.sh + +usage="Usage: hbase-connectors-daemon.sh [--config ]\ + [--autostart-window-size ]\ + [--autostart-window-retry-limit ]\ + (start|stop|restart|autostart|autorestart|foreground_start) \ + " + +# if no args specified, show usage +if [ $# -le 1 ]; then + echo $usage + exit 1 +fi + +# default autostart args value indicating infinite window size and no retry limit +AUTOSTART_WINDOW_SIZE=0 +AUTOSTART_WINDOW_RETRY_LIMIT=0 + +bin=`dirname "${BASH_SOURCE-$0}"` +bin=`cd "$bin">/dev/null; pwd` + +. "$bin"/hbase-connectors-config.sh + + + +# get arguments +startStop=$1 +shift + +command=$1 +shift + +rotate_log () +{ + log=$1; + num=5; + if [ -n "$2" ]; then + num=$2 + fi + if [ -f "$log" ]; then # rotate logs + while [ $num -gt 1 ]; do + prev=`expr $num - 1` + [ -f "$log.$prev" ] && mv -f "$log.$prev" "$log.$num" + num=$prev + done + mv -f "$log" "$log.$num"; + fi +} + +cleanAfterRun() { + if [ -f ${HBASE_CONNECTOR_PID} ]; then + # If the process is still running time to tear it down. + kill -9 `cat ${HBASE_CONNECTOR_PID}` > /dev/null 2>&1 + rm -f ${HBASE_CONNECTOR_PID} > /dev/null 2>&1 + fi +} + +check_before_start(){ + #ckeck if the process is not running + mkdir -p "$HBASE_CONNECTOR_PID_DIR" + if [ -f "$HBASE_CONNECTOR_PID" ]; then + if kill -0 `cat $HBASE_CONNECTOR_PID` > /dev/null 2>&1; then + echo $command running as process `cat $HBASE_CONNECTOR_PID`. Stop it first. + exit 1 + fi + fi +} + +wait_until_done () +{ + p=$1 + cnt=${HBASE_CONNECTOR_SLAVE_TIMEOUT:-300} + origcnt=$cnt + while kill -0 $p > /dev/null 2>&1; do + if [ $cnt -gt 1 ]; then + cnt=`expr $cnt - 1` + sleep 1 + else + echo "Process did not complete after $origcnt seconds, killing." + kill -9 $p + exit 1 + fi + done + return 0 +} + +waitForProcessEnd() { + pidKilled=$1 + commandName=$2 + processedAt=`date +%s` + while kill -0 $pidKilled > /dev/null 2>&1; + do + echo -n "." 
+ sleep 1; + # if process persists more than $HBASE_STOP_TIMEOUT (default 1200 sec) no mercy + if [ $(( `date +%s` - $processedAt )) -gt ${HBASE_STOP_TIMEOUT:-1200} ]; then + break; + fi + done + # process still there : kill -9 + if kill -0 $pidKilled > /dev/null 2>&1; then + echo -n force stopping $commandName with kill -9 $pidKilled + $JAVA_HOME/bin/jstack -l $pidKilled > "$logout" 2>&1 + kill -9 $pidKilled > /dev/null 2>&1 + fi + # Add a CR after we're done w/ dots. + echo +} + + +# get log directory +if [ "$HBASE_CONNECTOR_LOG_DIR" = "" ]; then + export HBASE_CONNECTOR_LOG_DIR="$HBASE_CONNECTOR_HOME/logs" +fi +mkdir -p "$HBASE_CONNECTOR_LOG_DIR" + +if [ "$HBASE_CONNECTOR_PID_DIR" = "" ]; then + HBASE_CONNECTOR_PID_DIR=/tmp +fi + +if [ "$HBASE_CONNECTOR_IDENT_STRING" = "" ]; then + export HBASE_CONNECTOR_IDENT_STRING="$USER" +fi + +# Some variables +# Work out java location so can print version into log. +if [ "$JAVA_HOME" != "" ]; then + #echo "run java in $JAVA_HOME" + JAVA_HOME=$JAVA_HOME +fi +if [ "$JAVA_HOME" = "" ]; then + echo "Error: JAVA_HOME is not set." 
+ exit 1 +fi + +JAVA=$JAVA_HOME/bin/java +export HBASE_CONNECTOR_LOG_PREFIX=hbase-connector-$HBASE_CONNECTOR_IDENT_STRING-$command-$HOSTNAME +export HBASE_CONNECTOR_LOGFILE=$HBASE_CONNECTOR_LOG_PREFIX.log + +if [ -z "${HBASE_CONNECTOR_ROOT_LOGGER}" ]; then +export HBASE_CONNECTOR_ROOT_LOGGER=${HBASE_CONNECTOR_ROOT_LOGGER:-"INFO,RFA"} +fi + +if [ -z "${HBASE_CONNECTOR_SECURITY_LOGGER}" ]; then +export HBASE_CONNECTOR_SECURITY_LOGGER=${HBASE_CONNECTOR_SECURITY_LOGGER:-"INFO,RFAS"} +fi + +HBASE_CONNECTOR_LOGOUT=${HBASE_CONNECTOR_LOGOUT:-"$HBASE_CONNECTOR_LOG_DIR/$HBASE_CONNECTOR_LOG_PREFIX.out"} +HBASE_CONNECTOR_LOGGC=${HBASE_CONNECTOR_LOGGC:-"$HBASE_CONNECTOR_LOG_DIR/$HBASE_CONNECTOR_LOG_PREFIX.gc"} +HBASE_CONNECTOR_LOGLOG=${HBASE_CONNECTOR_LOGLOG:-"${HBASE_CONNECTOR_LOG_DIR}/${HBASE_CONNECTOR_LOGFILE}"} +HBASE_CONNECTOR_CONNECTOR_PID=$HBASE_CONNECTOR_CONNECTOR_PID_DIR/hbase-connector-$HBASE_CONNECTOR_IDENT_STRING-$command.pid + +export HBASE_CONNECTOR_AUTOSTART_FILE=$HBASE_CONNECTOR_CONNECTOR_PID_DIR/hbase-connector-$HBASE_CONNECTOR_IDENT_STRING-$command.autostart + +if [ -n "$SERVER_GC_OPTS" ]; then + export SERVER_GC_OPTS=${SERVER_GC_OPTS/"-Xloggc:"/"-Xloggc:${HBASE_CONNECTOR_LOGGC}"} +fi +if [ -n "$CLIENT_GC_OPTS" ]; then + export CLIENT_GC_OPTS=${CLIENT_GC_OPTS/"-Xloggc:"/"-Xloggc:${HBASE_CONNECTOR_LOGGC}"} +fi + +# Set default scheduling priority +if [ "$HBASE_CONNECTOR_NICENESS" = "" ]; then + export HBASE_CONNECTOR_NICENESS=0 +fi + +#thiscmd="$bin/${BASH_SOURCE-$0}" +thiscmd="$bin/hbase-connectors-daemon.sh" +args=$@ + + +if [ -f $HBASE_CONNECTOR_PID_DIR/"hbase-connectors-"$command".pid" ]; +then + HBASE_CONNECTOR_PID=$HBASE_CONNECTOR_PID_DIR/"hbase-connectors-"$command".pid" +else + HBASE_CONNECTOR_PID="" +fi + +case $startStop in + +(start) + check_before_start + rotate_log $HBASE_CONNECTOR_LOGOUT + rotate_log $HBASE_CONNECTOR_LOGGC + echo running $command, logging to $HBASE_CONNECTOR_LOGOUT + $thiscmd --config "${HBASE_CONNECTOR_CONF_DIR}" \ + 
foreground_start $command $args < /dev/null > ${HBASE_CONNECTOR_LOGOUT} 2>&1 & + disown -h -r + sleep 1; head "${HBASE_CONNECTOR_LOGOUT}" + ;; + +(autostart) + check_before_start + rotate_log $HBASE_CONNECTOR_LOGOUT + rotate_log $HBASE_CONNECTOR_LOGGC + echo running $command, logging to $HBASE_CONNECTOR_LOGOUT + nohup $thiscmd --config "${HBASE_CONNECTOR_CONF_DIR}" --autostart-window-size ${AUTOSTART_WINDOW_SIZE} --autostart-window-retry-limit ${AUTOSTART_WINDOW_RETRY_LIMIT} \ + internal_autostart $command $args < /dev/null > ${HBASE_CONNECTOR_LOGOUT} 2>&1 & + ;; + +(autorestart) + echo running $command, logging to $HBASE_CONNECTOR_LOGOUT + # stop the command + $thiscmd --config "${HBASE_CONNECTOR_CONF_DIR}" stop $command $args & + wait_until_done $! + # wait a user-specified sleep period + sp=${HBASE_CONNECTOR_RESTART_SLEEP:-3} + if [ $sp -gt 0 ]; then + sleep $sp + fi + + check_before_start + rotate_log $HBASE_CONNECTOR_LOGOUT + nohup $thiscmd --config "${HBASE_CONNECTOR_CONF_DIR}" --autostart-window-size ${AUTOSTART_WINDOW_SIZE} --autostart-window-retry-limit ${AUTOSTART_WINDOW_RETRY_LIMIT} \ + internal_autostart $command $args < /dev/null > ${HBASE_CONNECTOR_LOGOUT} 2>&1 & + ;; + +(foreground_start) + trap cleanAfterRun SIGHUP SIGINT SIGTERM EXIT + if [ "$HBASE_CONNECTOR_NO_REDIRECT_LOG" != "" ]; then + # NO REDIRECT + echo "`date` Starting $command on `hostname`" + echo "`ulimit -a`" + # in case the parent shell gets the kill make sure to trap signals. + # Only one will get called. Either the trap or the flow will go through. + nice -n $HBASE_CONNECTOR_NICENESS "$HBASE_CONNECTOR_HOME"/bin/hbase-connectors \ + --config "${HBASE_CONNECTOR_CONF_DIR}" \ + $command "$@" start & + else + echo "`date` Starting $command on `hostname`" >> ${HBASE_CONNECTOR_LOGLOG} + echo "`ulimit -a`" >> "$HBASE_CONNECTOR_LOGLOG" 2>&1 + # in case the parent shell gets the kill make sure/ to trap signals. + # Only one will get called. Either the trap or the flow will go through. 
+ nice -n $HBASE_CONNECTOR_NICENESS "$HBASE_CONNECTOR_HOME"/bin/hbase-connectors \ + --config "${HBASE_CONNECTOR_CONF_DIR}" \ + $command "$@" start >> ${HBASE_CONNECTOR_LOGOUT} 2>&1 & + fi + # Add to the command log file vital stats on our environment. + hbase_connector_pid=$! + HBASE_CONNECTOR_PID=$HBASE_CONNECTOR_PID_DIR/"hbase-connectors-"$command".pid" + echo $hbase_connector_pid > ${HBASE_CONNECTOR_PID} + wait $hbase_connector_pid + ;; + +(internal_autostart) + ONE_HOUR_IN_SECS=3600 + autostartWindowStartDate=`date +%s` + autostartCount=0 + touch "$HBASE_CONNECTOR_AUTOSTART_FILE" + + # keep starting the command until asked to stop. Reloop on software crash + while true + do + rotate_log $HBASE_CONNECTOR_LOGGC + if [ -f $HBASE_CONNECTOR_PID ] && kill -0 "$(cat "$HBASE_CONNECTOR_PID")" > /dev/null 2>&1 ; then + wait "$(cat "$HBASE_CONNECTOR_PID")" + else + #if the file does not exist it means that it was not stopped properly by the stop command + if [ ! -f "$HBASE_CONNECTOR_AUTOSTART_FILE" ]; then + echo "`date` HBase might be stopped removing the autostart file. Exiting Autostart process" >> ${HBASE_CONNECTOR_LOGOUT} + exit 1 + fi + + echo "`date` Autostarting hbase $command service. 
Attempt no: $(( $autostartCount + 1))" >> ${HBASE_CONNECTOR_LOGLOG} + touch "$HBASE_CONNECTOR_AUTOSTART_FILE" + $thiscmd --config "${HBASE_CONNECTOR_CONF_DIR}" foreground_start $command $args + autostartCount=$(( $autostartCount + 1 )) + fi + + curDate=`date +%s` + autostartWindowReset=false + + # reset the auto start window size if it exceeds + if [ $AUTOSTART_WINDOW_SIZE -gt 0 ] && [ $(( $curDate - $autostartWindowStartDate )) -gt $(( $AUTOSTART_WINDOW_SIZE * $ONE_HOUR_IN_SECS )) ]; then + echo "Resetting Autorestart window size: $autostartWindowStartDate" >> ${HBASE_CONNECTOR_LOGOUT} + autostartWindowStartDate=$curDate + autostartWindowReset=true + autostartCount=0 + fi + + # kill autostart if the retry limit is exceeded within the given window size (window size other then 0) + if ! $autostartWindowReset && [ $AUTOSTART_WINDOW_RETRY_LIMIT -gt 0 ] && [ $autostartCount -gt $AUTOSTART_WINDOW_RETRY_LIMIT ]; then + echo "`date` Autostart window retry limit: $AUTOSTART_WINDOW_RETRY_LIMIT exceeded for given window size: $AUTOSTART_WINDOW_SIZE hours.. Exiting..." >> ${HBASE_CONNECTOR_LOGLOG} + rm -f "$HBASE_CONNECTOR_AUTOSTART_FILE" + exit 1 + fi + + # wait for shutdown hook to complete + sleep 20 + done + ;; + +(stop) + echo running $command, logging to $HBASE_CONNECTOR_LOGOUT + rm -f "$HBASE_CONNECTOR_AUTOSTART_FILE" + if [ "$HBASE_CONNECTOR_PID" != "" ]; then + if [ -f $HBASE_CONNECTOR_PID ]; then + pidToKill=`cat $HBASE_CONNECTOR_PID` + # kill -0 == see if the PID exists + if kill -0 $pidToKill > /dev/null 2>&1; then + echo -n stopping $command + echo "`date` Terminating $command" >> $HBASE_CONNECTOR_LOGLOG + kill $pidToKill > /dev/null 2>&1 + waitForProcessEnd $pidToKill $command + else + retval=$? 
+ echo no $command to stop because kill -0 of pid $pidToKill failed with status $retval + fi + else + echo no $command to stop because no pid file $HBASE_CONNECTOR_PID + fi + else + echo no $command to stop because no pid file $HBASE_CONNECTOR_PID + fi + + rm -f $HBASE_CONNECTOR_PID + ;; + +(restart) + echo running $command, logging to $HBASE_CONNECTOR_LOGOUT + # stop the command + $thiscmd --config "${HBASE_CONNECTOR_CONF_DIR}" stop $command $args & + wait_until_done $! + # wait a user-specified sleep period + sp=${HBASE_CONNECTOR_RESTART_SLEEP:-3} + if [ $sp -gt 0 ]; then + sleep $sp + fi + # start the command + $thiscmd --config "${HBASE_CONNECTOR_CONF_DIR}" start $command $args & + wait_until_done $! + ;; + +(*) + echo $usage + echo "ze parms " $0 $1 $2 $3 $4 + exit 1 + ;; +esac diff --git a/conf/log4j.properties b/conf/log4j.properties new file mode 100644 index 00000000..efd80799 --- /dev/null +++ b/conf/log4j.properties @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Define some default values that can be overridden by system properties +hbase.connector.root.logger=INFO,console +hbase.connector.log.dir=. 
+hbase.connector.log.file=hbase-connector.log +hbase.connector.log.level=INFO + +# Define the root logger to the system property "hbase.connector.root.logger". +log4j.rootLogger=${hbase.connector.root.logger} + +# Logging Threshold +log4j.threshold=ALL + +# +# Daily Rolling File Appender +# +log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender +log4j.appender.DRFA.File=${hbase.connector.log.dir}/${hbase.connector.log.file} + +# Rollver at midnight +log4j.appender.DRFA.DatePattern=.yyyy-MM-dd + +# 30-day backup +#log4j.appender.DRFA.MaxBackupIndex=30 +log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}: %.1000m%n + +# Rolling File Appender properties +hbase.connector.log.maxfilesize=256MB +hbase.connector.log.maxbackupindex=20 + +# Rolling File Appender +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.File=${hbase.connector.log.dir}/${hbase.connector.log.file} + +log4j.appender.RFA.MaxFileSize=${hbase.connector.log.maxfilesize} +log4j.appender.RFA.MaxBackupIndex=${hbase.connector.log.maxbackupindex} + +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}: %.1000m%n + + +# +# Null Appender +# +log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender + +# +# console +# Add "console" to rootlogger above if you want to use this +# +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}: %.1000m%n + +log4j.appender.asyncconsole=org.apache.hadoop.hbase.AsyncConsoleAppender +log4j.appender.asyncconsole.target=System.err + +# Custom Logging levels + + + +log4j.logger.org.apache.hadoop.hbase.kafka=INFO + +#this is a debugging tool 
+log4j.logger.org.apache.hadoop.hbase.kafka.DumpToStringListener=DEBUG + + + +log4j.logger.org.apache.hadoop.metrics2.impl.MetricsConfig=WARN +log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSinkAdapter=WARN +log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN diff --git a/hbase-connectors-assembly/pom.xml b/hbase-connectors-assembly/pom.xml new file mode 100755 index 00000000..bd6b2cee --- /dev/null +++ b/hbase-connectors-assembly/pom.xml @@ -0,0 +1,159 @@ + + + + 4.0.0 + + hbase-connectors + org.apache.hbase.connectors + 1.0.0-SNAPSHOT + + hbase-connectors-assembly + Apache HBase Connectors - Assembly + + Module that does project assembly and that is all that it does. + + pom + + true + + + + org.apache.hbase.connectors + hbase-kafka-proxy + ${project.version} + + + + org.apache.hbase.connectors + hbase-kafka-model + ${project.version} + + + + + + maven-assembly-plugin + + + distro-assembly + package + + single + + + hbase-connectors-${project.version} + false + true + gnu + + + src/main/assembly/hbase-connectors-bin.xml + + + + + + + + + maven-dependency-plugin + + + + create-hbase-connectors-generated-classpath + test + + build-classpath + + + ${project.parent.basedir}/target/cached_classpath.txt + + + + + + + + + org.apache.maven.plugins + maven-remote-resources-plugin + + + aggregate-licenses + + process + + + + ${build.year} + ${license.debug.print.included} + + ${license.bundles.dependencies} + ${license.bundles.jquery} + ${license.bundles.logo} + ${license.bundles.bootstrap} + + + org.apache.hbase:hbase-resource-bundle:${hbase.version} + + + org.apache.hbase:hbase-resource-bundle:${hbase.version} + + + + supplemental-models.xml + + + + + + + + org.codehaus.mojo + exec-maven-plugin + ${exec.maven.version} + + + concat-NOTICE-files + package + + exec + + + env + + bash + -c + cat maven-shared-archive-resources/META-INF/NOTICE \ + `find ${project.build.directory}/dependency -iname NOTICE -or -iname NOTICE.txt` + + + 
${project.build.directory}/NOTICE.aggregate + ${project.build.directory} + + + + + + + + diff --git a/hbase-connectors-assembly/src/main/assembly/connector-components.xml b/hbase-connectors-assembly/src/main/assembly/connector-components.xml new file mode 100755 index 00000000..84e99be1 --- /dev/null +++ b/hbase-connectors-assembly/src/main/assembly/connector-components.xml @@ -0,0 +1,45 @@ + + + + + + + + + ${project.basedir}/../conf + conf + 0644 + 0755 + + + + + ${project.basedir}/../bin + bin + + hbase-connectors + hbase-connectors-config.sh + + 0755 + 0755 + + + diff --git a/hbase-connectors-assembly/src/main/assembly/hbase-connectors-bin.xml b/hbase-connectors-assembly/src/main/assembly/hbase-connectors-bin.xml new file mode 100755 index 00000000..d57fbd41 --- /dev/null +++ b/hbase-connectors-assembly/src/main/assembly/hbase-connectors-bin.xml @@ -0,0 +1,54 @@ + + + + + hbase-connectors-bin + + tar.gz + + hbase-connectors-${project.version} + + src/main/assembly/connector-components.xml + + + + true + + org.apache.hbase.connectors:hbase-kafka-proxy + + + false + hbase-kafka-proxy + + + + org.apache.yetus:audience-annotations + org.slf4j:slf4j-api + org.slf4j:slf4j-log4j12 + + + + + + + diff --git a/hbase-connectors-assembly/src/main/resources/META-INF/LEGAL b/hbase-connectors-assembly/src/main/resources/META-INF/LEGAL new file mode 100644 index 00000000..e69de29b diff --git a/kafka/README b/kafka/README new file mode 100755 index 00000000..7685095b --- /dev/null +++ b/kafka/README @@ -0,0 +1,126 @@ +Hbase Kafka Proxy + +Welcome to the hbase kafka proxy. The purpose of this proxy is to act as a 'fake peer'. It +receives replication events from it's peer and applies a set of rules (stored in +kafka-route-rules.xml) to determine if the event should be forwared to a topic. If the +mutation matches one of the rules, the mutation is converted to an avro object and the +item is placed into the topic. 
+ +The service sets up a bare bones region server, so it will use the values in hbase-site.xml. If +you wish to override those values, pass them as -Dkey=value. + +To Use: + +1. Make sure the hbase command is in your path. The proxy uses the 'hbase classpath' command to +find the hbase libraries. + +2. Create any topics in your kafka broker that you wish to use. + +3. set up kafka-route-rules.xml. This file controls how the mutations are routed. There are +two kinds of rules: route and drop. drop: any mutation that matches this rule will be dropped. +route: any mutation that matches this rule will be routed to the configured topic. + +Each rule has the following parameters: +- table +- columnFamily +- qualifier + +The qualifier parameter can contain simple wildcard expressions (start and end only). + +Examples + + + + + + +This causes all mutations done to default:mytable to be routed to kafka topic 'foo' + + + + + + +This will cause all mutations from default:mytable columnFamily mycf and qualifier myqualifier +to be routed to mykafkatopic. + + + + + + + +This combination will route all mutations from default:mytable columnFamily mycf to mykafkatopic +unless they start with 'secret'. Items matching that rule will be dropped. The way the rule is +written, all other mutations for column family mycf will be routed to the 'mykafka' topic. + +4. Service arguments + +--kafkabrokers (or -b) +--routerulesfile (or -r) +--kafkaproperties (or -f) +--peername (or -p) name of hbase peer to use (defaults to hbasekafka) +--znode (or -z) root znode (defaults to /kafkaproxy) +--enablepeer (or -e) enable peer on startup (defaults to false)] +--auto (or -a) auto create peer + + +5. start the service. 
- make sure the hbase command is in your path + - By default, the service looks for route-rules.xml in the conf directory, you can specify a + different file or location with the -r argument + +bin/hbase-connectors-daemon.sh start kafkaproxy -a -e -p wootman -b localhost:9092 -r ~/kafka-route-rules.xml + +this: +- starts the kafka proxy +- passes the -a. The proxy will create the replication peer specified by -p if it does not exist + (not required, but it saves some busy work). +- enables the peer (-e) the proxy will enable the peer when the service starts (not required, you can + manually enable the peer in the hbase shell) + + +Notes: +1. The proxy will connect to the zookeeper in hbase-site.xml by default. You can override this by + passing -Dhbase.zookeeper.quorum + + bin/hbase-connectors-daemon.sh start kafkaproxy -Dhbase.zookeeper.quorum=localhost:1234 ..... other args .... + +2. route rules only support unicode characters. +3. I do not have access to a secured hadoop cluster to test this on. + +Message format + +Messages are in avro format, this is the schema: + +{"namespace": "org.apache.hadoop.hbase.kafka", + "type": "record", + "name": "HBaseKafkaEvent", + "fields": [ + {"name": "key", "type": "bytes"}, + {"name": "timestamp", "type": "long" }, + {"name": "delete", "type": "boolean" }, + {"name": "value", "type": "bytes"}, + {"name": "qualifier", "type": "bytes"}, + {"name": "family", "type": "bytes"}, + {"name": "table", "type": "bytes"} + ] +} + +Any language that supports Avro should be able to consume the messages off the topic. + + +Testing Utility + +A utility is included to test the routing rules. + +bin/hbase-connectors-daemon.sh start kafkaproxytest -k -t + +the messages will be dumped in string format under logs/ + +TODO: +1. Some properties passed into the region server are hard-coded. +2. The avro objects should be generic +3. Allow rules to be refreshed without a restart +4. Get this tested on a secure (TLS & Kerberos) enabled cluster. 
\ No newline at end of file diff --git a/kafka/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro b/kafka/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro old mode 100644 new mode 100755 index ec886274..b25acfb9 --- a/kafka/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro +++ b/kafka/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + {"namespace": "org.apache.hadoop.hbase.kafka", "type": "record", "name": "HBaseKafkaEvent", diff --git a/kafka/hbase-kafka-proxy/pom.xml b/kafka/hbase-kafka-proxy/pom.xml old mode 100644 new mode 100755 index b4b3226f..6ebace50 --- a/kafka/hbase-kafka-proxy/pom.xml +++ b/kafka/hbase-kafka-proxy/pom.xml @@ -40,6 +40,7 @@ + org.apache.maven.plugins @@ -76,36 +77,28 @@ org.apache.avro avro - + org.apache.hbase.connectors hbase-kafka-model - - org.apache.hbase - hbase-common - + org.apache.hbase hbase-common test-jar test - - org.apache.hbase - hbase-client - - - org.apache.hbase - hbase-zookeeper - + org.apache.hbase hbase-server + provided org.apache.hbase hbase-annotations + compile org.apache.hbase @@ -113,6 +106,11 @@ test-jar test + + org.apache.yetus + audience-annotations + ${audience-annotations.version} + org.apache.kafka kafka-clients diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java old mode 100644 new mode 100755 index 5874f35c..9704c67e --- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java +++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java @@ -60,9 +60,9 @@ public static void main(String[] args) { VersionInfo.logVersion(); Options options = new Options(); - options.addOption("k", "kafkabrokers", true, "Kafka Brokers " + + 
options.addRequiredOption("k", "kafkabrokers", true, "Kafka Brokers " + "(comma delimited)"); - options.addOption("t", "kafkatopics", true,"Kafka Topics " + options.addRequiredOption("t", "kafkatopics", true,"Kafka Topics " + "to subscribe to (comma delimited)"); CommandLine commandLine = null; try { diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java old mode 100644 new mode 100755 index 14c9179c..fda5cf2f --- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java +++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java @@ -40,8 +40,11 @@ import org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionGroup; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -71,11 +74,12 @@ public final class KafkaProxy { public static final String KAFKA_PROXY_KAFKA_BROKERS = "kafkaproxy.kafka.brokers"; private static Map DEFAULT_PROPERTIES = new HashMap<>(); + private static Map CAN_OVERRIDE_DEFAULT_PROPERTIES = new HashMap<>(); + static { DEFAULT_PROPERTIES.put("hbase.cluster.distributed","true"); DEFAULT_PROPERTIES.put("zookeeper.znode.parent","/kafkaproxy"); - DEFAULT_PROPERTIES.put("hbase.regionserver.port","17020"); DEFAULT_PROPERTIES.put("hbase.regionserver.info.port","17010"); DEFAULT_PROPERTIES.put("hbase.client.connection.impl", "org.apache.hadoop.hbase.kafka.KafkaBridgeConnection"); @@ -91,33 +95,25 @@ public final class 
KafkaProxy { DEFAULT_PROPERTIES.put("hbase.regionserver.replication.handler.count","1"); DEFAULT_PROPERTIES.put("hbase.regionserver.handler.count","1"); DEFAULT_PROPERTIES.put("hbase.ipc.server.read.threadpool.size","3"); + + CAN_OVERRIDE_DEFAULT_PROPERTIES.put("hbase.regionserver.port","17020"); } private static void printUsageAndExit(Options options, int exitCode) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp("hbase kafkaproxy start", "", options, "\nTo run the kafka proxy as a daemon, execute " + - "hbase-daemon.sh start|stop kafkaproxy " + - "[--kafkabrokers ] " + - "[-b ] " + - "[--routerulesfile ] " + - "[-r ] " + - "[--kafkaproperties ] " + - "[-f ] " + - "[--peername name of hbase peer to use (defaults to hbasekafka)] " + - "[-p name of hbase peer to use (defaults to hbasekafka)] " + - "[--znode root znode (defaults to /kafkaproxy)] " + - "[-z root znode (defaults to /kafkaproxy)] " + - - "[--enablepeer enable peer on startup (defaults to false)] " + - "[-e enable peer on startup (defaults to false)] " + - - "[--auto auto create peer] " + - "[-a auto create peer] \n", true); + "hbase-connectors-daemon.sh start|stop kafkaproxy \n" + + "[--kafkabrokers (or -b) ] \n" + + "[--routerulesfile (or -r) ] \n" + + "[--kafkaproperties (or -f) ] \n" + + "[--peername (or -p) name of hbase peer to use (defaults to hbasekafka)]\n " + + "[--znode (or -z) root znode (defaults to /kafkaproxy)] \n" + + "[--enablepeer (or -e) enable peer on startup (defaults to false)]\n " + + "[--auto (or -a) auto create peer] " + + "\n", true); System.exit(exitCode); } @@ -139,13 +135,13 @@ public static void main(String[] args) throws Exception { Options options = new Options(); - options.addOption("b", "kafkabrokers", true, + options.addRequiredOption("b", "kafkabrokers", true, "Kafka Brokers (comma delimited)"); options.addOption("r", "routerulesfile", true, "file that has routing rules (defaults to conf/kafka-route-rules.xml"); options.addOption("f", 
"kafkaproperties", true, "Path to properties file that has the kafka connection properties"); - options.addOption("p", "peername", true, + options.addRequiredOption("p", "peername", true, "Name of hbase peer"); options.addOption("z", "znode", true, "root zode to use in zookeeper (defaults to /kafkaproxy)"); @@ -154,19 +150,28 @@ public static void main(String[] args) throws Exception { options.addOption("e", "enablepeer", false, "enable peer on startup (defaults to false)"); - LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); VersionInfo.logVersion(); Configuration conf = HBaseConfiguration.create(); CommandLine commandLine = null; + + Configuration commandLineConf = new Configuration(); + commandLineConf.clear(); + + GenericOptionsParser parser = new GenericOptionsParser(commandLineConf, args); + String restArgs[] =parser.getRemainingArgs(); + + + try { - commandLine = new BasicParser().parse(options, args); + commandLine = new BasicParser().parse(options, restArgs); } catch (ParseException e) { LOG.error("Could not parse: ", e); printUsageAndExit(options, -1); } + String peer=""; if (!commandLine.hasOption('p')){ System.err.println("hbase peer id is required"); @@ -231,15 +236,28 @@ public static void main(String[] args) throws Exception { .map((argKey)->("-D"+argKey+"="+ DEFAULT_PROPERTIES.get(argKey))) .collect(Collectors.toList()); + allArgs.addAll(CAN_OVERRIDE_DEFAULT_PROPERTIES.keySet().stream() + .filter((argKey)->commandLineConf.get(argKey,"").equalsIgnoreCase("")) + .map((argKey)->("-D"+argKey+"="+ CAN_OVERRIDE_DEFAULT_PROPERTIES.get(argKey))) + .collect(Collectors.toList())); + + for (Map.Entry k : commandLineConf){ + allArgs.add("-D"+k.getKey()+"="+k.getValue()); + } + otherProps.keySet().stream() .map((argKey)->("-D"+argKey+"="+ otherProps.get(argKey))) .forEach((item)->allArgs.add(item)); - Arrays.stream(args) + Arrays.stream(restArgs) .filter((arg)->(arg.startsWith("-D")||arg.equals("start"))) 
.forEach((arg)->allArgs.add(arg)); - LOG.info("Args passed to region server "+allArgs); + // is start there? + if (allArgs.stream() + .filter((arg)->arg.equalsIgnoreCase("start")).count() < 1){ + allArgs.add("start"); + } String[] newArgs=new String[allArgs.size()]; allArgs.toArray(newArgs); @@ -262,8 +280,10 @@ public static void setupZookeeperZnodes(CuratorFramework zk, String rootZnode,St byte []uuidBytes = Bytes.toBytes(newValue); String idPath=rootZnode+"/hbaseid"; if (zk.checkExists().forPath(idPath) == null) { - zk.create().creatingParentsIfNeeded().forPath(rootZnode + - "/hbaseid",uuidBytes); + // zk.create().creatingParentsIfNeeded().forPath(rootZnode + + // "/hbaseid",uuidBytes); + zk.create().forPath(rootZnode); + zk.create().forPath(rootZnode +"/hbaseid",uuidBytes); } else { // If the znode is there already make sure it has the // expected value for the peer name. @@ -322,7 +342,13 @@ public static void checkForOrCreateReplicationPeer(Configuration hbaseConf, if (peerThere) { if (enablePeer){ LOG.info("enable peer," + peerName); - admin.enableReplicationPeer(peerName); + List peers = admin.listReplicationPeers().stream() + .filter((peer)->peer.getPeerId().equals(peerName)) + .filter((peer)->peer.isEnabled()==false) + .collect(Collectors.toList()); + if (!peers.isEmpty()){ + admin.enableReplicationPeer(peerName); + } } break; } else { diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java old mode 100644 new mode 100755 index ec8034de..25e4796a --- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java +++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java @@ -176,6 +176,11 @@ public void batch(final List actions, Object[] results) this.producer.flush(); } + @Override + public void close() { + this.producer.flush(); + } + 
@Override public TableName getName() { return this.tableName; diff --git a/pom.xml b/pom.xml old mode 100644 new mode 100755 index 45f25234..da176d57 --- a/pom.xml +++ b/pom.xml @@ -47,7 +47,8 @@ kafka/hbase-kafka-model kafka/hbase-kafka-proxy - + hbase-connectors-assembly + scm:git:git://gitbox.apache.org/repos/asf/hbase-connectors.git scm:git:https://gitbox.apache.org/repos/asf/hbase-connectors.git @@ -119,6 +120,8 @@ 1.7.7 2.1.0 3.6.1 + 1.6.0 + 0.5.0 @@ -167,21 +170,34 @@ hbase-client ${hbase.version} - - org.apache.hbase.connectors - hbase-kafka-model - ${project.version} - - - org.apache.hbase.connectors - hbase-kafka-proxy - ${project.version} - + + + + org.apache.hbase.connectors + hbase-kafka-proxy + ${project.version} + + + + org.apache.hbase.connectors + hbase-kafka-model + ${project.version} + + + + + maven-assembly-plugin + + + true + + org.apache.maven.plugins maven-release-plugin