Skip to content

Commit

Permalink
Merge pull request #14 from rticommunity/improve_hb_msg
Browse files Browse the repository at this point in the history
Improve HeartBeat and ACKNACK messages and fix minor issues
  • Loading branch information
iblancasa authored Dec 19, 2016
2 parents 389bf0e + 7f35f4e commit 76d327b
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 30 deletions.
3 changes: 2 additions & 1 deletion logparser/devices/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,5 +162,6 @@ def dict_regex_search(content, regex):
"""Apply the regex over all the fields of the dictionary."""
match = False
for field in content:
match = match if match else regex.search(field)
if isinstance(content[field], str):
match = match if match else regex.search(content[field])
return match
9 changes: 8 additions & 1 deletion logparser/devices/outputdevices.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,14 @@ def write(self, text=""):
if self.support_ansi:
print("\033[K" + text)
else:
print(text)
# Catch any potential exception when piping the output and
# terminating the program.
try:
print(text)
except IOError:
# It makes no sense to print the error since we already had
# an exception printing a message.
pass

def close(self):
"""Do nothing, no need to close device."""
Expand Down
9 changes: 5 additions & 4 deletions logparser/network/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ def get_regex_list():
regex.append([network.on_send_piggyback_hb,
r"COMMENDSrWriterService_agentFunction:\s?writer oid " +
r"0x(\w+) sends piggyback HB \(([\d,]+)\)-\(([\d,]+)\)"])
regex.append([network.on_send_piggyback_hb,
regex.append([network.on_send_piggyback_hb_syncrepair,
r"COMMENDSrWriterService_sendSyncRepairData:\[\d+,\d+\] " +
r"writer oid 0x(\w+) sends piggyback HB for sn " +
r"\(([\d,]+)\)-\(([\d,]+), epoch\(\d+\)\)"])
r"\(([\d,]+)\)-\(([\d,]+), epoch\((\d+)\)\)"])
regex.append([network.on_send_hb_response,
r"COMMENDSrWriterService_onSubmessage:\[\d+,\d+\] " +
r"writer oid 0x(\w+) sends response HB for sn " +
Expand Down Expand Up @@ -147,8 +147,9 @@ def get_regex_list():
r"nextRelSn\(([\d,]+)\), reservedCount\((\d+)\)"])
regex.append([network.on_receive_hb,
r"COMMENDSrReaderService_onSubmessage:\[\d+,\d+\] reader " +
r"oid 0x(\w+) received HB for sn \(([\d,]+)\)-" +
r"\(([\d,]+)\), epoch\((\d+)\) from writer 0x([\w\.]+)"])
r"oid 0x(\w+) received (HB|HB_BATCH|HB_SESSION) for " +
r"sn \(([\d,]+)\)-\(([\d,]+)\), epoch\((\d+)\) " +
r"from writer 0x([\w\.]+)"])
regex.append([network.on_send_ack,
r"COMMENDSrReaderService_onSubmessage:\[\d+,\d+\] reader " +
r"oid 0x(\w+) sent ACK of bitmap lead\(([\d,]+)\), " +
Expand Down
90 changes: 66 additions & 24 deletions logparser/network/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
+ on_send_preemptive_gap: it happens when sending a preemptive GAP message.
+ on_send_preemptive_hb: it happens when sending a preemptive HB message.
+ on_send_piggyback_hb: it happens when sending a piggyback HB message.
+ on_send_piggyback_hb_syncrepair: it happens for piggyback HB at repair.
+ on_send_hb_response: it happens when sending a HB response.
+ on_receive_ack: it happens when receiving an ACK message.
+ on_instance_not_found: it happens when the instance is not found.
Expand Down Expand Up @@ -143,7 +144,7 @@ def on_schedule_data(match, state):
"""It happens when a data is asynchronously scheduled."""
writer_oid = get_oid(match[0])
seqnum = parse_sn(match[1])
log_process("", writer_oid, "Scheduled DATA (%d)" % seqnum, state)
log_process("", writer_oid, "Scheduled DATA [%d]" % seqnum, state)

if 'packets_lost' not in state:
state['packets_lost'] = []
Expand All @@ -158,7 +159,7 @@ def on_send_data(match, state):
"""It happens when a DATA packet is sent."""
writer_oid = get_oid(match[0])
seqnum = parse_sn(match[1])
log_send("", writer_oid, "Sent DATA (%d)" % seqnum, state)
log_send("", writer_oid, "Sent DATA [%d]" % seqnum, state)
add_statistics_packet(writer_oid, "send", "DATA", state)

key = writer_oid + "-" + str(seqnum)
Expand All @@ -175,7 +176,7 @@ def on_resend_data(match, state):
seqnum = parse_sn(match[5])
verb = 1 if is_builtin_entity(match[0]) else 0
log_send(remote_part, writer_oid,
"Resend %s [%d] to reader %s" % (packet_name, seqnum, remote_oid),
"Resent %s [%d] to reader %s" % (packet_name, seqnum, remote_oid),
state, verb)


Expand All @@ -185,9 +186,10 @@ def on_send_gap(match, state):
remote_part = parse_guid(state, match[1], match[2], match[3])
reader_oid = get_oid(match[4])
sn_start = parse_sn(match[5])
sn_end = parse_sn(match[6])
sn_end = parse_sn(match[6]) - 1
verb = 1 if is_builtin_entity(match[0]) else 0
log_send(remote_part, writer_oid, "Sent GAP to reader %s for [%d, %d)" %
log_send(remote_part, writer_oid,
"Sent GAP to reader %s for samples in [%d, %d]" %
(reader_oid, sn_start, sn_end), state, verb)
add_statistics_packet(writer_oid, 'send', 'GAP', state)

Expand All @@ -204,7 +206,7 @@ def on_send_gap(match, state):
oid = info[0]
seqnum = int(info[1])
if oid == writer_oid and seqnum >= sn_start and seqnum < sn_end:
log_warning("DATA (%d) may have been lost" % seqnum, state)
log_warning("DATA [%d] may have been lost" % seqnum, state)
losts.append(k)
for k in losts:
state['packets_lost'].remove(k)
Expand All @@ -229,7 +231,8 @@ def on_send_preemptive_hb(match, state):
verb = 1 if is_builtin_entity(match[0]) else 0
log_send("",
writer_oid,
"Sent preemptive HB for [%d, %d]" % (sn_start, sn_end),
"Sent preemptive HB to let know about samples in [%d, %d]" %
(sn_start, sn_end),
state,
verb)

Expand All @@ -240,19 +243,40 @@ def on_send_piggyback_hb(match, state):
sn_first = parse_sn(match[1])
sn_last = parse_sn(match[2])
verb = 1 if is_builtin_entity(match[0]) else 0
log_send("", writer_oid, "Sent PIGGYBACK HB for [%d, %d]" %
log_send("", writer_oid,
"Sent piggyback HB to acknowledge samples in [%d, %d]" %
(sn_first, sn_last), state, verb)
add_statistics_packet(writer_oid, "send", "PIGGYBACK HB", state)


def on_send_piggyback_hb_syncrepair(match, state):
"""It happens when sending a piggyback HB message from repair."""
writer_oid = get_oid(match[0])
sn_first = parse_sn(match[1])
sn_last = parse_sn(match[2])
epoch = int(match[3])
verb = 1 if is_builtin_entity(match[0]) else 0
log_send("",
writer_oid,
("Sent piggyback HB [%d] from synchronous reparation" % epoch) +
" to acknowledge samples in [%d, %d]" % (sn_first, sn_last),
state,
verb)
add_statistics_packet(writer_oid, "send", "PIGGYBACK HB", state)


def on_send_hb_response(match, state):
"""It happens when sending a HB response."""
"""It happens when sending a HB to verify GAP."""
writer_oid = get_oid(match[0])
sn_end = parse_sn(match[1])
sn_start = parse_sn(match[2])
epoch = int(match[3])
verb = 1 if is_builtin_entity(match[0]) else 0
log_send("", writer_oid, "Sent HB response for [%d, %d]" %
(sn_start, sn_end), state, verb)
log_send("",
writer_oid,
"Sent HB [%d] to verify GAP for samples in [%d, %d]" %
(epoch, sn_start, sn_end),
state, verb)


def on_receive_ack(match, state):
Expand All @@ -262,9 +286,13 @@ def on_receive_ack(match, state):
reader_addr = parse_guid(state, remote[0], remote[1], remote[2])
reader_oid = get_oid(remote[3])
seqnum = parse_sn(match[2])
bitcount = int(match[3])
epoch = int(match[4])
verb = 1 if is_builtin_entity(match[0]) else 0
log_recv(reader_addr, writer_oid,
"Received ACKNACK from reader %s for %d" % (reader_oid, seqnum),
log_recv(reader_addr,
writer_oid,
"Received ACKNACK [%d] from reader %s for %d +%d" %
(epoch, reader_oid, seqnum, bitcount),
state, verb)


Expand Down Expand Up @@ -383,42 +411,56 @@ def on_rejected_data(match, state):
def on_receive_hb(match, state):
"""It happens when the write receives a HB."""
reader_oid = get_oid(match[0])
sn_start = parse_sn(match[1])
sn_end = parse_sn(match[2])
remote = match[4].split('.')
packet = match[1]
sn_start = parse_sn(match[2])
sn_end = parse_sn(match[3])
epoch = int(match[4])
remote = match[5].split('.')
writer_addr = parse_guid(state, remote[0], remote[1], remote[2])
writer_oid = get_oid(remote[3])
verb = 1 if is_builtin_entity(remote[3]) else 0
log_recv(writer_addr, reader_oid,
"Received HB from writer %s for [%d, %d]" %
(writer_oid, sn_start, sn_end), state, verb)
log_recv(writer_addr,
reader_oid,
"Received %s [%d] from writer %s for samples in [%d, %d]" %
(packet, epoch, writer_oid, sn_start, sn_end),
state,
verb)


def on_send_ack(match, state):
"""It happens when a ACK message is sent."""
reader_oid = get_oid(match[0])
lead = parse_sn(match[1])
bitcount = int(match[2])
epoch = int(match[3])
remote = match[4].split('.')
writer_addr = parse_guid(state, remote[0], remote[1], remote[2])
writer_oid = get_oid(remote[3])
verb = 1 if is_builtin_entity(remote[3]) else 0
log_send(writer_addr, reader_oid, "Sent ACK to writer %s for %d count %d" %
(writer_oid, lead, bitcount), state, verb)
log_send(writer_addr,
reader_oid,
"Sent ACK [%d] to writer %s for %d count %d" %
(epoch, writer_oid, lead, bitcount),
state,
verb)


def on_send_nack(match, state):
"""It happens when a NACK message is sent."""
reader_oid = get_oid(match[0])
lead = parse_sn(match[1])
bitcount = int(match[2])
epoch = int(match[3])
remote = match[4].split('.')
writer_addr = parse_guid(state, remote[0], remote[1], remote[2])
writer_oid = get_oid(remote[3])
verb = 1 if is_builtin_entity(remote[3]) else 0
log_send(writer_addr, reader_oid,
"Sent NACK to writer %s for %d count %d" %
(writer_oid, lead, bitcount), state, verb)
log_send(writer_addr,
reader_oid,
"Sent NACK [%d] to writer %s for %d count %d" %
(epoch, writer_oid, lead, bitcount),
state,
verb)


def on_sample_received_from_deleted_writer(match, state):
Expand Down

0 comments on commit 76d327b

Please sign in to comment.