Merge pull request #6 from rlcalmeida/master

Add support for reading tracelb
This commit is contained in:
rbeverly
2018-09-04 16:53:27 -07:00
committed by GitHub
+176 -1
View File
@@ -42,7 +42,7 @@ import sys
obj_type = {'NONE' : 0x00, 'LIST' : 0x01, 'CYCLESTART' : 0x02, 'CYCLE' : 0x03,
'CYCLESTOP': 0x04, 'ADDRESS': 0x05, 'TRACE' : 0x06, 'PING' : 0x07,
'MAGIC' : 0x1205}
'TRACELB': 0x08, 'MAGIC' : 0x1205}
def unpack_uint8_t(b):
return (struct.unpack('B', b[0])[0], 1)
@@ -477,6 +477,179 @@ class WartsTraceHop(WartsBaseObject):
return (ret_string, tot_len+2)
class WartsTracelb(WartsBaseObject):
def __init__(self, data, refs=None, verbose=False):
super(WartsTracelb, self).__init__(obj_type['TRACELB'], verbose)
if refs:
self.update_ref(refs)
self.data = data
self.flagdata = data
self.nodes = []
self.links = []
self.flag_defines = [
('listid', unpack_uint32_t),
('cycleid', unpack_uint32_t),
('srcipid', self.read_referenced_address),
('dstipid', self.read_referenced_address),
('timeval', read_timeval),
('srcport', unpack_uint16_t),
('dstport', unpack_uint16_t),
('probesiz', unpack_uint16_t),
('tracelbtyp', unpack_uint8_t),
('firsttl', unpack_uint8_t),
('timeout', unpack_uint8_t),
('minwait', unpack_uint8_t),
('attempts', unpack_uint8_t),
('confidence', unpack_uint8_t),
('iptos', unpack_uint8_t),
('nodec', unpack_uint16_t),
('linkc', unpack_uint16_t),
('probec', unpack_uint32_t),
('probecmax', unpack_uint32_t),
('gaplimit', unpack_uint8_t),
('srcaddr', self.unpack_address),
('dstaddr', self.unpack_address),
('usrid', unpack_uint32_t),
]
flag_bytes = self.read_flags()
if self.verbose:
print "Flags:", self.flags
print "Nodes:", self.flags['nodec']
print "Links:", self.flags['linkc']
offset = flag_bytes
# Read the nodes
for i in range(self.flags['nodec']):
w = WartsTracelbNode(data[offset:], self.referenced_address, self.verbose)
self.nodes.append(w.flags)
offset += w.offset
# Read the links
for i in range(self.flags['linkc']):
w = WartsTracelbLink(data[offset:], self.referenced_address, self.verbose)
self.links.append({'link': w.flags, 'probe_sets': w.probe_sets})
offset += w.offset
class WartsTracelbNode(WartsBaseObject):
def __init__(self, data, refs=None, verbose=False):
super(WartsTracelbNode, self).__init__(obj_type['TRACELB'], verbose)
if refs:
self.update_ref(refs)
self.data = data
self.flagdata = data
self.flag_defines = [
('addrgid', self.read_referenced_address),
('flags', unpack_uint8_t),
('linkc', unpack_uint16_t),
('qttl', unpack_uint8_t),
('addr', self.unpack_address),
]
self.offset = self.read_flags()
if self.verbose:
print "Node flags:", self.flags
class WartsTracelbLink(WartsBaseObject):
def __init__(self, data, refs=None, verbose=False):
super(WartsTracelbLink, self).__init__(obj_type['TRACELB'], verbose)
if refs:
self.update_ref(refs)
self.data = data
self.flagdata = data
self.flag_defines = [
('from', unpack_uint16_t),
('to', unpack_uint16_t),
('hopc', unpack_uint8_t),
]
flag_bytes = self.read_flags()
if self.verbose:
print "Link flags:", self.flags
self.offset = flag_bytes
self.link = {'flags': self.flags, 'probe_sets': []}
self.probe_sets = []
# Read the probe sets
if 'hopc' in self.flags:
for i in range(self.flags['hopc']):
w = WartsTracelbProbeSet(data[self.offset:], self.referenced_address, self.verbose)
self.offset += w.offset
self.probe_sets = w.probe_set
class WartsTracelbProbeSet(WartsBaseObject):
def __init__(self, data, refs=None, verbose=False):
super(WartsTracelbProbeSet, self).__init__(obj_type['TRACELB'], verbose)
if refs:
self.update_ref(refs)
self.data = data
self.flagdata = data
self.flag_defines = [
('probec', unpack_uint16_t),
]
flag_bytes = self.read_flags()
if self.verbose:
print "Probe count:", self.flags['probec']
self.offset = flag_bytes
self.probe_set = []
# Read the probes
for i in range(self.flags['probec']):
w = WartsTracelbProbe(data[self.offset:], self.referenced_address, self.verbose)
self.offset += w.offset
self.probe_set.append({'probe': w.flags, 'replies': w.replies})
class WartsTracelbProbe(WartsBaseObject):
def __init__(self, data, refs=None, verbose=False):
super(WartsTracelbProbe, self).__init__(obj_type['TRACELB'], verbose)
if refs:
self.update_ref(refs)
self.data = data
self.flagdata = data
self.flag_defines = [
('timeval', read_timeval),
('flowid', unpack_uint16_t),
('ttl', unpack_uint8_t),
('attempt', unpack_uint8_t),
('rxc', unpack_uint16_t),
]
flag_bytes = self.read_flags()
if self.verbose:
print "Probe flags:", self.flags
self.offset = flag_bytes
self.replies = []
# Read the replies
if 'rxc' in self.flags:
for i in range(self.flags['rxc']):
w = WartsTracelbReply(data[self.offset:], self.referenced_address, self.verbose)
self.offset += w.offset
self.replies.append(w.flags)
class WartsTracelbReply(WartsBaseObject):
def __init__(self, data, refs=None, verbose=False):
super(WartsTracelbReply, self).__init__(obj_type['TRACELB'], verbose)
if refs:
self.update_ref(refs)
self.data = data
self.flagdata = data
self.flag_defines = [
('rx', read_timeval),
('ipid', unpack_uint16_t),
('ttl', unpack_uint8_t),
('flags', unpack_uint8_t),
('icmptc', unpack_uint16_t),
('tcpflags', unpack_uint8_t),
('icmpext', WartsTraceHop.read_icmpext),
('icmpqttl', unpack_uint8_t),
('icmpqtos', unpack_uint8_t),
('fromgid', self.read_referenced_address),
('from', self.unpack_address),
]
self.offset = self.read_flags()
if self.verbose:
print "Reply flags:", self.flags
class WartsReader(object):
def __init__(self, wartsfile, verbose=False):
self.address_ref = dict()
@@ -539,6 +712,8 @@ class WartsReader(object):
return WartsTrace(data, refs=self.address_ref, verbose=self.verbose)
elif typ == obj_type['PING']:
return WartsPing(data, refs=self.address_ref, verbose=self.verbose)
elif typ == obj_type['TRACELB']:
return WartsTracelb(data, refs=self.address_ref, verbose=self.verbose)
elif typ == obj_type['ADDRESS']:
self.deprecated_addresses = True
wd = WartsDeprecatedAddress(data, verbose=self.verbose)