Files
2024-04-29 17:29:42 +09:00

689 lines
25 KiB
Python

# Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import json
from itertools import chain
from threading import Thread
import subprocess
import os
import netaddr
import toml
import yaml
from lib.base import (
community_str,
wait_for_completion,
BGPContainer,
CmdBuffer,
BGP_ATTR_TYPE_AS_PATH,
BGP_ATTR_TYPE_NEXT_HOP,
BGP_ATTR_TYPE_MULTI_EXIT_DISC,
BGP_ATTR_TYPE_LOCAL_PREF,
BGP_ATTR_TYPE_COMMUNITIES,
BGP_ATTR_TYPE_MP_REACH_NLRI,
GRACEFUL_RESTART_TIME,
LONG_LIVED_GRACEFUL_RESTART_TIME,
BGP_FSM_IDLE,
BGP_FSM_ACTIVE,
BGP_FSM_ESTABLISHED,
yellow,
indent,
local,
)
def extract_path_attribute(path, typ):
    """Return the first attribute of ``path`` whose ``'type'`` equals ``typ``.

    ``path`` must provide an ``'attrs'`` iterable of attribute dicts.
    ``None`` is returned when no attribute matches.
    """
    matching = (attr for attr in path['attrs'] if attr['type'] == typ)
    return next(matching, None)
# BGPContainer subclass that drives gobgpd and, optionally, Quagga's
# zebra/ospfd daemons through generated configuration files.
class GoBGPContainer(BGPContainer):
# Path paired with self.config_dir in shared_volumes; gobgpd.conf,
# gobgpd.log and start.sh are addressed under it from inside the container.
SHARED_VOLUME = '/root/shared_volume'
# Path paired with the Quagga config directory in shared_volumes; zebra and
# ospfd read their .conf files and write their logs under it.
QUAGGA_VOLUME = '/etc/quagga'
def __init__(self, name, asn, router_id, ctn_image_name='osrg/gobgp',
             log_level='debug', zebra=False, config_format='toml',
             zapi_version=2, bgp_config=None, ospfd_config=None,
             zebra_multipath_enabled=False):
    """Describe a GoBGP container.

    ``name``, ``asn``, ``router_id`` and ``ctn_image_name`` are handed to
    the base-class initialiser.  The remaining arguments are stored on the
    instance and consulted later when configs are generated and daemons
    are started.
    """
    super(GoBGPContainer, self).__init__(name, asn, router_id,
                                         ctn_image_name)

    # Shared volumes: the config directory first, then its quagga
    # sub-directory.
    self.shared_volumes.append((self.config_dir, self.SHARED_VOLUME))
    self.quagga_config_dir = '{base}/quagga'.format(base=self.config_dir)
    self.shared_volumes.append(
        (self.quagga_config_dir, self.QUAGGA_VOLUME))

    # Daemon / rendering options.
    self.log_level = log_level
    self.config_format = config_format
    self.zebra = zebra
    self.zapi_version = zapi_version
    self.zebra_multipath_enabled = zebra_multipath_enabled

    # Policy state, unset until the set_*/add_* helpers are called.
    self.prefix_set = None
    self.neighbor_set = None
    self.bgp_set = None
    self.statements = None
    self.default_policy = None

    # bgp_config follows the config.BgpConfigSet structure and is merged
    # over the generated defaults.  For instance:
    #   bgp_config = {
    #       'global': {
    #           'confederation': {
    #               'config': {
    #                   'identifier': 10,
    #                   'member-as-list': [65001],
    #               }
    #           },
    #       },
    #   }
    self.bgp_config = bgp_config if bgp_config else {}

    # Passing a dict as ospfd_config requests an OSPFd instance inside the
    # GoBGP container.  For instance:
    #   ospfd_config = {
    #       'redistributes': [
    #           'connected',
    #       ],
    #       'networks': {
    #           '192.168.1.0/24': '0.0.0.0',  # <network>: <area>
    #       },
    #   }
    self.ospfd_config = ospfd_config if ospfd_config else {}
def _start_gobgp(self, graceful_restart=False):
    """Generate ``start.sh`` in the config directory and run it detached.

    The script launches gobgpd with this container's log level and config
    format, reading gobgpd.conf from and logging to SHARED_VOLUME.  ``-r``
    is added to the command line when ``graceful_restart`` is true.
    """
    restart_flag = '-r' if graceful_restart else ''
    gobgpd_cmd = ('/go/bin/gobgpd -f {0}/gobgpd.conf -l {1} -p {2} -t {3} > '
                  '{0}/gobgpd.log 2>&1').format(self.SHARED_VOLUME,
                                                self.log_level,
                                                restart_flag,
                                                self.config_format)

    script = CmdBuffer()
    script << '#!/bin/sh'
    script << gobgpd_cmd

    # The script is written and made executable through the module-level
    # local() helper, then executed through the container's own local().
    write_script = 'echo "{0:s}" > {1}/start.sh'.format(str(script),
                                                         self.config_dir)
    local(write_script, capture=True)
    make_executable = "chmod 755 {0}/start.sh".format(self.config_dir)
    local(make_executable, capture=True)
    self.local("{0}/start.sh".format(self.SHARED_VOLUME), detach=True)
def start_gobgp(self, graceful_restart=False):
    """Start gobgpd and wait until it has booted.

    Raises:
        RuntimeError: if gobgpd is already responding.
    """
    already_up = self._is_running()
    if already_up:
        raise RuntimeError('GoBGP is already running')

    self._start_gobgp(graceful_restart=graceful_restart)
    self._wait_for_boot()
def stop_gobgp(self):
    """Kill gobgpd in the container by sending it SIGKILL via ``pkill``."""
    kill_cmd = "pkill -KILL gobgpd"
    self.local(kill_cmd)
def _start_zebra(self):
    """Launch zebra detached with ``zebra.conf`` from QUAGGA_VOLUME.

    ZAPI version 2 uses the binary at ``/usr/lib/quagga/zebra``; any other
    version uses the bare command name ``zebra``.
    """
    daemon_bin = '/usr/lib/quagga/zebra' if self.zapi_version == 2 else 'zebra'
    launch = '{0} -f {1}/zebra.conf'.format(daemon_bin, self.QUAGGA_VOLUME)
    self.local(launch, detach=True)
def _start_ospfd(self):
    """Launch ospfd detached with ``ospfd.conf`` from QUAGGA_VOLUME.

    ZAPI version 2 uses the binary at ``/usr/lib/quagga/ospfd``; any other
    version uses the bare command name ``ospfd``.
    """
    daemon_bin = '/usr/lib/quagga/ospfd' if self.zapi_version == 2 else 'ospfd'
    launch = '{0} -f {1}/ospfd.conf'.format(daemon_bin, self.QUAGGA_VOLUME)
    self.local(launch, detach=True)
# Return the names of the Quagga daemons this container is configured to run:
# 'zebra' when self.zebra is truthy and 'ospfd' when self.ospfd_config is
# non-empty.
# NOTE(review): confirm whether the ospfd entry is intended to depend on zebra
# being enabled as well.
def _get_enabled_quagga_daemons(self):
daemons = []
if self.zebra:
daemons.append('zebra')
if self.ospfd_config:
daemons.append('ospfd')
return daemons
def _is_running(self):
    """Return True when ``gobgp global`` exits with status 0 in the container.

    The command's own output is discarded; only the echoed exit status is
    captured and compared against ``'0'``.
    """
    probe = 'gobgp global > /dev/null 2>&1; echo $?'
    exit_status = self.local(probe, capture=True)
    return exit_status == '0'
def _wait_for_boot(self):
    """Wait for every enabled Quagga daemon, then for gobgpd, to respond.

    A Quagga daemon counts as up once ``vtysh -d <daemon> -c 'show run'``
    exits with status 0; gobgpd is checked through ``_is_running``.
    """
    def make_probe(daemon_name):
        # Bind the daemon name into a zero-argument readiness check.
        def _f_quagga():
            status = self.local(
                "vtysh -d {0} -c 'show run' > /dev/null 2>&1; echo $?".format(daemon_name),
                capture=True)
            return status == '0'
        return _f_quagga

    for daemon in self._get_enabled_quagga_daemons():
        wait_for_completion(make_probe(daemon))

    wait_for_completion(self._is_running)
# Start the container via the base class, then bring up the optional Quagga
# daemons followed by gobgpd, and block until everything has booted.
# Returns self.WAIT_FOR_BOOT.
# NOTE(review): confirm whether starting ospfd is intended to depend on zebra
# being enabled as well.
def run(self):
super(GoBGPContainer, self).run()
if self.zebra:
self._start_zebra()
if self.ospfd_config:
self._start_ospfd()
# gobgpd is started last, without the graceful-restart flag.
self._start_gobgp()
self._wait_for_boot()
return self.WAIT_FOR_BOOT
@staticmethod
def _get_as_path(path):
    """Return the flat list of AS numbers from ``path``'s AS_PATH attributes.

    AS_PATH attributes that lack ``'as_paths'`` or carry ``None`` there are
    skipped; the ``'asns'`` of every remaining segment are concatenated in
    order.
    """
    asns = []
    for attr in path['attrs']:
        if attr['type'] != BGP_ATTR_TYPE_AS_PATH:
            continue
        if 'as_paths' not in attr or attr['as_paths'] is None:
            continue
        for segment in attr['as_paths']:
            asns.extend(segment['asns'])
    return asns
@staticmethod
def _get_nexthop(path):
    """Return ``'nexthop'`` of the first NEXT_HOP or MP_REACH_NLRI attribute.

    ``None`` is returned when ``path`` carries neither attribute.
    """
    for attr in path['attrs']:
        if attr['type'] in (BGP_ATTR_TYPE_NEXT_HOP, BGP_ATTR_TYPE_MP_REACH_NLRI):
            return attr['nexthop']
    return None
@staticmethod
def _get_local_pref(path):
    """Return ``'value'`` of the first LOCAL_PREF attribute, or ``None``."""
    values = (attr['value'] for attr in path['attrs']
              if attr['type'] == BGP_ATTR_TYPE_LOCAL_PREF)
    return next(values, None)
@staticmethod
def _get_med(path):
    """Return ``'metric'`` of the first MULTI_EXIT_DISC attribute, or ``None``."""
    metrics = (attr['metric'] for attr in path['attrs']
               if attr['type'] == BGP_ATTR_TYPE_MULTI_EXIT_DISC)
    return next(metrics, None)
@staticmethod
def _get_community(path):
    """Return the first COMMUNITIES attribute rendered with ``community_str``.

    The result is a list with one entry per item in the attribute's
    ``'communities'``; ``None`` is returned when there is no such attribute.
    """
    for attr in path['attrs']:
        if attr['type'] != BGP_ATTR_TYPE_COMMUNITIES:
            continue
        return list(map(community_str, attr['communities']))
    return None
def _get_rib(self, dests_dict):
    """Convert a ``{prefix: [path, ...]}`` mapping into a list of destinations.

    Each path dict is annotated in place with ``nexthop``, ``aspath``,
    ``local-pref``, ``community``, ``med`` and ``prefix``; a truthy ``id``
    is additionally copied to ``identifier``.  The result holds one
    ``{'paths': ..., 'prefix': ...}`` dict per input prefix.
    """
    derived_fields = (
        ("nexthop", self._get_nexthop),
        ("aspath", self._get_as_path),
        ("local-pref", self._get_local_pref),
        ("community", self._get_community),
        ("med", self._get_med),
    )

    def annotate(entry, prefix):
        for key, getter in derived_fields:
            entry[key] = getter(entry)
        entry["prefix"] = prefix
        if entry.get("id", None):
            entry["identifier"] = entry["id"]

    rib = []
    for prefix, paths in dests_dict.items():
        for entry in paths:
            annotate(entry, prefix)
        rib.append({'paths': paths, 'prefix': prefix})
    return rib
def _trigger_peer_cmd(self, cmd, peer):
    """Run ``gobgp neighbor <peer address> <cmd>`` inside the container."""
    target = self.peer_name(peer)
    self.local('gobgp neighbor {0} {1}'.format(target, cmd))
def disable_peer(self, peer):
    """Issue the ``disable`` neighbor command for ``peer``."""
    action = 'disable'
    self._trigger_peer_cmd(action, peer)
def enable_peer(self, peer):
    """Issue the ``enable`` neighbor command for ``peer``."""
    action = 'enable'
    self._trigger_peer_cmd(action, peer)
def reset(self, peer):
    """Issue the ``reset`` neighbor command for ``peer``."""
    action = 'reset'
    self._trigger_peer_cmd(action, peer)
def softreset(self, peer, rf='ipv4', type='in'):
    """Issue ``softreset<type> -a <rf>`` for ``peer``.

    ``type`` is appended directly to ``softreset`` (``'in'`` by default) and
    ``rf`` selects the address family.
    """
    subcommand = 'softreset{direction} -a {family}'.format(direction=type,
                                                           family=rf)
    self._trigger_peer_cmd(subcommand, peer)
def get_local_rib(self, peer, prefix='', rf='ipv4'):
    """Return the annotated local RIB for ``peer``.

    Runs ``gobgp -j neighbor <addr> local <prefix> -a <rf>`` and passes the
    decoded JSON through ``_get_rib``.
    """
    query = 'gobgp -j neighbor {0} local {1} -a {2}'.format(
        self.peer_name(peer), prefix, rf)
    raw = self.local(query, capture=True)
    destinations = json.loads(raw)
    return self._get_rib(destinations)
def get_global_rib(self, prefix='', rf='ipv4'):
    """Return the annotated global RIB.

    Runs ``gobgp -j global rib <prefix> -a <rf>`` and passes the decoded
    JSON through ``_get_rib``.
    """
    query = 'gobgp -j global rib {0} -a {1}'.format(prefix, rf)
    raw = self.local(query, capture=True)
    destinations = json.loads(raw)
    return self._get_rib(destinations)
def monitor_global_rib(self, queue, rf='ipv4'):
    """Stream global-RIB updates for ``rf`` into ``queue`` from a daemon thread.

    The ``gobgp`` client found in ``self.config_dir`` is run against the
    address taken from the first entry of ``self.ip_addrs``.  If the client
    is not there yet, ``/go/bin/gobgp`` is first copied into SHARED_VOLUME
    through the container.  Every output line is decoded as JSON; its first
    element is annotated with ``nexthop``, ``aspath``, ``local-pref`` and
    ``med`` and put on ``queue``.

    The call returns immediately.  The background thread ends when the
    client closes its output.
    """
    host = self.ip_addrs[0][1].split('/')[0]

    if not os.path.exists('{0}/gobgp'.format(self.config_dir)):
        self.local('cp /go/bin/gobgp {0}/'.format(self.SHARED_VOLUME))

    args = '{0}/gobgp -u {1} -j monitor global rib -a {2}'.format(self.config_dir, host, rf).split(' ')

    def monitor():
        process = subprocess.Popen(args, stdout=subprocess.PIPE)
        # The pipe is binary, so end-of-file is signalled by b'' (not '').
        # With the wrong sentinel the loop never stops and json.loads()
        # blows up on the empty read.
        for line in iter(process.stdout.readline, b''):
            p = json.loads(line)[0]
            p["nexthop"] = self._get_nexthop(p)
            p["aspath"] = self._get_as_path(p)
            p["local-pref"] = self._get_local_pref(p)
            p["med"] = self._get_med(p)
            queue.put(p)
        # Output is finished: release the pipe and reap the child.
        process.stdout.close()
        process.wait()

    t = Thread(target=monitor)
    t.daemon = True
    t.start()
def _get_adj_rib(self, adj_type, peer, prefix='', rf='ipv4', add_path_enabled=False):
    """Return the adj-RIB (``adj_type`` is ``'in'`` or ``'out'``) for ``peer``.

    With ``add_path_enabled`` the decoded JSON goes through ``_get_rib`` and
    all paths are kept.  Otherwise only the first path of every prefix is
    returned, annotated with ``nexthop``, ``aspath``, ``prefix``,
    ``local-pref`` and ``med``.
    """
    query = 'gobgp neighbor {0} adj-{1} {2} -a {3} -j'.format(
        self.peer_name(peer), adj_type, prefix, rf)
    parsed = json.loads(self.local(query, capture=True))

    if add_path_enabled:
        return self._get_rib(parsed)

    first_paths = [paths[0] for paths in parsed.values()]
    for entry in first_paths:
        entry["nexthop"] = self._get_nexthop(entry)
        entry["aspath"] = self._get_as_path(entry)
        entry["prefix"] = entry['nlri']['prefix']
        entry["local-pref"] = self._get_local_pref(entry)
        entry["med"] = self._get_med(entry)
    return first_paths
def get_adj_rib_in(self, peer, prefix='', rf='ipv4', add_path_enabled=False):
    """Return the adj-RIB-in for ``peer`` (see ``_get_adj_rib``)."""
    direction = 'in'
    return self._get_adj_rib(direction, peer, prefix, rf, add_path_enabled)
def get_adj_rib_out(self, peer, prefix='', rf='ipv4', add_path_enabled=False):
    """Return the adj-RIB-out for ``peer`` (see ``_get_adj_rib``)."""
    direction = 'out'
    return self._get_adj_rib(direction, peer, prefix, rf, add_path_enabled)
def get_neighbor(self, peer):
    """Return the decoded JSON of ``gobgp -j neighbor <peer address>``."""
    query = 'gobgp -j neighbor {0}'.format(self.peer_name(peer))
    raw = self.local(query, capture=True)
    return json.loads(raw)
def get_neighbor_state(self, peer):
    """Map ``peer``'s numeric session state onto a BGP_FSM_* constant.

    Codes 1, 3 and 6 map to BGP_FSM_IDLE, BGP_FSM_ACTIVE and
    BGP_FSM_ESTABLISHED respectively; every other code yields ``"unknown"``.
    """
    code = self.get_neighbor(peer)['state']['session_state']
    if code not in (1, 3, 6):
        return "unknown"
    if code == 6:
        return BGP_FSM_ESTABLISHED
    return BGP_FSM_IDLE if code == 1 else BGP_FSM_ACTIVE
def clear_policy(self):
    """Reset policy state on the container and on every peer.

    ``policies`` and each peer's ``'policies'`` become empty dicts;
    ``prefix_set``, ``neighbor_set`` and ``statements`` become empty lists.
    """
    self.policies = {}
    for peer_info in self.peers.values():
        peer_info['policies'] = {}
    self.prefix_set, self.neighbor_set, self.statements = [], [], []
def set_prefix_set(self, ps):
    """Replace the prefix sets; a non-list ``ps`` is wrapped in a list."""
    self.prefix_set = ps if isinstance(ps, list) else [ps]
def add_prefix_set(self, ps):
    """Append ``ps`` to the prefix sets, creating the list on first use."""
    if self.prefix_set is None:
        self.prefix_set = [ps]
        return
    self.prefix_set.append(ps)
def set_neighbor_set(self, ns):
    """Replace the neighbor sets; a non-list ``ns`` is wrapped in a list."""
    self.neighbor_set = ns if isinstance(ns, list) else [ns]
def add_neighbor_set(self, ns):
    """Append ``ns`` to the neighbor sets, creating the list on first use."""
    if self.neighbor_set is None:
        self.neighbor_set = [ns]
        return
    self.neighbor_set.append(ns)
def set_bgp_defined_set(self, bs):
    """Store ``bs`` as the BGP defined sets written into the configuration."""
    bgp_defined_sets = bs
    self.bgp_set = bgp_defined_sets
# Write every configuration file this container needs: gobgpd.conf always,
# plus zebra.conf / ospfd.conf for the optional Quagga daemons.
# NOTE(review): confirm whether writing ospfd.conf is intended to depend on
# zebra being enabled; the Quagga config directory is only created in the
# zebra branch.
def create_config(self):
self._create_config_bgp()
if self.zebra:
# Create the Quagga config directory and make it world-writable.
local('mkdir -p {0}'.format(self.quagga_config_dir))
local('chmod 777 {0}'.format(self.quagga_config_dir))
self._create_config_zebra()
if self.ospfd_config:
self._create_config_ospfd()
def _merge_dict(self, dct, merge_dct):
    """Recursively merge ``merge_dct`` into ``dct`` in place.

    When a key holds a dict in ``dct`` and a mapping in ``merge_dct`` the
    two are merged key by key; in every other case the value from
    ``merge_dct`` replaces (or adds) the entry.
    """
    for key, incoming in merge_dct.items():
        mergeable = (key in dct
                     and isinstance(dct[key], dict)
                     and isinstance(incoming, collections.abc.Mapping))
        if mergeable:
            self._merge_dict(dct[key], incoming)
            continue
        dct[key] = incoming
def _create_config_bgp(self):
    """Render gobgpd.conf for this container and write it to ``config_dir``.

    The document starts from built-in global defaults overlaid with
    ``self.bgp_config``.  One neighbor entry is added per item in
    ``self.peers``, followed by the defined sets, policy definitions and
    zebra settings held on the instance.  The result is serialised according
    to ``self.config_format`` ("toml", "yaml" or "json"), echoed to stdout
    and written to ``<config_dir>/gobgpd.conf``.

    Raises:
        Exception: if a neighbor address has an unsupported IP version, a
            default policy is neither 'accept' nor 'reject', or
            ``config_format`` is not recognised.
    """

    def _afi_safi(name):
        # One afi-safis entry enabling the named address family.
        return {'config': {'afi-safi-name': name}}

    def _default_policy_action(v):
        # Translate 'accept' / 'reject' into the names written to the config.
        if v == 'reject':
            return 'reject-route'
        elif v == 'accept':
            return 'accept-route'
        raise Exception('invalid default policy type {0}'.format(v))

    config = {
        'global': {
            'config': {
                'as': self.asn,
                'router-id': self.router_id,
            },
            'route-selection-options': {
                'config': {
                    'external-compare-router-id': True,
                },
            },
        },
        'neighbors': [],
    }

    # Caller-supplied settings are layered over the defaults above.
    self._merge_dict(config, self.bgp_config)

    if self.zebra and self.zapi_version == 2:
        config['global']['use-multiple-paths'] = {'config': {'enabled': True}}
    else:
        config['global']['use-multiple-paths'] = {'config': {'enabled': self.zebra_multipath_enabled}}

    for info in self.peers.values():
        afi_safi_list = []
        if info['interface'] != '':
            # Peer identified by interface: enable both unicast families.
            afi_safi_list.append(_afi_safi('ipv4-unicast'))
            afi_safi_list.append(_afi_safi('ipv6-unicast'))
        else:
            version = netaddr.IPNetwork(info['neigh_addr']).version
            if version == 4:
                afi_safi_list.append(_afi_safi('ipv4-unicast'))
            elif version == 6:
                afi_safi_list.append(_afi_safi('ipv6-unicast'))
            else:
                # The exception used to be constructed but never raised.
                raise Exception('invalid ip address version. {0}'.format(version))

        if info['vpn']:
            afi_safi_list.append(_afi_safi('l3vpn-ipv4-unicast'))
            afi_safi_list.append(_afi_safi('l3vpn-ipv6-unicast'))
            afi_safi_list.append(_afi_safi('l2vpn-evpn'))
            afi_safi_list.append({'config': {'afi-safi-name': 'rtc'}, 'route-target-membership': {'config': {'deferral-time': 10}}})

        if info['flowspec']:
            afi_safi_list.append(_afi_safi('ipv4-flowspec'))
            afi_safi_list.append(_afi_safi('l3vpn-ipv4-flowspec'))
            afi_safi_list.append(_afi_safi('ipv6-flowspec'))
            afi_safi_list.append(_afi_safi('l3vpn-ipv6-flowspec'))

        if info['mup']:
            afi_safi_list.append(_afi_safi('ipv4-mup'))
            afi_safi_list.append(_afi_safi('ipv6-mup'))

        # A neighbor is addressed either by IP (with its remote AS) or by
        # interface name; the unused fields stay None.
        neigh_addr = None
        interface = None
        peer_as = None
        if info['interface'] == '':
            neigh_addr = info['neigh_addr'].split('/')[0]
            peer_as = info['remote_as']
        else:
            interface = info['interface']

        n = {
            'config': {
                'neighbor-address': neigh_addr,
                'neighbor-interface': interface,
                'peer-as': peer_as,
                'auth-password': info['passwd'],
                'vrf': info['vrf'],
                'remove-private-as': info['remove_private_as'],
            },
            'afi-safis': afi_safi_list,
            'timers': {
                'config': {
                    'connect-retry': 10,
                },
            },
            'transport': {
                'config': {},
            },
        }

        n['as-path-options'] = {'config': {}}
        if info['allow_as_in'] > 0:
            n['as-path-options']['config']['allow-own-as'] = info['allow_as_in']

        if info['replace_peer_as']:
            n['as-path-options']['config']['replace-peer-as'] = info['replace_peer_as']

        # Only a local address containing ':' is written out.
        if ':' in info['local_addr']:
            n['transport']['config']['local-address'] = info['local_addr'].split('/')[0]

        if info['passive']:
            n['transport']['config']['passive-mode'] = True

        if info['is_rs_client']:
            n['route-server'] = {'config': {'route-server-client': True}}

        if info['local_as']:
            n['config']['local-as'] = info['local_as']

        if info['prefix_limit']:
            for v in afi_safi_list:
                v['prefix-limit'] = {'config': {'max-prefixes': info['prefix_limit'], 'shutdown-threshold-pct': 80}}

        if info['graceful_restart'] is not None:
            n['graceful-restart'] = {'config': {'enabled': True, 'restart-time': GRACEFUL_RESTART_TIME}}
            for afi_safi in afi_safi_list:
                afi_safi['mp-graceful-restart'] = {'config': {'enabled': True}}

            # Long-lived graceful restart refines the graceful-restart
            # section, so it is only applied once that section exists.
            if info['llgr'] is not None:
                n['graceful-restart']['config']['restart-time'] = 1
                n['graceful-restart']['config']['long-lived-enabled'] = True
                for afi_safi in afi_safi_list:
                    afi_safi['long-lived-graceful-restart'] = {'config': {'enabled': True, 'restart-time': LONG_LIVED_GRACEFUL_RESTART_TIME}}

        if info['is_rr_client']:
            # The cluster ID defaults to this router's ID unless the peer
            # entry carries its own.
            cluster_id = self.router_id
            if 'cluster_id' in info and info['cluster_id'] is not None:
                cluster_id = info['cluster_id']
            n['route-reflector'] = {'config': {'route-reflector-client': True,
                                               'route-reflector-cluster-id': cluster_id}}

        if info["addpath"] > 0:
            n["add-paths"] = {
                "config": {"receive": True, "send-max": info["addpath"]}
            }

        if len(info.get('default-policy', [])) + len(info.get('policies', [])) > 0:
            n['apply-policy'] = {'config': {}}

        for typ, p in info.get('policies', {}).items():
            n['apply-policy']['config']['{0}-policy-list'.format(typ)] = [p['name']]

        for typ, d in info.get('default-policy', {}).items():
            n['apply-policy']['config']['default-{0}-policy'.format(typ)] = _default_policy_action(d)

        if info['treat_as_withdraw']:
            n['error-handling'] = {'config': {'treat-as-withdraw': True}}

        config['neighbors'].append(n)

    config['defined-sets'] = {}
    if self.prefix_set:
        config['defined-sets']['prefix-sets'] = self.prefix_set

    if self.neighbor_set:
        config['defined-sets']['neighbor-sets'] = self.neighbor_set

    if self.bgp_set:
        config['defined-sets']['bgp-defined-sets'] = self.bgp_set

    policy_list = []
    for p in self.policies.values():
        policy = {'name': p['name']}
        if 'statements' in p:
            policy['statements'] = p['statements']
        policy_list.append(policy)

    if len(policy_list) > 0:
        config['policy-definitions'] = policy_list

    if self.zebra:
        config['zebra'] = {'config': {'enabled': True,
                                      'redistribute-route-type-list': ['connect'],
                                      'version': self.zapi_version}}

    # Serialise before opening the file so that an unsupported format does
    # not leave a truncated gobgpd.conf behind.
    if self.config_format == "toml":
        raw = toml.dumps(config)
    elif self.config_format == "yaml":
        raw = yaml.dump(config)
    elif self.config_format == "json":
        raw = json.dumps(config)
    else:
        raise Exception('invalid config_format {0}'.format(self.config_format))
    raw = raw.strip()

    with open('{0}/gobgpd.conf'.format(self.config_dir), 'w') as f:
        print(yellow('[{0}\'s new gobgpd.conf]'.format(self.name)))
        print(yellow(indent(raw)))
        f.write(raw)
def _create_config_zebra(self):
    """Write ``zebra.conf`` into the Quagga config directory and echo it.

    The file sets the hostname and password, sends the log to
    ``zebra.log`` under QUAGGA_VOLUME, enables packet/kernel/rib debugging
    and turns on IPv6 forwarding.  Surrounding whitespace is stripped from
    the rendered text.
    """
    c = CmdBuffer()
    c << 'hostname zebra'
    c << 'password zebra'
    c << 'log file {0}/zebra.log'.format(self.QUAGGA_VOLUME)
    c << 'debug zebra packet'
    c << 'debug zebra kernel'
    c << 'debug zebra rib'
    c << 'ipv6 forwarding'
    c << ''

    conf = str(c).strip()
    with open('{0}/zebra.conf'.format(self.quagga_config_dir), 'w') as f:
        print(yellow('[{0}\'s new zebra.conf]'.format(self.name)))
        print(yellow(indent(conf)))
        # write(), not writelines(): the payload is one string, and
        # writelines() would push it out one character at a time.
        f.write(conf)
def _create_config_ospfd(self):
    """Write ``ospfd.conf`` into the Quagga config directory and echo it.

    ``self.ospfd_config['redistributes']`` yields one ``redistribute`` line
    per entry and ``self.ospfd_config['networks']`` (``{network: area}``)
    one ``network ... area ...`` line per item.  The log goes to
    ``ospfd.log`` under QUAGGA_VOLUME.
    """
    c = CmdBuffer()
    c << 'hostname ospfd'
    c << 'password zebra'
    c << 'router ospf'
    for redistribute in self.ospfd_config.get('redistributes', []):
        c << ' redistribute {0}'.format(redistribute)
    for network, area in self.ospfd_config.get('networks', {}).items():
        c << ' network {0} area {1}'.format(network, area)
    c << 'log file {0}/ospfd.log'.format(self.QUAGGA_VOLUME)
    c << ''

    conf = str(c)
    with open('{0}/ospfd.conf'.format(self.quagga_config_dir), 'w') as f:
        print(yellow('[{0}\'s new ospfd.conf]'.format(self.name)))
        print(yellow(indent(conf)))
        # write(), not writelines(): the payload is one string, and
        # writelines() would push it out one character at a time.
        f.write(conf)
def reload_config(self):
    """Send SIGHUP to every enabled Quagga daemon and to gobgpd, then wait.

    The Quagga daemons are signalled first, gobgpd last; the call returns
    once ``_wait_for_boot`` reports everything as up again.
    """
    targets = list(self._get_enabled_quagga_daemons())
    targets.append('gobgpd')
    for process_name in targets:
        self.local('pkill -SIGHUP {0}'.format(process_name), capture=True)
    self._wait_for_boot()
def add_route(self, route, rf='ipv4', attribute=None, aspath=None,
              community=None, med=None, extendedcommunity=None,
              nexthop=None, matchs=None, thens=None,
              local_pref=None, identifier=None, reload_config=False):
    """Add ``route`` to the global RIB via ``gobgp global rib add``.

    For ``rf`` of ``'ipv4'`` / ``'ipv6'`` the prefix is sent together with
    any of ``identifier``, ``nexthop``, ``local_pref``, ``med`` and
    ``community`` that were supplied (``community`` may be a single value
    or a list/tuple).  For families ending in ``-flowspec`` the ``matchs``
    and ``thens`` word lists are sent instead.  ``attribute``, ``aspath``
    and ``extendedcommunity`` are only recorded in ``self.routes``;
    ``reload_config`` is accepted but unused.

    The path is appended to ``self.routes[route]`` once the command has
    been issued.

    Raises:
        RuntimeError: if gobgpd is not running.
        Exception: if ``rf`` is not a supported address family.
    """
    if not self._is_running():
        raise RuntimeError('GoBGP is not yet running')

    path = {
        'prefix': route,
        'rf': rf,
        'attr': attribute,
        'next-hop': nexthop,
        'as-path': aspath,
        'community': community,
        'med': med,
        'local-pref': local_pref,
        'extended-community': extendedcommunity,
        'identifier': identifier,
        'matchs': matchs,
        'thens': thens,
    }

    c = CmdBuffer(' ')
    c << 'gobgp global rib -a {0} add'.format(rf)
    if rf in ('ipv4', 'ipv6'):
        c << route
        if identifier:
            c << 'identifier {0}'.format(identifier)
        if nexthop:
            c << 'nexthop {0}'.format(nexthop)
        # 0 is a legitimate LOCAL_PREF / MED value, so test against None
        # instead of relying on truthiness.
        if local_pref is not None:
            c << 'local-pref {0}'.format(local_pref)
        if med is not None:
            c << 'med {0}'.format(med)
        if community:
            if isinstance(community, (list, tuple)):
                # str() each member so non-string values can be joined too.
                comm = ','.join(str(value) for value in community)
            else:
                comm = str(community)
            c << 'community {0}'.format(comm)
    elif rf.endswith('-flowspec'):
        c << 'match {0}'.format(' '.join(matchs))
        c << 'then {0}'.format(' '.join(thens))
    else:
        raise Exception('unsupported address family: {0}'.format(rf))

    self.local(str(c), capture=True)

    self.routes.setdefault(route, []).append(path)
def del_route(self, route, identifier=None, reload_config=True):
    """Withdraw the paths of ``route`` whose identifier equals ``identifier``.

    Each matching path is removed with ``gobgp global -a <rf> rib del ...``
    (flowspec paths are addressed by their match clause); the other paths
    stay recorded in ``self.routes[route]``.  Unknown routes are ignored.
    ``reload_config`` is accepted but unused, as no config reload is needed.

    Raises:
        RuntimeError: if gobgpd is not running.
    """
    if not self._is_running():
        raise RuntimeError('GoBGP is not yet running')

    if route not in self.routes:
        return

    kept = []
    for path in self.routes[route]:
        if path['identifier'] != identifier:
            kept.append(path)
            continue

        family = path['rf']
        if family.endswith('-flowspec'):
            target = 'match {0}'.format(' '.join(path['matchs']))
        else:
            target = path['prefix']

        r = CmdBuffer(' ')
        r << 'gobgp global -a {0}'.format(family)
        r << 'rib del {0}'.format(target)
        if identifier:
            r << 'identifier {0}'.format(identifier)
        self.local(str(r), capture=True)

    self.routes[route] = kept
class RawGoBGPContainer(GoBGPContainer):
    """GoBGP container driven by a pre-rendered configuration string.

    The AS number and router ID are read from the parsed configuration, and
    ``create_config`` writes the string to gobgpd.conf unchanged.
    """

    def __init__(self, name, config, ctn_image_name='osrg/gobgp',
                 log_level='debug', zebra=False, config_format='yaml'):
        """Parse ``config`` just far enough to initialise the base container.

        ``config`` is the full gobgpd configuration text in
        ``config_format`` ("toml", "yaml" or "json").  It must define
        ``global.config.as`` and ``global.config.router-id``.

        Raises:
            Exception: if ``config_format`` is not recognised.
        """
        if config_format == "toml":
            d = toml.loads(config)
        elif config_format == "yaml":
            # safe_load instead of load: yaml.load() without an explicit
            # Loader is rejected by PyYAML >= 6 and, with the full loader,
            # can construct arbitrary Python objects.
            d = yaml.safe_load(config)
        elif config_format == "json":
            d = json.loads(config)
        else:
            raise Exception('invalid config format {0}'.format(config_format))
        asn = d['global']['config']['as']
        router_id = d['global']['config']['router-id']
        self.config = config
        super(RawGoBGPContainer, self).__init__(name, asn, router_id,
                                                ctn_image_name, log_level,
                                                zebra, config_format)

    def create_config(self):
        """Write the stored configuration text to gobgpd.conf and echo it."""
        with open('{0}/gobgpd.conf'.format(self.config_dir), 'w') as f:
            print(yellow('[{0}\'s new gobgpd.conf]'.format(self.name)))
            print(yellow(indent(self.config)))
            f.write(self.config)