#!/usr/bin/env python3
"""
BNG Blaster Control Socket Client

Simple script to interact with the BNG Blaster
control socket JSON RPC API.

Christian Giese, January 2021

Copyright (C) 2020-2021, RtBrick, Inc.
SPDX-License-Identifier: BSD-3-Clause
"""
import sys
import socket
import os
import json
import ast


def usage():
    """print usage information and exit with status 1"""
    print("""
BNG Blaster Control Socket Client

{c} SOCKET COMMAND [arguments]

Examples:
    {c} run.sock session-info session-id 1
    {c} run.sock igmp-join session-id 1 group 239.0.0.1 source1 1.1.1.1 source2 2.2.2.2 source3 3.3.3.3
    {c} run.sock igmp-info session-id 1
    {c} run.sock l2tp-csurq tunnel-id 1 sessions [1,2]
""".format(c=sys.argv[0]))
    sys.exit(1)


def error(*args, **kwargs):
    """print error to stderr and exit with status 1"""
    print(*args, file=sys.stderr, **kwargs)
    sys.exit(1)


def parse_value(arg):
    """convert one command line value into a JSON-compatible object

    Tried in order:
      1. integer, like "1" in "session-id 1"
      2. Python literal, like "[1,2]" in "sessions [1,2]"
      3. the unmodified string, like "239.0.0.1" in "group 239.0.0.1"
    """
    try:
        return int(arg)
    except ValueError:
        pass
    try:
        value = ast.literal_eval(arg)
        # literals such as sets, bytes or complex numbers parse fine
        # but can't be sent as JSON; treat those as plain strings
        json.dumps(value)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        return arg
    return value


def build_request(argv):
    """build the JSON RPC request from the command line

    argv has the layout of sys.argv:
        [program, socket path, command, name1, value1, name2, value2, ...]

    Returns a dict with the key "command" and, if name/value pairs
    were given, the key "arguments".

    Raises ValueError if an argument name has no value, instead of
    silently dropping it or failing with an IndexError.
    """
    request = {"command": argv[2]}
    pairs = argv[3:]
    if len(pairs) % 2:
        raise ValueError("missing value for argument '%s'" % pairs[-1])
    if pairs:
        request["arguments"] = {
            name: parse_value(value)
            for name, value in zip(pairs[0::2], pairs[1::2])
        }
    return request


def main():
    """send the command given on the command line to the control socket
    and pretty-print the JSON response"""
    if len(sys.argv) < 3:
        usage()

    socket_path = sys.argv[1]
    try:
        request = build_request(sys.argv)
    except ValueError as e:
        error(e)

    if not os.path.exists(socket_path):
        error("socket %s not found" % socket_path)

    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
            client.connect(socket_path)
            # sendall, because send may transmit only part of the request
            client.sendall(json.dumps(request).encode('utf-8'))
            chunks = []
            while True:
                chunk = client.recv(1024)
                if not chunk:
                    # peer closed the connection, response is complete
                    break
                chunks.append(chunk)
        # decode once at the end: a multi-byte UTF-8 character may be
        # split across two recv() chunks
        data = b"".join(chunks).decode('utf-8')
        print(json.dumps(json.loads(data), indent=4))
    except Exception as e:
        error(e)


if __name__ == "__main__":
    main()