add support for reading tracelb outputs in sc_warts.py

This commit is contained in:
Rafael Almeida
2017-02-22 14:02:31 -03:00
parent 0e64227dac
commit 3c4fd01cee
+234 -1
View File
@@ -42,7 +42,7 @@ import sys
obj_type = {'NONE' : 0x00, 'LIST' : 0x01, 'CYCLESTART' : 0x02, 'CYCLE' : 0x03,
'CYCLE_STOP': 0x04, 'ADDRESS': 0x05, 'TRACE' : 0x06, 'PING' : 0x07,
'MAGIC' : 0x1205}
'TRACELB': 0x08, 'MAGIC' : 0x1205}
def unpack_uint8_t(b):
return (struct.unpack('B', b[0])[0], 1)
@@ -452,6 +452,237 @@ class WartsTraceHop(WartsBaseObject):
return (ret_string, tot_len+2)
class WartsTracelb(WartsBaseObject):
def __init__(self, data, refs=None, verbose=False):
super(WartsTracelb, self).__init__(obj_type['TRACELB'], verbose)
if refs:
self.update_ref(refs)
self.data = data
self.flagdata = data
self.nodes = []
self.links = []
self.flag_defines = [
('listid', unpack_uint32_t),
('cycleid', unpack_uint32_t),
('srcipid', self.read_referenced_address),
('dstipid', self.read_referenced_address),
('timeval', read_timeval),
('srcport', unpack_uint16_t),
('dstport', unpack_uint16_t),
('probesiz', unpack_uint16_t),
('tracelbtyp', unpack_uint8_t),
('firsttl', unpack_uint8_t),
('timeout', unpack_uint8_t),
('minwait', unpack_uint8_t),
('attempts', unpack_uint8_t),
('confidence', unpack_uint8_t),
('iptos', unpack_uint8_t),
('nodec', unpack_uint16_t),
('linkc', unpack_uint16_t),
('probec', unpack_uint32_t),
('probecmax', unpack_uint32_t),
('gaplimit', unpack_uint8_t),
('srcaddr', self.unpack_address),
('dstaddr', self.unpack_address),
('usrid', unpack_uint32_t),
]
flag_bytes = self.read_flags()
if self.verbose:
print "Flags:", self.flags
print "Nodes:", self.flags['nodec']
print "Links:", self.flags['linkc']
offset = flag_bytes
# Read the nodes
for i in range(self.flags['nodec']):
w = WartsTracelbNode(data[offset:], self.referenced_address, self.verbose)
self.nodes.append(w.flags)
offset += w.offset
# Read the links
for i in range(self.flags['linkc']):
w = WartsTracelbLink(data[offset:], self.referenced_address, self.verbose)
self.links.append({'link': w.flags, 'probe_sets': w.probe_sets})
offset += w.offset
class WartsTracelbNode(WartsBaseObject):
def __init__(self, data, refs=None, verbose=False):
super(WartsTracelbNode, self).__init__(obj_type['TRACELB'], verbose)
if refs:
self.update_ref(refs)
self.data = data
self.flagdata = data
self.flag_defines = [
('addrgid', self.read_referenced_address),
('flags', unpack_uint8_t),
('linkc', unpack_uint16_t),
('qttl', unpack_uint8_t),
('addr', self.unpack_address),
]
self.offset = self.read_flags()
if self.verbose:
print "Node flags:", self.flags
class WartsTracelbLink(WartsBaseObject):
def __init__(self, data, refs=None, verbose=False):
super(WartsTracelbLink, self).__init__(obj_type['TRACELB'], verbose)
if refs:
self.update_ref(refs)
self.data = data
self.flagdata = data
self.flag_defines = [
('from', unpack_uint16_t),
('to', unpack_uint16_t),
('hopc', unpack_uint8_t),
]
flag_bytes = self.read_flags()
if self.verbose:
print "Link flags:", self.flags
self.offset = flag_bytes
self.link = {'flags': self.flags, 'probe_sets': []}
self.probe_sets = []
# Read the probe sets
if 'hopc' in self.flags:
for i in range(self.flags['hopc']):
w = WartsTracelbProbeSet(data[self.offset:], self.referenced_address, self.verbose)
self.offset += w.offset
self.probe_sets = w.probe_set
class WartsTracelbProbeSet(WartsBaseObject):
def __init__(self, data, refs=None, verbose=False):
super(WartsTracelbProbeSet, self).__init__(obj_type['TRACELB'], verbose)
if refs:
self.update_ref(refs)
self.data = data
self.flagdata = data
self.flag_defines = [
('probec', unpack_uint16_t),
]
flag_bytes = self.read_flags()
if self.verbose:
print "Probe count:", self.flags['probec']
self.offset = flag_bytes
self.probe_set = []
# Read the probes
for i in range(self.flags['probec']):
w = WartsTracelbProbe(data[self.offset:], self.referenced_address, self.verbose)
self.offset += w.offset
self.probe_set.append({'probe': w.flags, 'replies': w.replies})
class WartsTracelbProbe(WartsBaseObject):
def __init__(self, data, refs=None, verbose=False):
super(WartsTracelbProbe, self).__init__(obj_type['TRACELB'], verbose)
if refs:
self.update_ref(refs)
self.data = data
self.flagdata = data
self.flag_defines = [
('timeval', read_timeval),
('flowid', unpack_uint16_t),
('ttl', unpack_uint8_t),
('attempt', unpack_uint8_t),
('rxc', unpack_uint16_t),
]
flag_bytes = self.read_flags()
if self.verbose:
print "Probe flags:", self.flags
self.offset = flag_bytes
self.replies = []
# Read the replies
if 'rxc' in self.flags:
for i in range(self.flags['rxc']):
w = WartsTracelbReply(data[self.offset:], self.referenced_address, self.verbose)
self.offset += w.offset
self.replies.append(w.flags)
class WartsTracelbReply(WartsBaseObject):
def __init__(self, data, refs=None, verbose=False):
super(WartsTracelbReply, self).__init__(obj_type['TRACELB'], verbose)
if refs:
self.update_ref(refs)
self.data = data
self.flagdata = data
self.flag_defines = [
('rx', read_timeval),
('ipid', unpack_uint16_t),
('ttl', unpack_uint8_t),
('flags', unpack_uint8_t),
('icmptc', unpack_uint16_t),
('tcpflags', unpack_uint8_t),
('icmpext', self.read_icmpext),
('icmpqttl', unpack_uint8_t),
('icmpqtos', unpack_uint8_t),
('fromgid', self.read_referenced_address),
('from', self.unpack_address),
]
self.offset = self.read_flags()
if self.verbose:
print "Reply flags:", self.flags
# TODO: maybe parse_mpls_icmpext and read_icmpext should be
# moved to WartsBaseObject to avoid duplicated code
# both methods are used here and in WartsTraceHop
@staticmethod
# copied blindly/stupidly from scamper/scamper_icmpext.h
def parse_mpls_icmpext(ie):
u32 = struct.unpack('I', ie)[0]
b0 = (u32 >> 0) & 0xFF
b1 = (u32 >> 8) & 0xFF
b2 = (u32 >> 16) & 0xFF
b3 = (u32 >> 24) & 0xFF
mpls_s = b2 & 0x01
#print "MPLS_S:", mpls_s
mpls_ttl = b3
#print "MPLS_TTL:", mpls_ttl
mpls_exp = (b2 >> 1) & 0x07
#print "MPLS_EXP:", mpls_exp
mpls_label = (b0 << 12) + (b1 << 4) + ((b2 >> 4) & 0xFF)
#print "MPLS_Label:", mpls_label
extension = "mpls ext ttl: %d, s: %d, exp: %d, label: %d" % (mpls_ttl, mpls_s, mpls_exp, mpls_label)
return extension
@staticmethod
def read_icmpext(b):
""" read ICMP extension header """
current_byte = 0
(tot_len, bytes_read) = unpack_uint16_t(b[current_byte:current_byte+2])
ret_string = ""
#print "ICMP Extension Total Len:", tot_len
current_byte+=bytes_read
remaining = tot_len
while remaining > 0:
(ie_dl, bytes_read) = unpack_uint16_t(b[current_byte:current_byte+2]) # data length
current_byte+=bytes_read
#print "data len:", ie_dl
(ie_cn, bytes_read) = unpack_uint8_t(b[current_byte:current_byte+1]) # class number
current_byte+=bytes_read
#print "class num:", ie_cn
(ie_ct, bytes_read) = unpack_uint8_t(b[current_byte:current_byte+1]) # class type
current_byte+=bytes_read
#print "class type:", ie_ct
# is MPLS?
if ie_cn == 1 and ie_ct == 1:
ie_dl_read = ie_dl
while ie_dl_read >= 4:
buf = b[current_byte:current_byte+4]
current_byte+=4
ie_dl_read-=4
ret_string += WartsTracelbReply.parse_mpls_icmpext(buf) + "\n"
# we don't understand this type. return a hexdump.
else:
buf = b[current_byte:current_byte+ie_dl]
current_byte+=ie_dl
ret_string += "buf: " + hexdump(buf)
remaining = remaining - 4 - ie_dl
return (ret_string, tot_len+2)
class WartsReader(object):
def __init__(self, wartsfile, verbose=False):
self.address_ref = dict()
@@ -512,6 +743,8 @@ class WartsReader(object):
return WartsTrace(data, refs=self.address_ref, verbose=self.verbose)
elif typ == obj_type['PING']:
return WartsPing(data, refs=self.address_ref, verbose=self.verbose)
elif typ == obj_type['TRACELB']:
return WartsTracelb(data, refs=self.address_ref, verbose=self.verbose)
elif typ == obj_type['ADDRESS']:
self.deprecated_addresses = True
wd = WartsDeprecatedAddress(data, verbose=self.verbose)