Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve HeartBeat and ACKNACK messages and fix minor issues #14

Merged
merged 10 commits into from
Dec 19, 2016
3 changes: 2 additions & 1 deletion logparser/devices/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,5 +162,6 @@ def dict_regex_search(content, regex):
"""Apply the regex over all the fields of the dictionary."""
match = False
for field in content:
match = match if match else regex.search(field)
if isinstance(content[field], str):
match = match if match else regex.search(content[field])
return match
9 changes: 8 additions & 1 deletion logparser/devices/outputdevices.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,14 @@ def write(self, text=""):
if self.support_ansi:
print("\033[K" + text)
else:
print(text)
# Catch any potential exception when piping the output and
# terminating the program.
try:
print(text)
except IOError:
# It makes no sense to print the error since we already had
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we printing where the exception is happening?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception happens in the native print operation because the pipe is closed.

# an exception printing a message.
pass

def close(self):
"""Do nothing, no need to close device."""
Expand Down
9 changes: 5 additions & 4 deletions logparser/network/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ def get_regex_list():
regex.append([network.on_send_piggyback_hb,
r"COMMENDSrWriterService_agentFunction:\s?writer oid " +
r"0x(\w+) sends piggyback HB \(([\d,]+)\)-\(([\d,]+)\)"])
regex.append([network.on_send_piggyback_hb,
regex.append([network.on_send_piggyback_hb_syncrepair,
r"COMMENDSrWriterService_sendSyncRepairData:\[\d+,\d+\] " +
r"writer oid 0x(\w+) sends piggyback HB for sn " +
r"\(([\d,]+)\)-\(([\d,]+), epoch\(\d+\)\)"])
r"\(([\d,]+)\)-\(([\d,]+), epoch\((\d+)\)\)"])
regex.append([network.on_send_hb_response,
r"COMMENDSrWriterService_onSubmessage:\[\d+,\d+\] " +
r"writer oid 0x(\w+) sends response HB for sn " +
Expand Down Expand Up @@ -147,8 +147,9 @@ def get_regex_list():
r"nextRelSn\(([\d,]+)\), reservedCount\((\d+)\)"])
regex.append([network.on_receive_hb,
r"COMMENDSrReaderService_onSubmessage:\[\d+,\d+\] reader " +
r"oid 0x(\w+) received HB for sn \(([\d,]+)\)-" +
r"\(([\d,]+)\), epoch\((\d+)\) from writer 0x([\w\.]+)"])
r"oid 0x(\w+) received (HB|HB_BATCH|HB_SESSION) for " +
r"sn \(([\d,]+)\)-\(([\d,]+)\), epoch\((\d+)\) " +
r"from writer 0x([\w\.]+)"])
regex.append([network.on_send_ack,
r"COMMENDSrReaderService_onSubmessage:\[\d+,\d+\] reader " +
r"oid 0x(\w+) sent ACK of bitmap lead\(([\d,]+)\), " +
Expand Down
105 changes: 75 additions & 30 deletions logparser/network/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
+ on_send_preemptive_gap: it happens when sending a preemptive GAP message.
+ on_send_preemptive_hb: it happens when sending a preemptive HB message.
+ on_send_piggyback_hb: it happens when sending a piggyback HB message.
+ on_send_piggyback_hb_syncrepair: it happens for piggyback HB at repair.
+ on_send_hb_response: it happens when sending a HB response.
+ on_receive_ack: it happens when receiving an ACK message.
+ on_instance_not_found: it happens when the instance is not found.
Expand All @@ -57,9 +58,9 @@
from logparser.devices.logger import (log_cfg, log_error, log_process,
log_recv, log_send, log_warning)
from logparser.utils import (add_statistics_bandwidth, add_statistics_packet,
get_locator, get_oid, get_participant,
get_port_name, get_port_number, hex2ip,
is_builtin_entity, parse_guid, parse_sn)
get_data_packet_name, get_locator, get_oid,
get_participant, get_port_name, get_port_number,
hex2ip, is_builtin_entity, parse_guid, parse_sn)


# --------------------------------------------------------------------------- #
Expand Down Expand Up @@ -143,7 +144,7 @@ def on_schedule_data(match, state):
"""It happens when a data is asynchronously scheduled."""
writer_oid = get_oid(match[0])
seqnum = parse_sn(match[1])
log_process("", writer_oid, "Scheduled DATA (%d)" % seqnum, state)
log_process("", writer_oid, "Scheduled DATA [%d]" % seqnum, state)

if 'packets_lost' not in state:
state['packets_lost'] = []
Expand All @@ -158,7 +159,7 @@ def on_send_data(match, state):
"""It happens when a DATA packet is sent."""
writer_oid = get_oid(match[0])
seqnum = parse_sn(match[1])
log_send("", writer_oid, "Sent DATA (%d)" % seqnum, state)
log_send("", writer_oid, "Sent DATA [%d]" % seqnum, state)
add_statistics_packet(writer_oid, "send", "DATA", state)

key = writer_oid + "-" + str(seqnum)
Expand All @@ -169,12 +170,13 @@ def on_send_data(match, state):
def on_resend_data(match, state):
"""It happens when the writer resend a DATA message."""
writer_oid = get_oid(match[0])
packet_name = get_data_packet_name(match[0])
remote_part = parse_guid(state, match[1], match[2], match[3])
remote_oid = get_oid(match[4])
seqnum = parse_sn(match[5])
verb = 1 if is_builtin_entity(match[0]) else 0
log_send(remote_part, writer_oid,
"Resend DATA (%d) to reader %s" % (seqnum, remote_oid),
"Resent %s [%d] to reader %s" % (packet_name, seqnum, remote_oid),
state, verb)


Expand All @@ -184,9 +186,10 @@ def on_send_gap(match, state):
remote_part = parse_guid(state, match[1], match[2], match[3])
reader_oid = get_oid(match[4])
sn_start = parse_sn(match[5])
sn_end = parse_sn(match[6])
sn_end = parse_sn(match[6]) - 1
verb = 1 if is_builtin_entity(match[0]) else 0
log_send(remote_part, writer_oid, "Sent GAP to reader %s for [%d, %d)" %
log_send(remote_part, writer_oid,
"Sent GAP to reader %s for samples in [%d, %d]" %
(reader_oid, sn_start, sn_end), state, verb)
add_statistics_packet(writer_oid, 'send', 'GAP', state)

Expand All @@ -203,7 +206,7 @@ def on_send_gap(match, state):
oid = info[0]
seqnum = int(info[1])
if oid == writer_oid and seqnum >= sn_start and seqnum < sn_end:
log_warning("DATA (%d) may have been lost" % seqnum, state)
log_warning("DATA [%d] may have been lost" % seqnum, state)
losts.append(k)
for k in losts:
state['packets_lost'].remove(k)
Expand All @@ -228,7 +231,8 @@ def on_send_preemptive_hb(match, state):
verb = 1 if is_builtin_entity(match[0]) else 0
log_send("",
writer_oid,
"Sent preemptive HB for [%d, %d]" % (sn_start, sn_end),
"Sent preemptive HB to let know about samples in [%d, %d]" %
(sn_start, sn_end),
state,
verb)

Expand All @@ -239,19 +243,40 @@ def on_send_piggyback_hb(match, state):
sn_first = parse_sn(match[1])
sn_last = parse_sn(match[2])
verb = 1 if is_builtin_entity(match[0]) else 0
log_send("", writer_oid, "Sent PIGGYBACK HB for [%d, %d]" %
log_send("", writer_oid,
"Sent piggyback HB to acknowledge samples in [%d, %d]" %
(sn_first, sn_last), state, verb)
add_statistics_packet(writer_oid, "send", "PIGGYBACK HB", state)


def on_send_piggyback_hb_syncrepair(match, state):
"""It happens when sending a piggyback HB message from repair."""
writer_oid = get_oid(match[0])
sn_first = parse_sn(match[1])
sn_last = parse_sn(match[2])
epoch = int(match[3])
verb = 1 if is_builtin_entity(match[0]) else 0
log_send("",
writer_oid,
("Sent piggyback HB [%d] from synchronous reparation" % epoch) +
" to acknowledge samples in [%d, %d]" % (sn_first, sn_last),
state,
verb)
add_statistics_packet(writer_oid, "send", "PIGGYBACK HB", state)


def on_send_hb_response(match, state):
"""It happens when sending a HB response."""
"""It happens when sending a HB to verify GAP."""
writer_oid = get_oid(match[0])
sn_end = parse_sn(match[1])
sn_start = parse_sn(match[2])
epoch = int(match[3])
verb = 1 if is_builtin_entity(match[0]) else 0
log_send("", writer_oid, "Sent HB response for [%d, %d]" %
(sn_start, sn_end), state, verb)
log_send("",
writer_oid,
"Sent HB [%d] to verify GAP for samples in [%d, %d]" %
(epoch, sn_start, sn_end),
state, verb)


def on_receive_ack(match, state):
Expand All @@ -261,9 +286,13 @@ def on_receive_ack(match, state):
reader_addr = parse_guid(state, remote[0], remote[1], remote[2])
reader_oid = get_oid(remote[3])
seqnum = parse_sn(match[2])
bitcount = int(match[3])
epoch = int(match[4])
verb = 1 if is_builtin_entity(match[0]) else 0
log_recv(reader_addr, writer_oid,
"Received ACKNACK from reader %s for %d" % (reader_oid, seqnum),
log_recv(reader_addr,
writer_oid,
"Received ACKNACK [%d] from reader %s for %d +%d" %
(epoch, reader_oid, seqnum, bitcount),
state, verb)


Expand Down Expand Up @@ -329,6 +358,7 @@ def on_receive_data(match, state):
remote = match[5].split('.')
writer_addr = parse_guid(state, remote[0], remote[1], remote[2])
writer_oid = get_oid(remote[3])
packet = get_data_packet_name(remote[3]) if packet == "DATA" else packet

# Sequece number check
full_id = writer_addr + "." + writer_oid + ' to ' + reader_oid
Expand All @@ -345,7 +375,7 @@ def on_receive_data(match, state):

# Show the message after any possible warning.
verb = 1 if is_builtin_entity(remote[3]) else 0
log_recv(writer_addr, reader_oid, "Received %s (%d) from writer %s (%s)" %
log_recv(writer_addr, reader_oid, "Received %s [%d] from writer %s (%s)" %
(packet, seqnum, writer_oid, comm),
state, verb)

Expand All @@ -358,9 +388,10 @@ def on_receive_out_order_data(match, state):
remote = match[3].split('.')
writer_addr = parse_guid(state, remote[0], remote[1], remote[2])
writer_oid = get_oid(remote[3])
packet_name = get_data_packet_name(remote[3])
verb = 1 if is_builtin_entity(remote[3]) else 0
log_recv(writer_addr, reader_oid, "Received %s DATA (%d) from writer %s" %
(kind, seqnum, writer_oid),
log_recv(writer_addr, reader_oid, "Received %s %s [%d] from writer %s" %
(kind, packet_name, seqnum, writer_oid),
state, verb)


Expand All @@ -380,42 +411,56 @@ def on_rejected_data(match, state):
def on_receive_hb(match, state):
"""It happens when the write receives a HB."""
reader_oid = get_oid(match[0])
sn_start = parse_sn(match[1])
sn_end = parse_sn(match[2])
remote = match[4].split('.')
packet = match[1]
sn_start = parse_sn(match[2])
sn_end = parse_sn(match[3])
epoch = int(match[4])
remote = match[5].split('.')
writer_addr = parse_guid(state, remote[0], remote[1], remote[2])
writer_oid = get_oid(remote[3])
verb = 1 if is_builtin_entity(remote[3]) else 0
log_recv(writer_addr, reader_oid,
"Received HB from writer %s for [%d, %d]" %
(writer_oid, sn_start, sn_end), state, verb)
log_recv(writer_addr,
reader_oid,
"Received %s [%d] from writer %s for samples in [%d, %d]" %
(packet, epoch, writer_oid, sn_start, sn_end),
state,
verb)


def on_send_ack(match, state):
"""It happens when a ACK message is sent."""
reader_oid = get_oid(match[0])
lead = parse_sn(match[1])
bitcount = int(match[2])
epoch = int(match[3])
remote = match[4].split('.')
writer_addr = parse_guid(state, remote[0], remote[1], remote[2])
writer_oid = get_oid(remote[3])
verb = 1 if is_builtin_entity(remote[3]) else 0
log_send(writer_addr, reader_oid, "Sent ACK to writer %s for %d count %d" %
(writer_oid, lead, bitcount), state, verb)
log_send(writer_addr,
reader_oid,
"Sent ACK [%d] to writer %s for %d count %d" %
(epoch, writer_oid, lead, bitcount),
state,
verb)


def on_send_nack(match, state):
"""It happens when a NACK message is sent."""
reader_oid = get_oid(match[0])
lead = parse_sn(match[1])
bitcount = int(match[2])
epoch = int(match[3])
remote = match[4].split('.')
writer_addr = parse_guid(state, remote[0], remote[1], remote[2])
writer_oid = get_oid(remote[3])
verb = 1 if is_builtin_entity(remote[3]) else 0
log_send(writer_addr, reader_oid,
"Sent NACK to writer %s for %d count %d" %
(writer_oid, lead, bitcount), state, verb)
log_send(writer_addr,
reader_oid,
"Sent NACK [%d] to writer %s for %d count %d" %
(epoch, writer_oid, lead, bitcount),
state,
verb)


def on_sample_received_from_deleted_writer(match, state):
Expand Down
71 changes: 49 additions & 22 deletions logparser/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
+ add_statistics_packets: Add the given packet to the packet statistics.
+ add_statistics_bandwidth: Add the given packet to the bandwidth statistics.
+ obfuscate: Obfuscate the given text.
+ get_oid: Parse the entity object ID and conver to text.
+ is_builtin_entity: Get if the OID hex number is for a built-in entity.
+ get_oid: Get a name for the entity ID in hexadecimal text format.
+ is_builtin_entity: Return if the OID hex number is for a built-in entity.
+ get_data_packet_name: Return the DATA packet name.
+ get_topic_name: Get the topic name, obfuscating if needed.
+ get_type_name: Get the type name, obfuscating if needed.
+ get_port_number: Get the port number, obfuscating if needed.
Expand Down Expand Up @@ -166,35 +167,61 @@ def obfuscate(text, state):


def get_oid(oid):
"""Parse the entity object ID and conver to text."""
oid_names = {
# Built-in OID names.
0x00000000: "UNKNOWN", 0x000001c1: "BUILTIN_PARTIC",
0x000002c2: "TOPIC_WRITER", 0x000002c7: "TOPIC_READER",
0x000003c2: "PUB_WRITER", 0x000003c7: "PUB_READER",
0x000004c2: "SUB_WRITER", 0x000004c7: "SUB_READER",
0x000100c2: "PARTIC_WRITER", 0x000100c7: "PARTIC_READER",
0x000200c2: "MESSAGE_WRITER", 0x000200c7: "MESSAGE_READER"}
user_oid_kind = {
# Application defined entities.
0x00: "UNK", 0x01: "PAR",
"""Get a name for the entity ID in hexadecimal text format."""
# Information from RTPS Spec: http://www.omg.org/spec/DDSI-RTPS/2.2/PDF/
# Security entities: http://www.omg.org/spec/DDS-SECURITY/1.0/Beta2/
BUILTIN_NAMES = {
# Built-in Entity GUIDs
0x00000000: "UNKNOWN", 0x000001c1: "PARTICIPANT",
0x000002c2: "SED_TOPIC_WRITER", 0x000002c7: "SED_TOPIC_READER",
0x000003c2: "SED_PUB_WRITER", 0x000003c7: "SED_PUB_READER",
0x000004c2: "SED_SUB_WRITER", 0x000004c7: "SED_SUB_READER",
0x000100c2: "SPD_PART_WRITER", 0x000100c7: "SPD_PART_READER",
0x000200c2: "MESSAGE_WRITER", 0x000200c7: "MESSAGE_READER",
# Security Built-in Entity GUIDs
0xff0003c2: "SED_PUB_SEC_WRITER", 0xff0003c7: "SED_PUB_SEC_READER",
0xff0004c2: "SED_SUB_SEC_WRITER", 0xff0004c7: "SED_SUB_SEC_READER",
0xff0200c2: "MSG_SEC_WRITER", 0xff0200c7: "MSG_SEC_READER",
0x000201c2: "MSG_STA_SEC_WRITER", 0x000201c7: "MSG_STA_SEC_READER",
0xff0202c2: "MSG_VOL_SEC_WRITER", 0xff0202c7: "MSG_VOL_SEC_READER"}
ENTITY_ORIGINS = {0x00: "USER", 0x40: "VEND", 0xc0: "BUILTIN"}
ENTITY_KINDS = {
0x00: "UNK", 0x01: "PART",
0x02: "W+K", 0x03: "W-K",
0x04: "R-K", 0x07: "R+K"}

# Convert into a number from the hexadecimal text representation
oid_num = int(oid, 16)
if oid_num & 0x80000000 == 0:
name = oid_names[oid_num] if oid_num in oid_names else oid

# Analyze the entity kind
entity_kind = oid_num & 0xFF
origin = ENTITY_ORIGINS[entity_kind & 0xC0]
kind = ENTITY_KINDS[entity_kind & 0x3F]

if origin == "BUILTIN":
name = BUILTIN_NAMES[oid_num]
elif origin == "USER":
name = kind + "_" + hex(oid_num >> 8)[2:].zfill(6)
else:
kind = oid_num & 0xFF
kind = user_oid_kind[kind] if kind in user_oid_kind else "INV"
num = (oid_num & 0x7FFFF000) >> 13
name = str(num).zfill(2) + "_" + kind
name = origin + "_" + kind + "_" + hex(oid_num >> 8)[2:].zfill(6)
return name


def is_builtin_entity(oid):
"""Get if the OID hex number is for a built-in entity."""
"""Return if the OID hex number is for a built-in entity."""
# More information in get_oid
oid_num = int(oid, 16)
return oid_num & 0x80000000 == 0
return oid_num & 0xC0 == 0xC0


def get_data_packet_name(oid):
"""Return the DATA packet name."""
# More information in get_oid
entity_name = get_oid(oid)
PACKET_NAMES = {
"SED_PUB_WRITER": "DATA(w)", "SED_SUB_WRITER": "DATA(r)",
"SPD_PART_WRITER": "DATA(p)", "MESSAGE_WRITER": "DATA(m)"}
return PACKET_NAMES[entity_name] if entity_name in PACKET_NAMES else "DATA"


def get_topic_name(topic, state):
Expand Down