1
0
mirror of https://github.com/CumulusNetworks/ifupdown2.git synced 2024-05-06 15:54:50 +00:00

nlmanager: rdnbrd "Interrupted system call" traceback in nlmanager

Signed-off-by: Daniel Walton <dwalton@cumulusnetworks.com>
Reviewed-by:   roopa@cumulusnetworks.com
Ticket: CM-12487

Signed-off-by: Julien Fortin <julien@cumulusnetworks.com>
This commit is contained in:
Julien Fortin
2016-08-20 20:43:12 -07:00
committed by Nikhil
parent 5859b3228e
commit cee2e13ffb
2 changed files with 103 additions and 58 deletions

View File

@@ -68,7 +68,7 @@ class NetlinkListener(Thread):
log.info("%s: shutting down" % self)
return
# Only block for 1 second so we can wake up to see
# Only block for 1 second so we can wake up to see if shutdown_event is set
try:
(readable, writeable, exceptional) = select(my_sockets, [], my_sockets, 1)
except Exception as e:
@@ -80,18 +80,14 @@ class NetlinkListener(Thread):
set_alarm = False
set_tx_socket_rxed_ack_alarm = False
missed_a_packet = False
for s in readable:
data = None
data = []
try:
data = s.recv(4096)
# If recv() failed, we missed a packet
except Exception as e:
log.error('recv() error: ' + str(e))
missed_a_packet = True
continue
total_length = len(data)
@@ -101,30 +97,23 @@ class NetlinkListener(Thread):
(length, msgtype, flags, seq, pid) = unpack(header_PACK, data[:header_LEN])
if manager.debug_listener:
log.info('%s: RXed %s seq %d, pid %d, %d bytes (%d total)' %
(socket_string[s], NetlinkPacket.type_to_string[msgtype],
seq, pid, length, total_length))
log.debug('%s %s: RXed %s seq %d, pid %d, %d bytes (%d total)' %
(self, socket_string[s], NetlinkPacket.type_to_string[msgtype],
seq, pid, length, total_length))
# 99% of the time when we see an ERROR the error code is
# zero which means ACK
possible_ack = False
if msgtype == NLMSG_DONE:
possible_ack = True
elif msgtype == NLMSG_ERROR:
# TODO - change this > to = ?
error_code = int(unpack('>H', data[header_LEN:header_LEN+2])[0])
if msgtype == NLMSG_ERROR:
# The error code is a signed negative number.
error_code = abs(unpack('=i', data[header_LEN:header_LEN+4])[0])
msg = Error(msgtype, True)
msg.decode_packet(length, flags, seq, pid, data)
if error_code:
log.debug("%s: RXed NLMSG_ERROR code %d" % (socket_string[s], error_code))
else:
possible_ack = True
log.debug("%s %s: RXed NLMSG_ERROR code %s (%d)" % (self, socket_string[s], msg.error_to_string.get(error_code), error_code))
if possible_ack and seq == manager.target_seq and pid == manager.target_pid:
if seq == manager.target_seq and pid == manager.target_pid:
if manager.target_seq_pid_debug:
log.debug("%s: Setting RXed ACK alarm for seq %d, pid %d" %
(socket_string[s], seq, pid))
log.debug("%s %s: Setting RXed ACK alarm for seq %d, pid %d" %
(self, socket_string[s], seq, pid))
set_tx_socket_rxed_ack_alarm = True
# Put the message on the manager's netlinkq
@@ -142,11 +131,11 @@ class NetlinkListener(Thread):
# as a reminder to add support for them.
else:
if msgtype in NetlinkPacket.type_to_string:
log.warning('%s: RXed unsupported message %s (type %d)' %
(socket_string[s], NetlinkPacket.type_to_string[msgtype], msgtype))
log.warning('%s %s: RXed unsupported message %s (type %d)' %
(self, socket_string[s], NetlinkPacket.type_to_string[msgtype], msgtype))
else:
log.warning('%s: RXed unknown message type %d' %
(socket_string[s], msgtype))
log.warning('%s %s: RXed unknown message type %d' %
(self, socket_string[s], msgtype))
# Track the previous PID sequence number for RX and TX sockets
if s == self.rx_socket:
@@ -155,8 +144,7 @@ class NetlinkListener(Thread):
prev_seq = manager.tx_socket_prev_seq
if pid in prev_seq and prev_seq[pid] and prev_seq[pid] != seq and (prev_seq[pid]+1 != seq):
log.info('%s: Went from seq %d to %d' % (socket_string[s], prev_seq[pid], seq))
missed_a_packet = True
log.debug('%s %s: went from seq %d to %d' % (self, socket_string[s], prev_seq[pid], seq))
prev_seq[pid] = seq
data = data[length:]
@@ -209,17 +197,25 @@ class NetlinkManagerWithListener(NetlinkManager):
def signal_term_handler(self, signal, frame):
log.info("NetlinkManagerWithListener: Caught SIGTERM")
if self.listener:
self.listener.shutdown_event.set()
self.shutdown_flag = True # For NetlinkManager shutdown
self.shutdown_event.set()
self.alarm.set()
def signal_int_handler(self, signal, frame):
log.info("NetlinkManagerWithListener: Caught SIGINT")
if self.listener:
self.listener.shutdown_event.set()
self.shutdown_flag = True # For NetlinkManager shutdown
self.shutdown_event.set()
self.alarm.set()
def tx_nlpacket_ack_on_listener(self, nlpacket):
def tx_nlpacket_get_response(self, nlpacket):
"""
TX the message and wait for an ack
"""
@@ -243,34 +239,38 @@ class NetlinkManagerWithListener(NetlinkManager):
# various netlink message types. Odds are our child class will redefine these
# to do more than log a message.
def rx_rtm_newlink(self, msg):
log.info("RX RTM_NEWLINK for %s, state %s" % (msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down"))
log.debug("RXed RTM_NEWLINK seq %d, pid %d, %d bytes, for %s, state %s" %
(msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down"))
def rx_rtm_dellink(self, msg):
log.info("RX RTM_DELLINK for %s, state %s" % (msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down"))
log.debug("RXed RTM_DELLINK seq %d, pid %d, %d bytes, for %s, state %s" %
(msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down"))
def rx_rtm_newaddr(self, msg):
log.info("RX RTM_NEWADDR for %s/%d on %s" %
(msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex)))
log.debug("RXed RTM_NEWADDR seq %d, pid %d, %d bytes, for %s/%d on %s" %
(msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex)))
def rx_rtm_deladdr(self, msg):
log.info("RX RTM_DELADDR for %s/%d on %s" %
(msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex)))
log.debug("RXed RTM_DELADDR seq %d, pid %d, %d bytes, for %s/%d on %s" %
(msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex)))
def rx_rtm_newneigh(self, msg):
log.info("RX RTM_NEWNEIGH for %s on %s" % (msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex)))
log.debug("RXed RTM_NEWNEIGH seq %d, pid %d, %d bytes, for %s on %s" %
(msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex)))
def rx_rtm_delneigh(self, msg):
log.info("RX RTM_DELNEIGH for %s on %s" % (msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex)))
log.debug("RXed RTM_DELNEIGH seq %d, pid %d, %d bytes, for %s on %s" %
(msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex)))
def rx_rtm_newroute(self, msg):
log.info("RX RTM_NEWROUTE for %s%s" %
(msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index)))
log.debug("RXed RTM_NEWROUTE seq %d, pid %d, %d bytes, for %s%s" %
(msg.seq, msg.pid, msg.length, msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index)))
def rx_rtm_delroute(self, msg):
log.info("RX RTM_DELROUTE for %s%s" %
(msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index)))
log.debug("RXed RTM_DELROUTE seq %d, pid %d, %d bytes, for %s%s" %
(msg.seq, msg.pid, msg.length, msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index)))
# Note that tx_nlpacket_ack_on_listener will block until NetlinkListener has RXed
# Note that tx_nlpacket_get_response will block until NetlinkListener has RXed
# an Ack/DONE for the message we TXed
def get_all_addresses(self):
family = socket.AF_UNSPEC
@@ -284,7 +284,7 @@ class NetlinkManagerWithListener(NetlinkManager):
if debug:
self.debug_seq_pid[(addr.seq, addr.pid)] = True
self.tx_nlpacket_ack_on_listener(addr)
self.tx_nlpacket_get_response(addr)
def get_all_links(self):
family = socket.AF_UNSPEC
@@ -298,7 +298,7 @@ class NetlinkManagerWithListener(NetlinkManager):
if debug:
self.debug_seq_pid[(link.seq, link.pid)] = True
self.tx_nlpacket_ack_on_listener(link)
self.tx_nlpacket_get_response(link)
def get_all_neighbors(self):
family = socket.AF_UNSPEC
@@ -312,7 +312,7 @@ class NetlinkManagerWithListener(NetlinkManager):
if debug:
self.debug_seq_pid[(neighbor.seq, neighbor.pid)] = True
self.tx_nlpacket_ack_on_listener(neighbor)
self.tx_nlpacket_get_response(neighbor)
def get_all_routes(self):
family = socket.AF_UNSPEC
@@ -326,7 +326,7 @@ class NetlinkManagerWithListener(NetlinkManager):
if debug:
self.debug_seq_pid[(route.seq, route.pid)] = True
self.tx_nlpacket_ack_on_listener(route)
self.tx_nlpacket_get_response(route)
def nested_attributes_match(self, msg, attr_filter):
"""

View File

@@ -15,7 +15,11 @@ class NetlinkError(Exception):
pass
class NetlinkNoAddressError(Exception):
class NetlinkNoAddressError(NetlinkError):
pass
class NetlinkInterruptedSystemCall(NetlinkError):
pass
@@ -137,13 +141,15 @@ class NetlinkManager(object):
# packet via the decode_packet call...so avoid printing
# two messages for one packet.
if not nlpacket.debug:
log.debug("TXed %12s, pid %d, seq %d, %d bytes" %
log.info("TXed %12s, pid %d, seq %d, %d bytes" %
(nlpacket.get_type_string(), nlpacket.pid, nlpacket.seq, nlpacket.length))
header_PACK = NetlinkPacket.header_PACK
header_LEN = NetlinkPacket.header_LEN
null_read = 0
MAX_NULL_READS = 30
nle_intr_count = 0
MAX_NULL_READS = 3
MAX_ERROR_NLE_INTR = 3
msgs = []
# Now listen to our socket and wait for the reply
@@ -154,21 +160,51 @@ class NetlinkManager(object):
return msgs
# Only block for 1 second so we can wake up to see if self.shutdown_flag is True
(readable, writeable, exceptional) = select([self.tx_socket, ], [], [self.tx_socket, ], 1)
try:
(readable, writeable, exceptional) = select([self.tx_socket, ], [], [self.tx_socket, ], 1)
except Exception as e:
# 4 is Interrupted system call
if isinstance(e.args, tuple) and e[0] == 4:
nle_intr_count += 1
log.info("select() Interrupted system call %d/%d" % (nle_intr_count, MAX_ERROR_NLE_INTR))
if not readable:
if nle_intr_count >= MAX_ERROR_NLE_INTR:
raise NetlinkInterruptedSystemCall(error_str)
else:
continue
else:
raise
if readable:
null_read = 0
else:
null_read += 1
# Safety net to make sure we do not spend too much time in
# this while True loop
if null_read >= MAX_NULL_READS:
log.warning('Socket was not readable for %d attempts' % null_read)
log.info('Socket was not readable for %d attempts' % null_read)
return msgs
continue
else:
continue
for s in readable:
data = s.recv(4096)
data = []
try:
data = s.recv(4096)
except Exception as e:
# 4 is Interrupted system call
if isinstance(e.args, tuple) and e[0] == 4:
nle_intr_count += 1
log.info("%s: recv() Interrupted system call %d/%d" % (s, nle_intr_count, MAX_ERROR_NLE_INTR))
if nle_intr_count >= MAX_ERROR_NLE_INTR:
raise NetlinkInterruptedSystemCall(error_str)
else:
continue
else:
raise
if not data:
log.info('RXed zero length data, the socket is closed')
@@ -207,12 +243,20 @@ class NetlinkManager(object):
# 0 is NLE_SUCCESS...everything else is a true error
if error_code:
error_code_str = msg.error_to_string.get(error_code)
if error_code_str != 'None':
error_str = 'Operation failed with \'%s\' (%s)' % (error_code_str, debug_str)
else:
error_str = 'Operation failed with code %s (%s)' % (error_code, debug_str)
if error_code == Error.NLE_NOADDR:
raise NetlinkNoAddressError(error_str)
elif error_code == Error.NLE_INTR:
nle_intr_count += 1
log.info("%s: RXed NLE_INTR Interrupted system call %d/%d" % (s, nle_intr_count, MAX_ERROR_NLE_INTR))
if nle_intr_count >= MAX_ERROR_NLE_INTR:
raise NetlinkInterruptedSystemCall(error_str)
else:
if error_code_str == 'None':
try:
@@ -227,6 +271,7 @@ class NetlinkManager(object):
# No ACK...create a nlpacket object and append it to msgs
else:
nle_intr_count = 0
# If debugs are enabled we will print the contents of the
# packet via the decode_packet call...so avoid printing