1
0
mirror of https://github.com/rtbrick/bngblaster.git synced 2024-05-06 15:54:57 +00:00
Files
rtbrick-bngblaster/code/ldpupdate
2023-01-03 13:12:52 +01:00

156 lines
5.0 KiB
Python

#!/usr/bin/env python3
"""
LDP RAW Update Generator
Christian Giese, December 2022
Copyright (C) 2020-2022, RtBrick, Inc.
SPDX-License-Identifier: BSD-3-Clause
"""
import argparse
import ipaddress
import json
import logging
import struct
import sys
# Scapy is a hard requirement: its LDP layers are used to build every PDU.
try:
    from scapy.all import *
    # Raise scapy's runtime logger to ERROR while the LDP contrib module is
    # imported, then set it to INFO afterwards.
    log_runtime.setLevel(logging.ERROR)
    from scapy.contrib.ldp import *
    log_runtime.setLevel(logging.INFO)
except Exception:
    # ``Exception`` instead of a bare ``except:`` so that Ctrl-C and
    # SystemExit are not misreported as a scapy load failure, while any
    # import-time error raised by scapy is still handled here.
    print("Failed to load scapy!")
    # sys.exit() rather than the site-provided exit(), which is meant for
    # the interactive shell and is absent when site is not loaded.
    sys.exit(1)
# ==============================================================
# DEFINITIONS
# ==============================================================
# Help text passed to argparse as the parser description in main().
DESCRIPTION = """
The LDP RAW update generator is a simple
tool to generate LDP RAW update streams
for use with the BNG Blaster.
"""
# Names accepted by --log-level, mapped to the matching logging level.
LOG_LEVELS = {
'warning': logging.WARNING,
'info': logging.INFO,
'debug': logging.DEBUG
}
# Inclusive bounds for MPLS labels: label_type() rejects values outside
# them and main() restarts its label sequence instead of exceeding the
# maximum. 1048575 == 2**20 - 1.
MPLS_LABEL_MIN = 1
MPLS_LABEL_MAX = 1048575
# ==============================================================
# FUNCTIONS
# ==============================================================
def init_logging(log_level: str) -> logging.Logger:
    """Configure the root logger to write to stdout and return it.

    Args:
        log_level: A key of ``LOG_LEVELS`` ('warning', 'info' or 'debug').

    Returns:
        The root logger, set to the requested level and equipped with a
        stdout handler using the ``[timestamp][LEVEL] message`` format.

    Raises:
        KeyError: If ``log_level`` is not a key of ``LOG_LEVELS``.
    """
    level = LOG_LEVELS[log_level]
    log = logging.getLogger()
    log.setLevel(level)

    # Reuse a stdout handler left by an earlier call so that calling this
    # function more than once does not emit every record multiple times.
    handler = next(
        (
            existing for existing in log.handlers
            if isinstance(existing, logging.StreamHandler)
            and getattr(existing, "stream", None) is sys.stdout
        ),
        None,
    )
    if handler is None:
        handler = logging.StreamHandler(sys.stdout)
        log.addHandler(handler)

    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(
        '[%(asctime)s][%(levelname)-7s] %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    ))
    return log
def label_type(label: str) -> int:
    """Argument parser type for MPLS labels.

    Args:
        label: The label as given on the command line; any value that
            ``int()`` accepts works.

    Returns:
        The label as an integer between ``MPLS_LABEL_MIN`` and
        ``MPLS_LABEL_MAX`` (both inclusive).

    Raises:
        ValueError: If ``label`` cannot be converted by ``int()``.
        argparse.ArgumentTypeError: If the value is outside the valid range.
    """
    # Keep the converted value in its own name instead of rebinding the
    # parameter to a different type.
    value = int(label)
    if not MPLS_LABEL_MIN <= value <= MPLS_LABEL_MAX:
        raise argparse.ArgumentTypeError(
            "MPLS label out of range %s - %s" % (MPLS_LABEL_MIN, MPLS_LABEL_MAX))
    return value
# ==============================================================
# MAIN
# ==============================================================
def _labeled_prefixes(prefix_base, prefix_num, label_base, label_num):
    """Yield ``prefix_num`` consecutive ``(network, label)`` pairs.

    Networks start at ``prefix_base`` and advance by one network of the same
    prefix length per step. Labels cycle through ``label_base`` up to
    ``label_base + label_num - 1`` and restart at ``label_base``; the cycle
    is cut short so that no label exceeds ``MPLS_LABEL_MAX``.

    The caller must ensure that all requested networks fit into the IPv4
    address space.
    """
    first_address = int(prefix_base.network_address)
    network_size = prefix_base.num_addresses
    prefixlen = prefix_base.prefixlen
    # A label count below one still yields the base label for every prefix.
    label_cycle = max(1, min(label_num, MPLS_LABEL_MAX - label_base + 1))
    for index in range(prefix_num):
        # Each network is derived from its index, so nothing is computed
        # beyond the last requested prefix (the former "next prefix"
        # calculation overflowed at the end of the address space).
        network = ipaddress.IPv4Network(
            (first_address + index * network_size, prefixlen))
        yield network, label_base + index % label_cycle


def main():
    """Parse the command line and write the LDP RAW update file.

    One LDP PDU carrying a label mapping message is built per prefix and
    its bytes are written to the output file (replaced or appended to,
    depending on ``--append``). With ``--pcap`` every PDU is additionally
    wrapped in Ether/IP/TCP and written to a PCAP file.

    Exits with status 1 if the prefix base or the LSR identifier is not
    IPv4, or if the requested number of prefixes does not fit between the
    prefix base and the end of the IPv4 address space.
    """
    # parse arguments
    parser = argparse.ArgumentParser(description=DESCRIPTION)
    parser.add_argument('-l', '--lsr-id', metavar='ADDRESS', type=ipaddress.ip_address, required=True, help='LSR identifier')
    parser.add_argument('-i', '--message-id-base', metavar='N', type=int, default=1000, help='message identifier base')
    parser.add_argument('-p', '--prefix-base', metavar='PREFIX', type=ipaddress.ip_network, required=True, help='prefix base network')
    parser.add_argument('-P', '--prefix-num', metavar='N', type=int, default=1, help='prefix count')
    parser.add_argument('-m', '--label-base', metavar='LABEL', type=label_type, default=10000, help='label base')
    parser.add_argument('-M', '--label-num', metavar='N', type=int, default=1, help='label count')
    parser.add_argument('-f', '--file', type=str, default="out.ldp", help='output file')
    parser.add_argument('--append', action="store_true", help="append to file if exist")
    parser.add_argument('--pcap', metavar='FILE', type=str, help="write LDP updates to PCAP file")
    parser.add_argument('--log-level', type=str, default='info', choices=LOG_LEVELS.keys(), help='logging Level')
    args = parser.parse_args()

    # init logging
    log = init_logging(args.log_level)

    if args.prefix_base.version != 4:
        log.error("prefix must be IPv4")
        sys.exit(1)

    # ipaddress.ip_address also accepts IPv6; reject it here with a clear
    # message instead of failing later while the PDU is built.
    if args.lsr_id.version != 4:
        log.error("LSR identifier must be IPv4")
        sys.exit(1)

    # Number of networks of this length from the base up to the end of the
    # IPv4 address space.
    base = args.prefix_base
    capacity = (2 ** base.max_prefixlen - int(base.network_address)) // base.num_addresses
    if args.prefix_num > capacity:
        log.error("prefix count %s exceeds the %s prefixes of length /%s available from %s"
                  % (args.prefix_num, capacity, base.prefixlen, base.network_address))
        sys.exit(1)

    log.info("init %s labeled IPv4 prefixes" % (args.prefix_num))

    # Here we will store packets for optional PCAP output
    pcap_packets = []

    def pcap(message):
        """Queue ``message`` wrapped in Ether/IP/TCP if PCAP output is on."""
        if args.pcap:
            # Every queued packet gets its own TCP source port.
            # NOTE(review): the port no longer fits 16 bits once more than
            # 55535 packets are queued - confirm how scapy handles that.
            pcap_packets.append(Ether()/IP()/TCP(sport=len(pcap_packets)+10000, dport=646, seq=1, flags='PA')/message)

    prefixes = []
    for prefix, label in _labeled_prefixes(args.prefix_base, args.prefix_num,
                                           args.label_base, args.label_num):
        log.debug("add prefix %s via label %s" % (prefix, label))
        prefixes.append((prefix, label))

    if args.append:
        log.info("open file %s (append)" % args.file)
        file_flags = "ab"
    else:
        log.info("open file %s (replace)" % args.file)
        file_flags = "wb"

    with open(args.file, file_flags) as f:
        # Message identifiers count up from the configured base, one per PDU.
        for mid, (prefix, label) in enumerate(prefixes, start=args.message_id_base):
            pdu = LDP(id=args.lsr_id)/LDPLabelMM(id=mid, fec=[(str(prefix.network_address), int(prefix.prefixlen))], label=label)
            pdu_bin = pdu.build()
            pcap(pdu)
            f.write(pdu_bin)

    if args.pcap:
        log.info("create PCAP file %s" % args.pcap)
        try:
            wrpcap(args.pcap, pcap_packets)
        except Exception as e:
            # Best effort: a PCAP failure does not invalidate the RAW file
            # that has already been written.
            log.error("failed to create PCAP file")
            log.debug(e)

    log.info("finished")


if __name__ == "__main__":
    main()