1
0
mirror of https://github.com/CumulusNetworks/ifupdown2.git synced 2024-05-06 15:54:50 +00:00

Merge branch 'dev' into release/cl-stable

This commit is contained in:
Julien Fortin
2016-08-20 20:48:53 -07:00
3 changed files with 113 additions and 64 deletions

View File

@@ -882,17 +882,21 @@ class bridge(moduleBase):
(running_vids, running_pvid) = self._get_running_vids_n_pvid( (running_vids, running_pvid) = self._get_running_vids_n_pvid(
bportifaceobj.name) bportifaceobj.name)
if running_vids: if not running_vids and not running_pvid:
(vids_to_del, vids_to_add) = \
self._diff_vids(vids_to_add, running_vids)
if not running_pvid:
# There cannot be a no running pvid. # There cannot be a no running pvid.
# It might just not be in our cache: # It might just not be in our cache:
# this can happen if at the time we were # this can happen if at the time we were
# creating the bridge vlan cache, the port # creating the bridge vlan cache, the port
# was not part of the bridge # was not part of the bridge. And we need
# to make sure both vids and pvid is not in
# the cache, to declare that our cache may
# be stale.
running_pvid = 1 running_pvid = 1
running_vids = [1]
if running_vids:
(vids_to_del, vids_to_add) = \
self._diff_vids(vids_to_add, running_vids)
if running_pvid: if running_pvid:
if running_pvid != pvid_int and running_pvid != 0: if running_pvid != pvid_int and running_pvid != 0:

View File

@@ -68,7 +68,7 @@ class NetlinkListener(Thread):
log.info("%s: shutting down" % self) log.info("%s: shutting down" % self)
return return
# Only block for 1 second so we can wake up to see # Only block for 1 second so we can wake up to see if shutdown_event is set
try: try:
(readable, writeable, exceptional) = select(my_sockets, [], my_sockets, 1) (readable, writeable, exceptional) = select(my_sockets, [], my_sockets, 1)
except Exception as e: except Exception as e:
@@ -80,18 +80,14 @@ class NetlinkListener(Thread):
set_alarm = False set_alarm = False
set_tx_socket_rxed_ack_alarm = False set_tx_socket_rxed_ack_alarm = False
missed_a_packet = False
for s in readable: for s in readable:
data = None data = []
try: try:
data = s.recv(4096) data = s.recv(4096)
# If recv() failed, we missed a packet
except Exception as e: except Exception as e:
log.error('recv() error: ' + str(e)) log.error('recv() error: ' + str(e))
missed_a_packet = True
continue continue
total_length = len(data) total_length = len(data)
@@ -101,30 +97,23 @@ class NetlinkListener(Thread):
(length, msgtype, flags, seq, pid) = unpack(header_PACK, data[:header_LEN]) (length, msgtype, flags, seq, pid) = unpack(header_PACK, data[:header_LEN])
if manager.debug_listener: if manager.debug_listener:
log.info('%s: RXed %s seq %d, pid %d, %d bytes (%d total)' % log.debug('%s %s: RXed %s seq %d, pid %d, %d bytes (%d total)' %
(socket_string[s], NetlinkPacket.type_to_string[msgtype], (self, socket_string[s], NetlinkPacket.type_to_string[msgtype],
seq, pid, length, total_length)) seq, pid, length, total_length))
# 99% of the time when we see an ERROR the error code is if msgtype == NLMSG_ERROR:
# zero which means ACK # The error code is a signed negative number.
possible_ack = False error_code = abs(unpack('=i', data[header_LEN:header_LEN+4])[0])
msg = Error(msgtype, True)
if msgtype == NLMSG_DONE: msg.decode_packet(length, flags, seq, pid, data)
possible_ack = True
elif msgtype == NLMSG_ERROR:
# TODO - change this > to = ?
error_code = int(unpack('>H', data[header_LEN:header_LEN+2])[0])
if error_code: if error_code:
log.debug("%s: RXed NLMSG_ERROR code %d" % (socket_string[s], error_code)) log.debug("%s %s: RXed NLMSG_ERROR code %s (%d)" % (self, socket_string[s], msg.error_to_string.get(error_code), error_code))
else:
possible_ack = True
if possible_ack and seq == manager.target_seq and pid == manager.target_pid: if seq == manager.target_seq and pid == manager.target_pid:
if manager.target_seq_pid_debug: if manager.target_seq_pid_debug:
log.debug("%s: Setting RXed ACK alarm for seq %d, pid %d" % log.debug("%s %s: Setting RXed ACK alarm for seq %d, pid %d" %
(socket_string[s], seq, pid)) (self, socket_string[s], seq, pid))
set_tx_socket_rxed_ack_alarm = True set_tx_socket_rxed_ack_alarm = True
# Put the message on the manager's netlinkq # Put the message on the manager's netlinkq
@@ -142,11 +131,11 @@ class NetlinkListener(Thread):
# as a reminder to add support for them. # as a reminder to add support for them.
else: else:
if msgtype in NetlinkPacket.type_to_string: if msgtype in NetlinkPacket.type_to_string:
log.warning('%s: RXed unsupported message %s (type %d)' % log.warning('%s %s: RXed unsupported message %s (type %d)' %
(socket_string[s], NetlinkPacket.type_to_string[msgtype], msgtype)) (self, socket_string[s], NetlinkPacket.type_to_string[msgtype], msgtype))
else: else:
log.warning('%s: RXed unknown message type %d' % log.warning('%s %s: RXed unknown message type %d' %
(socket_string[s], msgtype)) (self, socket_string[s], msgtype))
# Track the previous PID sequence number for RX and TX sockets # Track the previous PID sequence number for RX and TX sockets
if s == self.rx_socket: if s == self.rx_socket:
@@ -155,8 +144,7 @@ class NetlinkListener(Thread):
prev_seq = manager.tx_socket_prev_seq prev_seq = manager.tx_socket_prev_seq
if pid in prev_seq and prev_seq[pid] and prev_seq[pid] != seq and (prev_seq[pid]+1 != seq): if pid in prev_seq and prev_seq[pid] and prev_seq[pid] != seq and (prev_seq[pid]+1 != seq):
log.info('%s: Went from seq %d to %d' % (socket_string[s], prev_seq[pid], seq)) log.debug('%s %s: went from seq %d to %d' % (self, socket_string[s], prev_seq[pid], seq))
missed_a_packet = True
prev_seq[pid] = seq prev_seq[pid] = seq
data = data[length:] data = data[length:]
@@ -209,17 +197,25 @@ class NetlinkManagerWithListener(NetlinkManager):
def signal_term_handler(self, signal, frame): def signal_term_handler(self, signal, frame):
log.info("NetlinkManagerWithListener: Caught SIGTERM") log.info("NetlinkManagerWithListener: Caught SIGTERM")
if self.listener:
self.listener.shutdown_event.set()
self.shutdown_flag = True # For NetlinkManager shutdown self.shutdown_flag = True # For NetlinkManager shutdown
self.shutdown_event.set() self.shutdown_event.set()
self.alarm.set() self.alarm.set()
def signal_int_handler(self, signal, frame): def signal_int_handler(self, signal, frame):
log.info("NetlinkManagerWithListener: Caught SIGINT") log.info("NetlinkManagerWithListener: Caught SIGINT")
if self.listener:
self.listener.shutdown_event.set()
self.shutdown_flag = True # For NetlinkManager shutdown self.shutdown_flag = True # For NetlinkManager shutdown
self.shutdown_event.set() self.shutdown_event.set()
self.alarm.set() self.alarm.set()
def tx_nlpacket_ack_on_listener(self, nlpacket): def tx_nlpacket_get_response(self, nlpacket):
""" """
TX the message and wait for an ack TX the message and wait for an ack
""" """
@@ -243,34 +239,38 @@ class NetlinkManagerWithListener(NetlinkManager):
# various netlink message types. Odds are our child class will redefine these # various netlink message types. Odds are our child class will redefine these
# to do more than log a message. # to do more than log a message.
def rx_rtm_newlink(self, msg): def rx_rtm_newlink(self, msg):
log.info("RX RTM_NEWLINK for %s, state %s" % (msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down")) log.debug("RXed RTM_NEWLINK seq %d, pid %d, %d bytes, for %s, state %s" %
(msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down"))
def rx_rtm_dellink(self, msg): def rx_rtm_dellink(self, msg):
log.info("RX RTM_DELLINK for %s, state %s" % (msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down")) log.debug("RXed RTM_DELLINK seq %d, pid %d, %d bytes, for %s, state %s" %
(msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down"))
def rx_rtm_newaddr(self, msg): def rx_rtm_newaddr(self, msg):
log.info("RX RTM_NEWADDR for %s/%d on %s" % log.debug("RXed RTM_NEWADDR seq %d, pid %d, %d bytes, for %s/%d on %s" %
(msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex))) (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex)))
def rx_rtm_deladdr(self, msg): def rx_rtm_deladdr(self, msg):
log.info("RX RTM_DELADDR for %s/%d on %s" % log.debug("RXed RTM_DELADDR seq %d, pid %d, %d bytes, for %s/%d on %s" %
(msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex))) (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex)))
def rx_rtm_newneigh(self, msg): def rx_rtm_newneigh(self, msg):
log.info("RX RTM_NEWNEIGH for %s on %s" % (msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex))) log.debug("RXed RTM_NEWNEIGH seq %d, pid %d, %d bytes, for %s on %s" %
(msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex)))
def rx_rtm_delneigh(self, msg): def rx_rtm_delneigh(self, msg):
log.info("RX RTM_DELNEIGH for %s on %s" % (msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex))) log.debug("RXed RTM_DELNEIGH seq %d, pid %d, %d bytes, for %s on %s" %
(msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex)))
def rx_rtm_newroute(self, msg): def rx_rtm_newroute(self, msg):
log.info("RX RTM_NEWROUTE for %s%s" % log.debug("RXed RTM_NEWROUTE seq %d, pid %d, %d bytes, for %s%s" %
(msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index))) (msg.seq, msg.pid, msg.length, msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index)))
def rx_rtm_delroute(self, msg): def rx_rtm_delroute(self, msg):
log.info("RX RTM_DELROUTE for %s%s" % log.debug("RXed RTM_DELROUTE seq %d, pid %d, %d bytes, for %s%s" %
(msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index))) (msg.seq, msg.pid, msg.length, msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index)))
# Note that tx_nlpacket_ack_on_listener will block until NetlinkListener has RXed # Note that tx_nlpacket_get_response will block until NetlinkListener has RXed
# an Ack/DONE for the message we TXed # an Ack/DONE for the message we TXed
def get_all_addresses(self): def get_all_addresses(self):
family = socket.AF_UNSPEC family = socket.AF_UNSPEC
@@ -284,7 +284,7 @@ class NetlinkManagerWithListener(NetlinkManager):
if debug: if debug:
self.debug_seq_pid[(addr.seq, addr.pid)] = True self.debug_seq_pid[(addr.seq, addr.pid)] = True
self.tx_nlpacket_ack_on_listener(addr) self.tx_nlpacket_get_response(addr)
def get_all_links(self): def get_all_links(self):
family = socket.AF_UNSPEC family = socket.AF_UNSPEC
@@ -298,7 +298,7 @@ class NetlinkManagerWithListener(NetlinkManager):
if debug: if debug:
self.debug_seq_pid[(link.seq, link.pid)] = True self.debug_seq_pid[(link.seq, link.pid)] = True
self.tx_nlpacket_ack_on_listener(link) self.tx_nlpacket_get_response(link)
def get_all_neighbors(self): def get_all_neighbors(self):
family = socket.AF_UNSPEC family = socket.AF_UNSPEC
@@ -312,7 +312,7 @@ class NetlinkManagerWithListener(NetlinkManager):
if debug: if debug:
self.debug_seq_pid[(neighbor.seq, neighbor.pid)] = True self.debug_seq_pid[(neighbor.seq, neighbor.pid)] = True
self.tx_nlpacket_ack_on_listener(neighbor) self.tx_nlpacket_get_response(neighbor)
def get_all_routes(self): def get_all_routes(self):
family = socket.AF_UNSPEC family = socket.AF_UNSPEC
@@ -326,7 +326,7 @@ class NetlinkManagerWithListener(NetlinkManager):
if debug: if debug:
self.debug_seq_pid[(route.seq, route.pid)] = True self.debug_seq_pid[(route.seq, route.pid)] = True
self.tx_nlpacket_ack_on_listener(route) self.tx_nlpacket_get_response(route)
def nested_attributes_match(self, msg, attr_filter): def nested_attributes_match(self, msg, attr_filter):
""" """

View File

@@ -15,7 +15,11 @@ class NetlinkError(Exception):
pass pass
class NetlinkNoAddressError(Exception): class NetlinkNoAddressError(NetlinkError):
pass
class NetlinkInterruptedSystemCall(NetlinkError):
pass pass
@@ -137,13 +141,15 @@ class NetlinkManager(object):
# packet via the decode_packet call...so avoid printing # packet via the decode_packet call...so avoid printing
# two messages for one packet. # two messages for one packet.
if not nlpacket.debug: if not nlpacket.debug:
log.debug("TXed %12s, pid %d, seq %d, %d bytes" % log.info("TXed %12s, pid %d, seq %d, %d bytes" %
(nlpacket.get_type_string(), nlpacket.pid, nlpacket.seq, nlpacket.length)) (nlpacket.get_type_string(), nlpacket.pid, nlpacket.seq, nlpacket.length))
header_PACK = NetlinkPacket.header_PACK header_PACK = NetlinkPacket.header_PACK
header_LEN = NetlinkPacket.header_LEN header_LEN = NetlinkPacket.header_LEN
null_read = 0 null_read = 0
MAX_NULL_READS = 30 nle_intr_count = 0
MAX_NULL_READS = 3
MAX_ERROR_NLE_INTR = 3
msgs = [] msgs = []
# Now listen to our socket and wait for the reply # Now listen to our socket and wait for the reply
@@ -154,21 +160,51 @@ class NetlinkManager(object):
return msgs return msgs
# Only block for 1 second so we can wake up to see if self.shutdown_flag is True # Only block for 1 second so we can wake up to see if self.shutdown_flag is True
try:
(readable, writeable, exceptional) = select([self.tx_socket, ], [], [self.tx_socket, ], 1) (readable, writeable, exceptional) = select([self.tx_socket, ], [], [self.tx_socket, ], 1)
except Exception as e:
# 4 is Interrupted system call
if isinstance(e.args, tuple) and e[0] == 4:
nle_intr_count += 1
log.info("select() Interrupted system call %d/%d" % (nle_intr_count, MAX_ERROR_NLE_INTR))
if not readable: if nle_intr_count >= MAX_ERROR_NLE_INTR:
raise NetlinkInterruptedSystemCall(error_str)
else:
continue
else:
raise
if readable:
null_read = 0
else:
null_read += 1 null_read += 1
# Safety net to make sure we do not spend too much time in # Safety net to make sure we do not spend too much time in
# this while True loop # this while True loop
if null_read >= MAX_NULL_READS: if null_read >= MAX_NULL_READS:
log.warning('Socket was not readable for %d attempts' % null_read) log.info('Socket was not readable for %d attempts' % null_read)
return msgs return msgs
else:
continue continue
for s in readable: for s in readable:
data = []
try:
data = s.recv(4096) data = s.recv(4096)
except Exception as e:
# 4 is Interrupted system call
if isinstance(e.args, tuple) and e[0] == 4:
nle_intr_count += 1
log.info("%s: recv() Interrupted system call %d/%d" % (s, nle_intr_count, MAX_ERROR_NLE_INTR))
if nle_intr_count >= MAX_ERROR_NLE_INTR:
raise NetlinkInterruptedSystemCall(error_str)
else:
continue
else:
raise
if not data: if not data:
log.info('RXed zero length data, the socket is closed') log.info('RXed zero length data, the socket is closed')
@@ -207,12 +243,20 @@ class NetlinkManager(object):
# 0 is NLE_SUCCESS...everything else is a true error # 0 is NLE_SUCCESS...everything else is a true error
if error_code: if error_code:
error_code_str = msg.error_to_string.get(error_code) error_code_str = msg.error_to_string.get(error_code)
if error_code_str != 'None': if error_code_str != 'None':
error_str = 'Operation failed with \'%s\' (%s)' % (error_code_str, debug_str) error_str = 'Operation failed with \'%s\' (%s)' % (error_code_str, debug_str)
else: else:
error_str = 'Operation failed with code %s (%s)' % (error_code, debug_str) error_str = 'Operation failed with code %s (%s)' % (error_code, debug_str)
if error_code == Error.NLE_NOADDR: if error_code == Error.NLE_NOADDR:
raise NetlinkNoAddressError(error_str) raise NetlinkNoAddressError(error_str)
elif error_code == Error.NLE_INTR:
nle_intr_count += 1
log.info("%s: RXed NLE_INTR Interrupted system call %d/%d" % (s, nle_intr_count, MAX_ERROR_NLE_INTR))
if nle_intr_count >= MAX_ERROR_NLE_INTR:
raise NetlinkInterruptedSystemCall(error_str)
else: else:
if error_code_str == 'None': if error_code_str == 'None':
try: try:
@@ -227,6 +271,7 @@ class NetlinkManager(object):
# No ACK...create a nlpacket object and append it to msgs # No ACK...create a nlpacket object and append it to msgs
else: else:
nle_intr_count = 0
# If debugs are enabled we will print the contents of the # If debugs are enabled we will print the contents of the
# packet via the decode_packet call...so avoid printing # packet via the decode_packet call...so avoid printing