Skip to content

Commit

Permalink
Store FDMI events in kafka. (Seagate#1261)
Browse files Browse the repository at this point in the history
We add callback for kafka producer. If FDMI events
are sent successfully, we will ack the FDMI record in
the callback.

The patch focuses on the happy path.
More failure-case handling is needed in the future.

Signed-off-by: Hua Huang <hua.huang@seagate.com>

Co-authored-by: John Bent <john.bent@seagate.com>
  • Loading branch information
Huang Hua and johnbent authored Nov 30, 2021
1 parent b374747 commit e0bb65c
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 12 deletions.
102 changes: 92 additions & 10 deletions fdmi/plugins/fdmi_app
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,76 @@ MAX_KEYSTRS = 10240
# is communicating.
processes = []

FDMI_producer = None


def send_ack_to_plugin(banner, p, rec_id):
    """Acknowledge one FDMI record back to a plugin process.

    Logs the ack (prefixed with *banner*), then writes the record id,
    newline-terminated and UTF-8 encoded, to the plugin's stdin and
    flushes so the plugin sees it immediately.
    """
    print(banner + " ACK " + rec_id, flush=True)
    line = "{}\n".format(rec_id).encode("utf-8")
    p.stdin.write(line)
    p.stdin.flush()




# Establish connection to kafka server
def kafka_producer_connect(args):
    """Create the global KafkaProducer when --kafka-server was given.

    Does nothing if no kafka server was configured.  Exits the process
    when the bootstrap connection to the broker cannot be established.
    """
    global FDMI_producer

    if args.kafka_server is None:
        return

    from kafka import KafkaProducer
    FDMI_producer = KafkaProducer(
        bootstrap_servers=args.kafka_server,
        value_serializer=lambda m: json.dumps(m).encode())

    if not FDMI_producer.bootstrap_connected():
        print('Can not connect to kafka server: {}'.format(args.kafka_server),
              file=sys.stderr)
        # NOTE(review): `einval` is presumably an errno-style constant
        # defined earlier in this file -- confirm it exists.
        sys.exit(einval)


# Close connection to kafka server
def kafka_producer_close():
    """Flush any pending kafka messages, close the producer and reset
    the module-level handle so a later close is a no-op."""
    global FDMI_producer

    if FDMI_producer is None:
        return
    FDMI_producer.flush()
    FDMI_producer.close()
    FDMI_producer = None


def on_send_success(*args, **kwargs):
    """
    Kafka delivery callback for a successfully sent FDMI event.

    :param args: (kv, RecordMetadata) -- kv is the event dict bound via
                 add_callback(); the broker metadata is appended by kafka.
    :returns: args unchanged, so further chained callbacks still see them.
    """
    kv = args[0]
    print('success on {}'.format(kv))
    # The event is durable in kafka now, so every plugin may drop it.
    rec_id = kv["rec_id"]
    for plugin in processes:
        send_ack_to_plugin("----------", plugin, rec_id)
    return args


def on_send_error(*args, **kwargs):
    """
    Kafka delivery errback for a failed FDMI event send.

    Logs the failure to stderr so it is not silently lost; the record is
    deliberately NOT acked here, leaving it pending on the plugin side.
    Richer failure handling (retry/backoff) remains TODO.

    :param args: (kv, exception) as bound via add_errback().
    :returns: args unchanged, preserving errback chaining.
    """
    # Previously this swallowed the error entirely; at minimum surface it.
    print('kafka send failed: {}'.format(args), file=sys.stderr, flush=True)
    return args



# Send FDMI record to kafka with "FDMI" topic
def kafka_producer_send(kv):
    """Publish one FDMI event dict to the "FDMI" topic.

    With a live producer the send is asynchronous and the plugin is acked
    later from on_send_success(); when kafka is not configured the record
    is acked to every plugin immediately instead.
    """
    global FDMI_producer

    if FDMI_producer is None:
        # No kafka configured: ack straight back to the plugins.
        rec_id = kv["rec_id"]
        for plugin in processes:
            send_ack_to_plugin("++++++++++", plugin, rec_id)
        return

    future = FDMI_producer.send("FDMI", kv)
    future.add_callback(on_send_success, kv)
    future.add_errback(on_send_error, kv)


def str_decode(encoded):
if encoded is None:
Expand All @@ -73,6 +143,7 @@ def str_decode(encoded):
# such as de-dup, logging and/or sending to the replicator program for data
# move.
def process_fdmi_record(kv_record, p):

kvs = json.loads(kv_record)
fid = kvs.get('fid')
if fid is None:
Expand Down Expand Up @@ -103,10 +174,6 @@ def process_fdmi_record(kv_record, p):
'cr_val': cr_val
}

ri = bytes('{}\n'.format(rec_id), encoding = "utf-8")
p.stdin.write(ri)
p.stdin.flush()

# This string is used to be the KEY in the `kv_records`
# 'fid': '<5400000600012345:123450>' the most significant chars are
# fid type and fid location. They are excluded.
Expand All @@ -118,13 +185,21 @@ def process_fdmi_record(kv_record, p):
# Check de-dup records
kv_i = kv_records.get(keystr)
if kv_i is not None:
print('dup: {}'.format(kv), file=sys.stdout, flush = True)
return
#print('dup: {}'.format(kv), file=sys.stdout, flush = True)
            #send ACK to plugin immediately because it is a duplicate
send_ack_to_plugin("++++++++++", p, rec_id)
pass
else:
print('new: {}'.format(kv), file=sys.stdout, flush = True)
#print('new: {}'.format(kv), file=sys.stdout, flush = True)
kv_records[keystr] = kv
keystr_LRU.append(keystr)
return
#send FDMI event to kafka asynchronously.
#We will ack it in on_send_success() callback
kafka_producer_send(kv)

return
#end of process_fdmi_record()



# Handle SIGNT and SIGTERM. Stop the underlying processes and exit.
Expand All @@ -134,6 +209,9 @@ def signal_handler(sig, frame):
for p in processes:
p.send_signal(sig)
p.wait()

kafka_producer_close()

sys.exit(0)


Expand Down Expand Up @@ -308,7 +386,7 @@ def main(args):
args.profile_fid is not None and
args.process_fid is not None and
args.filter_id is not None ):
# All required info is from arguments
# All required info is from arguments
cluster_info = dict()
cluster_info['local_endpoint'] = args.local_endpoint
cluster_info['ha_endpoint'] = args.ha_endpoint
Expand Down Expand Up @@ -336,7 +414,7 @@ def main(args):
print(' config-path = {}'.format(config_path), file=sys.stderr)
print(' filter-id = {}'.format(filter_id), file=sys.stderr)

# Now query from `hctl` and conf.xc
# Now query from `hctl` and conf.xc
cluster_info = get_cluster_info(args)

if cluster_info is None:
Expand All @@ -348,6 +426,8 @@ def main(args):
print('Register SIGINT signal handler', file=sys.stderr)
signal.signal(signal.SIGINT, signal_handler)

kafka_producer_connect(args)

# Execute fdmi_sample_plugin with cluster info in loop and wait for
# the FDMI events in form of JSON metadata.
listen_on_command(get_plugin_cmd(args))
Expand Down Expand Up @@ -375,5 +455,7 @@ if __name__ == '__main__':
type=str)
ap.add_argument('-sf', '--process-fid', required=False, help='Process FID',
type=str)
ap.add_argument('-ks', '--kafka-server', required=False, help='kafka server and port',
type=str)

main(ap.parse_args())
6 changes: 4 additions & 2 deletions fdmi/plugins/fdmi_plugin_st.sh
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ do_some_kv_operations()
echo "This is to simulate the plugin failure and start"

rc=1
stop_fdmi_plugin && sleep 5 &&
sleep 5 && stop_fdmi_plugin && sleep 5 &&
start_fdmi_plugin "$FDMI_FILTER_FID" "$FDMI_PLUGIN_EP" &&
start_fdmi_plugin "$FDMI_FILTER_FID2" "$FDMI_PLUGIN_EP2" && rc=0
if [[ $rc -eq 1 ]] ; then
Expand Down Expand Up @@ -177,7 +177,9 @@ start_fdmi_plugin()
-he ${lnet_nid}:$HA_EP -pf $PROF_OPT \
-sf $M0T1FS_PROC_ID \
-fi $fdmi_filter_fid \
--plugin-path $M0_SRC_DIR/fdmi/plugins/fdmi_sample_plugin"
--plugin-path $M0_SRC_DIR/fdmi/plugins/fdmi_sample_plugin "
# To test with kafka server, uncomment following with valid kafka server
# APP_PARAM="$APP_PARAM -ks 127.0.0.1:9092"
PLUGIN_CMD="$M0_SRC_DIR/fdmi/plugins/fdmi_app $APP_PARAM"

if $interactive ; then
Expand Down

0 comments on commit e0bb65c

Please sign in to comment.