-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka-cp-topic
executable file
·26 lines (20 loc) · 904 Bytes
/
kafka-cp-topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/bin/bash -e
#
# kafka-cp-topic <topic-src> <topic-dst> [n]
#
# Copy up to <n> messages from topic <src> to topic <dst>
#
[[ $# != 2 && $# != 3 ]] && { echo "usage: kafka-cp-topic <topic-src> <topic-dst> [n]" 1>&2; exit -1; }
SRC="$1"
DST="$2"
test -z "$3" && COUNT= || COUNT="-c $3"
# A delimiter that's unlikely to appear in the byte stream by chance
DELIM="$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 40 | head -n 1)"
# Temporary dir where we'll dump the messages from $SRC, one file per message
TDIR="$(mktemp -d)"
trap "{ rm -rf "$TDIR"; }" EXIT
# Consume the messages into the temporary dir
kafkacat -C -e -b localhost -t "$SRC" -D "$DELIM" $COUNT |
awk 'BEGIN { RS="'"$DELIM"'"; PART=-1 } { PART++; FILENAME=sprintf("'"$TDIR/msg.%012d"'", PART); printf "%s",$0 > FILENAME }'
# Produce from temporary dir into the destination topic
kafkacat -P -b localhost -t "$DST" "$TDIR"/msg.*