From 0383fcbb48b742d114d27dfa90d6e91192107799 Mon Sep 17 00:00:00 2001 From: Roopa Prabhu Date: Fri, 19 Aug 2016 11:09:57 -0700 Subject: [PATCH 1/2] addons: bridge: fix running_vids value when cache is stale Ticket: CM-12552 Reviewed By: julien, nikhil Testing Done: tested with failing config with bridge-access 1 This is similar to the fix done in the below commit for pvid: "5061730ea5bf ("addons: bridge: fix default pvid handling in cases where cache is stale")" easier steps to reproduce: - have a vlan aware bridge with more than one port - add 'bridge-access 1' to one of the ports - boot the box with the config - check that the vlans are fine - ifdown - ifreload -a - the interface with bridge-access 1 does not have the pvid flag on vlan 1 This patch makes sure we assume the right running vid and pvid value i.e. [1] and 1 if the cache returns no values. vid = [1] and pvid = 1 are the kernel default/initial values for a port. Signed-off-by: Roopa Prabhu --- addons/bridge.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/addons/bridge.py b/addons/bridge.py index b9b7d5e..fe51ef3 100644 --- a/addons/bridge.py +++ b/addons/bridge.py @@ -882,17 +882,21 @@ class bridge(moduleBase): (running_vids, running_pvid) = self._get_running_vids_n_pvid( bportifaceobj.name) - if running_vids: - (vids_to_del, vids_to_add) = \ - self._diff_vids(vids_to_add, running_vids) - - if not running_pvid: + if not running_vids and not running_pvid: # There cannot be a no running pvid. # It might just not be in our cache: # this can happen if at the time we were # creating the bridge vlan cache, the port - # was not part of the bridge + # was not part of the bridge. And we need + # to make sure both vids and pvid is not in + # the cache, to declare that our cache may + # be stale. 
running_pvid = 1 + running_vids = [1] + + if running_vids: + (vids_to_del, vids_to_add) = \ + self._diff_vids(vids_to_add, running_vids) if running_pvid: if running_pvid != pvid_int and running_pvid != 0: From d3d4f288cb5a6645e3f8bf1fe659aee24b20786f Mon Sep 17 00:00:00 2001 From: Julien Fortin Date: Sat, 20 Aug 2016 20:43:12 -0700 Subject: [PATCH 2/2] nlmanager: rdnbrd "Interrupted system call" traceback in nlmanager Signed-off-by: Daniel Walton Reviewed-by: roopa@cumulusnetworks.com Ticket: CM-12487 Signed-off-by: Julien Fortin --- nlmanager/nllistener.py | 98 ++++++++++++++++++++--------------------- nlmanager/nlmanager.py | 63 ++++++++++++++++++++++---- 2 files changed, 103 insertions(+), 58 deletions(-) diff --git a/nlmanager/nllistener.py b/nlmanager/nllistener.py index 4e6c190..802f7d6 100644 --- a/nlmanager/nllistener.py +++ b/nlmanager/nllistener.py @@ -68,7 +68,7 @@ class NetlinkListener(Thread): log.info("%s: shutting down" % self) return - # Only block for 1 second so we can wake up to see + # Only block for 1 second so we can wake up to see if shutdown_event is set try: (readable, writeable, exceptional) = select(my_sockets, [], my_sockets, 1) except Exception as e: @@ -80,18 +80,14 @@ class NetlinkListener(Thread): set_alarm = False set_tx_socket_rxed_ack_alarm = False - missed_a_packet = False for s in readable: - data = None + data = [] try: data = s.recv(4096) - - # If recv() failed, we missed a packet except Exception as e: log.error('recv() error: ' + str(e)) - missed_a_packet = True continue total_length = len(data) @@ -101,30 +97,23 @@ class NetlinkListener(Thread): (length, msgtype, flags, seq, pid) = unpack(header_PACK, data[:header_LEN]) if manager.debug_listener: - log.info('%s: RXed %s seq %d, pid %d, %d bytes (%d total)' % - (socket_string[s], NetlinkPacket.type_to_string[msgtype], - seq, pid, length, total_length)) + log.debug('%s %s: RXed %s seq %d, pid %d, %d bytes (%d total)' % + (self, socket_string[s], 
NetlinkPacket.type_to_string[msgtype], + seq, pid, length, total_length)) - # 99% of the time when we see an ERROR the error code is - # zero which means ACK - possible_ack = False - - if msgtype == NLMSG_DONE: - possible_ack = True - - elif msgtype == NLMSG_ERROR: - # TODO - change this > to = ? - error_code = int(unpack('>H', data[header_LEN:header_LEN+2])[0]) + if msgtype == NLMSG_ERROR: + # The error code is a signed negative number. + error_code = abs(unpack('=i', data[header_LEN:header_LEN+4])[0]) + msg = Error(msgtype, True) + msg.decode_packet(length, flags, seq, pid, data) if error_code: - log.debug("%s: RXed NLMSG_ERROR code %d" % (socket_string[s], error_code)) - else: - possible_ack = True + log.debug("%s %s: RXed NLMSG_ERROR code %s (%d)" % (self, socket_string[s], msg.error_to_string.get(error_code), error_code)) - if possible_ack and seq == manager.target_seq and pid == manager.target_pid: + if seq == manager.target_seq and pid == manager.target_pid: if manager.target_seq_pid_debug: - log.debug("%s: Setting RXed ACK alarm for seq %d, pid %d" % - (socket_string[s], seq, pid)) + log.debug("%s %s: Setting RXed ACK alarm for seq %d, pid %d" % + (self, socket_string[s], seq, pid)) set_tx_socket_rxed_ack_alarm = True # Put the message on the manager's netlinkq @@ -142,11 +131,11 @@ class NetlinkListener(Thread): # as a reminder to add support for them. 
else: if msgtype in NetlinkPacket.type_to_string: - log.warning('%s: RXed unsupported message %s (type %d)' % - (socket_string[s], NetlinkPacket.type_to_string[msgtype], msgtype)) + log.warning('%s %s: RXed unsupported message %s (type %d)' % + (self, socket_string[s], NetlinkPacket.type_to_string[msgtype], msgtype)) else: - log.warning('%s: RXed unknown message type %d' % - (socket_string[s], msgtype)) + log.warning('%s %s: RXed unknown message type %d' % + (self, socket_string[s], msgtype)) # Track the previous PID sequence number for RX and TX sockets if s == self.rx_socket: @@ -155,8 +144,7 @@ class NetlinkListener(Thread): prev_seq = manager.tx_socket_prev_seq if pid in prev_seq and prev_seq[pid] and prev_seq[pid] != seq and (prev_seq[pid]+1 != seq): - log.info('%s: Went from seq %d to %d' % (socket_string[s], prev_seq[pid], seq)) - missed_a_packet = True + log.debug('%s %s: went from seq %d to %d' % (self, socket_string[s], prev_seq[pid], seq)) prev_seq[pid] = seq data = data[length:] @@ -209,17 +197,25 @@ class NetlinkManagerWithListener(NetlinkManager): def signal_term_handler(self, signal, frame): log.info("NetlinkManagerWithListener: Caught SIGTERM") + + if self.listener: + self.listener.shutdown_event.set() + self.shutdown_flag = True # For NetlinkManager shutdown self.shutdown_event.set() self.alarm.set() def signal_int_handler(self, signal, frame): log.info("NetlinkManagerWithListener: Caught SIGINT") + + if self.listener: + self.listener.shutdown_event.set() + self.shutdown_flag = True # For NetlinkManager shutdown self.shutdown_event.set() self.alarm.set() - def tx_nlpacket_ack_on_listener(self, nlpacket): + def tx_nlpacket_get_response(self, nlpacket): """ TX the message and wait for an ack """ @@ -243,34 +239,38 @@ class NetlinkManagerWithListener(NetlinkManager): # various netlink message types. Odds are our child class will redefine these # to do more than log a message. 
def rx_rtm_newlink(self, msg): - log.info("RX RTM_NEWLINK for %s, state %s" % (msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down")) + log.debug("RXed RTM_NEWLINK seq %d, pid %d, %d bytes, for %s, state %s" % + (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down")) def rx_rtm_dellink(self, msg): - log.info("RX RTM_DELLINK for %s, state %s" % (msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down")) + log.debug("RXed RTM_DELLINK seq %d, pid %d, %d bytes, for %s, state %s" % + (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down")) def rx_rtm_newaddr(self, msg): - log.info("RX RTM_NEWADDR for %s/%d on %s" % - (msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex))) + log.debug("RXed RTM_NEWADDR seq %d, pid %d, %d bytes, for %s/%d on %s" % + (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex))) def rx_rtm_deladdr(self, msg): - log.info("RX RTM_DELADDR for %s/%d on %s" % - (msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex))) + log.debug("RXed RTM_DELADDR seq %d, pid %d, %d bytes, for %s/%d on %s" % + (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex))) def rx_rtm_newneigh(self, msg): - log.info("RX RTM_NEWNEIGH for %s on %s" % (msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex))) + log.debug("RXed RTM_NEWNEIGH seq %d, pid %d, %d bytes, for %s on %s" % + (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex))) def rx_rtm_delneigh(self, msg): - log.info("RX RTM_DELNEIGH for %s on %s" % (msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex))) + log.debug("RXed RTM_DELNEIGH seq %d, pid %d, %d bytes, for 
%s on %s" % + (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex))) def rx_rtm_newroute(self, msg): - log.info("RX RTM_NEWROUTE for %s%s" % - (msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index))) + log.debug("RXed RTM_NEWROUTE seq %d, pid %d, %d bytes, for %s%s" % + (msg.seq, msg.pid, msg.length, msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index))) def rx_rtm_delroute(self, msg): - log.info("RX RTM_DELROUTE for %s%s" % - (msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index))) + log.debug("RXed RTM_DELROUTE seq %d, pid %d, %d bytes, for %s%s" % + (msg.seq, msg.pid, msg.length, msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index))) - # Note that tx_nlpacket_ack_on_listener will block until NetlinkListener has RXed + # Note that tx_nlpacket_get_response will block until NetlinkListener has RXed # an Ack/DONE for the message we TXed def get_all_addresses(self): family = socket.AF_UNSPEC @@ -284,7 +284,7 @@ class NetlinkManagerWithListener(NetlinkManager): if debug: self.debug_seq_pid[(addr.seq, addr.pid)] = True - self.tx_nlpacket_ack_on_listener(addr) + self.tx_nlpacket_get_response(addr) def get_all_links(self): family = socket.AF_UNSPEC @@ -298,7 +298,7 @@ class NetlinkManagerWithListener(NetlinkManager): if debug: self.debug_seq_pid[(link.seq, link.pid)] = True - self.tx_nlpacket_ack_on_listener(link) + self.tx_nlpacket_get_response(link) def get_all_neighbors(self): family = socket.AF_UNSPEC @@ -312,7 +312,7 @@ class NetlinkManagerWithListener(NetlinkManager): if debug: self.debug_seq_pid[(neighbor.seq, neighbor.pid)] = True - self.tx_nlpacket_ack_on_listener(neighbor) + self.tx_nlpacket_get_response(neighbor) def get_all_routes(self): family = socket.AF_UNSPEC @@ -326,7 +326,7 @@ class NetlinkManagerWithListener(NetlinkManager): if debug: self.debug_seq_pid[(route.seq, route.pid)] = True - self.tx_nlpacket_ack_on_listener(route) + 
self.tx_nlpacket_get_response(route) def nested_attributes_match(self, msg, attr_filter): """ diff --git a/nlmanager/nlmanager.py b/nlmanager/nlmanager.py index d8daab0..72e0295 100644 --- a/nlmanager/nlmanager.py +++ b/nlmanager/nlmanager.py @@ -15,7 +15,11 @@ class NetlinkError(Exception): pass -class NetlinkNoAddressError(Exception): +class NetlinkNoAddressError(NetlinkError): + pass + + +class NetlinkInterruptedSystemCall(NetlinkError): pass @@ -137,13 +141,15 @@ class NetlinkManager(object): # packet via the decode_packet call...so avoid printing # two messages for one packet. if not nlpacket.debug: - log.debug("TXed %12s, pid %d, seq %d, %d bytes" % + log.info("TXed %12s, pid %d, seq %d, %d bytes" % (nlpacket.get_type_string(), nlpacket.pid, nlpacket.seq, nlpacket.length)) header_PACK = NetlinkPacket.header_PACK header_LEN = NetlinkPacket.header_LEN null_read = 0 - MAX_NULL_READS = 30 + nle_intr_count = 0 + MAX_NULL_READS = 3 + MAX_ERROR_NLE_INTR = 3 msgs = [] # Now listen to our socket and wait for the reply @@ -154,21 +160,51 @@ class NetlinkManager(object): return msgs # Only block for 1 second so we can wake up to see if self.shutdown_flag is True - (readable, writeable, exceptional) = select([self.tx_socket, ], [], [self.tx_socket, ], 1) + try: + (readable, writeable, exceptional) = select([self.tx_socket, ], [], [self.tx_socket, ], 1) + except Exception as e: + # 4 is Interrupted system call + if isinstance(e.args, tuple) and e[0] == 4: + nle_intr_count += 1 + log.info("select() Interrupted system call %d/%d" % (nle_intr_count, MAX_ERROR_NLE_INTR)) - if not readable: + if nle_intr_count >= MAX_ERROR_NLE_INTR: + raise NetlinkInterruptedSystemCall(error_str) + else: + continue + else: + raise + + if readable: + null_read = 0 + else: null_read += 1 # Safety net to make sure we do not spend too much time in # this while True loop if null_read >= MAX_NULL_READS: - log.warning('Socket was not readable for %d attempts' % null_read) + log.info('Socket was not 
readable for %d attempts' % null_read) return msgs - - continue + else: + continue for s in readable: - data = s.recv(4096) + data = [] + + try: + data = s.recv(4096) + except Exception as e: + # 4 is Interrupted system call + if isinstance(e.args, tuple) and e[0] == 4: + nle_intr_count += 1 + log.info("%s: recv() Interrupted system call %d/%d" % (s, nle_intr_count, MAX_ERROR_NLE_INTR)) + + if nle_intr_count >= MAX_ERROR_NLE_INTR: + raise NetlinkInterruptedSystemCall(error_str) + else: + continue + else: + raise if not data: log.info('RXed zero length data, the socket is closed') @@ -207,12 +243,20 @@ class NetlinkManager(object): # 0 is NLE_SUCCESS...everything else is a true error if error_code: error_code_str = msg.error_to_string.get(error_code) + if error_code_str != 'None': error_str = 'Operation failed with \'%s\' (%s)' % (error_code_str, debug_str) else: error_str = 'Operation failed with code %s (%s)' % (error_code, debug_str) + if error_code == Error.NLE_NOADDR: raise NetlinkNoAddressError(error_str) + elif error_code == Error.NLE_INTR: + nle_intr_count += 1 + log.info("%s: RXed NLE_INTR Interrupted system call %d/%d" % (s, nle_intr_count, MAX_ERROR_NLE_INTR)) + + if nle_intr_count >= MAX_ERROR_NLE_INTR: + raise NetlinkInterruptedSystemCall(error_str) else: if error_code_str == 'None': try: @@ -227,6 +271,7 @@ class NetlinkManager(object): # No ACK...create a nlpacket object and append it to msgs else: + nle_intr_count = 0 # If debugs are enabled we will print the contents of the # packet via the decode_packet call...so avoid printing