diff --git a/clients/python/client.py b/clients/python/client.py index daff5bb0d..b6e543d17 100644 --- a/clients/python/client.py +++ b/clients/python/client.py @@ -5,52 +5,53 @@ import villas_pb2 import time, socket, errno, sys, os, signal -layer = sys.argv[1] if len(sys.argv) == 2 else 'udp' +layer = sys.argv[1] if len(sys.argv) == 2 else "udp" -if layer not in ['udp', 'unix']: - raise Exception('Unsupported socket type') +if layer not in ["udp", "unix"]: + raise Exception("Unsupported socket type") -if layer == 'unix': - local = '/var/run/villas-node.client.sock' - remote = '/var/run/villas-node.server.sock' +if layer == "unix": + local = "/var/run/villas-node.client.sock" + remote = "/var/run/villas-node.server.sock" - skt = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + skt = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - # Delete stale sockets - if os.path.exists(local): - os.unlink(local) -elif layer == 'udp': - local = ('0.0.0.0', 12001) - remote = ('127.0.0.1', 12000) + # Delete stale sockets + if os.path.exists(local): + os.unlink(local) +elif layer == "udp": + local = ("0.0.0.0", 12001) + remote = ("127.0.0.1", 12000) - skt = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + skt = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) -print('Start client...') +print("Start client...") skt.bind(local) # Try to connect in case Unix domain socket does not exist yet.. connected = False while not connected: - try: - skt.connect(remote) - except socket.error as serr: - if serr.errno not in [ errno.ECONNREFUSED, errno.ENOENT ]: - raise serr - - print('Not connected. Retrying in 1 sec') - time.sleep(1) - else: - connected = True + try: + skt.connect(remote) + except socket.error as serr: + if serr.errno not in [errno.ECONNREFUSED, errno.ENOENT]: + raise serr -print('Ready. Ctrl-C to quit.') + print("Not connected. Retrying in 1 sec") + time.sleep(1) + else: + connected = True + +print("Ready. Ctrl-C to quit.") msg = villas_pb2.Message() # Gracefully shutdown def sighandler(signum, frame): - running = False + running = False + signal.signal(signal.SIGINT, sighandler) signal.signal(signal.SIGTERM, sighandler) @@ -58,15 +59,15 @@ signal.signal(signal.SIGTERM, sighandler) running = True while running: - dgram = skt.recv(1024) - if not dgram: - break - else: - msg.ParseFromString(dgram) - print(msg) - - skt.send(msg.SerializeToString()) + dgram = skt.recv(1024) + if not dgram: + break + else: + msg.ParseFromString(dgram) + print(msg) + + skt.send(msg.SerializeToString()) skt.close() -print('Bye.') +print("Bye.") diff --git a/etc/python/example.py b/etc/python/example.py index 0132ad884..6be8b623e 100644 --- a/etc/python/example.py +++ b/etc/python/example.py @@ -1,5 +1,5 @@ #!/bin/env python3 -''' Example Python config +""" Example Python config This example demonstrates how you can use Python to generate complex configuration files. @@ -11,7 +11,7 @@ Author: Steffen Vogel SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University SPDX-License-Identifier: Apache-2.0 -''' +""" import json import sys @@ -19,59 +19,38 @@ import sys N = 10 nodes = { - 'raspberry': { - 'type': 'socket', - 'layer': 'udp', - 'format': 'protobuf', - - 'in': { - 'address': '*:12000', + "raspberry": { + "type": "socket", + "layer": "udp", + "format": "protobuf", + "in": { + "address": "*:12000", }, - 'out': { - 'address': '1.2.3.4:12000' - } + "out": {"address": "1.2.3.4:12000"}, } } for i in range(N): - name = f'agent{i}' + name = f"agent{i}" port = 12000 + i nodes[name] = { - 'type': 'socket', - 'layer': 'udp', - 'format': 'protobuf', - - 'in': { - 'address': '*:12000', - 'signals': [ - { - 'name': 'in', - 'type': 'float' - } - ] - }, - 'out': { - 'address': f'5.6.7.8:{port}' - } + "type": "socket", + "layer": "udp", + "format": "protobuf", + "in": {"address": "*:12000", "signals": [{"name": "in", "type": "float"}]}, + "out": {"address": f"5.6.7.8:{port}"}, } paths = [ { - 'in': [f'agent{i}' for i in range(N)], - 'out': 'raspberry', - 'mode': 'any', - 'hooks': [ - { - 'type': 'print' - } - ] + "in": [f"agent{i}" for i in range(N)], + "out": "raspberry", + "mode": "any", + "hooks": [{"type": "print"}], }, ] -config = { - 'nodes': nodes, - 'paths': paths -} +config = {"nodes": nodes, "paths": paths} json.dump(config, sys.stdout, indent=2) diff --git a/tools/hwdef-parse.py b/tools/hwdef-parse.py index 05ea87da0..13981aa06 100755 --- a/tools/hwdef-parse.py +++ b/tools/hwdef-parse.py @@ -34,291 +34,335 @@ import re import json whitelist = [ - [ 'xilinx.com', 'ip', 'zynq_ultra_ps_e' ], - [ 'xilinx.com', 'ip', 'axi_timer' ], - [ 'xilinx.com', 'ip', 'axis_switch' ], - [ 'xilinx.com', 'ip', 'axi_fifo_mm_s' ], - [ 'xilinx.com', 'ip', 'axi_dma' ], - [ 'xilinx.com', 'ip', 'aurora_8b10b' ], - [ 'xilinx.com', 'ip', 'axi_gpio' ], - [ 'xilinx.com', 'ip', 'axi_bram_ctrl' ], - [ 'xilinx.com', 'ip', 'axi_pcie' ], - [ 'xilinx.com', 'ip', 'axi_iic' ], - [ 'xilinx.com', 'module_ref', 'dinoif_fast' ], - [ 'xilinx.com', 'module_ref', 'dinoif_dac' ], - [ 'xilinx.com', 'module_ref', 'axi_pcie_intc' ], - [ 'xilinx.com', 'module_ref', 'registerif' ], - [ 'xilinx.com', 'hls', 'rtds2gpu' ], - [ 'xilinx.com', 'hls', 'mem' ], - [ 'acs.eonerc.rwth-aachen.de', 'user', 'axi_pcie_intc' ], - [ 'acs.eonerc.rwth-aachen.de', 'user', 'rtds_axis' ], - [ 'acs.eonerc.rwth-aachen.de', 'user', 'aurora_axis' ], - [ 'acs.eonerc.rwth-aachen.de', 'hls' ], - [ 'acs.eonerc.rwth-aachen.de', 'sysgen' ] + ["xilinx.com", "ip", "zynq_ultra_ps_e"], + ["xilinx.com", "ip", "axi_timer"], + ["xilinx.com", "ip", "axis_switch"], + ["xilinx.com", "ip", "axi_fifo_mm_s"], + ["xilinx.com", "ip", "axi_dma"], + ["xilinx.com", "ip", "aurora_8b10b"], + ["xilinx.com", "ip", "axi_gpio"], + ["xilinx.com", "ip", "axi_bram_ctrl"], + ["xilinx.com", "ip", "axi_pcie"], + ["xilinx.com", "ip", "axi_iic"], + ["xilinx.com", "module_ref", "dinoif_fast"], + ["xilinx.com", "module_ref", "dinoif_dac"], + ["xilinx.com", "module_ref", "axi_pcie_intc"], + ["xilinx.com", "module_ref", "registerif"], + ["xilinx.com", "hls", "rtds2gpu"], + ["xilinx.com", "hls", "mem"], + ["acs.eonerc.rwth-aachen.de", "user", "axi_pcie_intc"], + ["acs.eonerc.rwth-aachen.de", "user", "rtds_axis"], + ["acs.eonerc.rwth-aachen.de", "user", "aurora_axis"], + ["acs.eonerc.rwth-aachen.de", "hls"], + ["acs.eonerc.rwth-aachen.de", "sysgen"], ] # List of VLNI ids of AXI4-Stream infrastructure IP cores which do not alter data # see PG085 (AXI4-Stream Infrastructure IP Suite v2.2) axi_converter_whitelist = [ - [ 'xilinx.com', 'ip', 'axis_subset_converter' ], - [ 'xilinx.com', 'ip', 'axis_clock_converter' ], - [ 'xilinx.com', 'ip', 'axis_register_slice' ], - [ 'xilinx.com', 'ip', 'axis_dwidth_converter' ], - [ 'xilinx.com', 'ip', 'axis_register_slice' ], - [ 'xilinx.com', 'ip', 'axis_data_fifo' ], - [ 'xilinx.com', 'ip', 'floating_point' ] + ["xilinx.com", "ip", "axis_subset_converter"], + ["xilinx.com", "ip", "axis_clock_converter"], + ["xilinx.com", "ip", "axis_register_slice"], + ["xilinx.com", "ip", "axis_dwidth_converter"], + ["xilinx.com", "ip", "axis_register_slice"], + ["xilinx.com", "ip", "axis_data_fifo"], + ["xilinx.com", "ip", "floating_point"], ] opponent = { - 'MASTER' : ('SLAVE', 'TARGET'), - 'SLAVE' : ('MASTER', 'INITIATOR'), - 'INITIATOR' : ('TARGET', 'SLAVE'), - 'TARGET' : ('INITIATOR', 'MASTER') + "MASTER": ("SLAVE", "TARGET"), + "SLAVE": ("MASTER", "INITIATOR"), + "INITIATOR": ("TARGET", "SLAVE"), + "TARGET": ("INITIATOR", "MASTER"), } + def bus_trace(root, busname, type, whitelist): - module = root.xpath('.//MODULE[.//BUSINTERFACE[@BUSNAME="{}" and (@TYPE="{}" or @TYPE="{}")]]'.format(busname, type[0], type[1])) + module = root.xpath( + './/MODULE[.//BUSINTERFACE[@BUSNAME="{}" and (@TYPE="{}" or @TYPE="{}")]]'.format( + busname, type[0], type[1] + ) + ) - vlnv = module[0].get('VLNV') - instance = module[0].get('INSTANCE') + vlnv = module[0].get("VLNV") + instance = module[0].get("INSTANCE") - if vlnv_match(vlnv, whitelist): - return instance, busname - elif vlnv_match(vlnv, axi_converter_whitelist): - next_bus = module[0].xpath('.//BUSINTERFACE[@TYPE="{}" or @TYPE="{}"]'.format(opponent[type[0]][0], opponent[type[0]][1])) - next_busname = next_bus[0].get('BUSNAME') + if vlnv_match(vlnv, whitelist): + return instance, busname + elif vlnv_match(vlnv, axi_converter_whitelist): + next_bus = module[0].xpath( + './/BUSINTERFACE[@TYPE="{}" or @TYPE="{}"]'.format( + opponent[type[0]][0], opponent[type[0]][1] + ) + ) + next_busname = next_bus[0].get("BUSNAME") + + return bus_trace(root, next_busname, type, whitelist) + else: + raise TypeError("Unsupported AXI4-Stream IP core: %s (%s)" % (instance, vlnv)) - return bus_trace(root, next_busname, type, whitelist) - else: - raise TypeError('Unsupported AXI4-Stream IP core: %s (%s)' % (instance, vlnv)) def vlnv_match(vlnv, whitelist): - c = vlnv.split(':') + c = vlnv.split(":") - for w in whitelist: - if c[:len(w)] == w: - return True + for w in whitelist: + if c[: len(w)] == w: + return True + + return False - return False def remove_prefix(text, prefix): - return text[text.startswith(prefix) and len(prefix):] + return text[text.startswith(prefix) and len(prefix) :] + def sanitize_name(name): - name = remove_prefix(name, 'S_') - name = remove_prefix(name, 'M_') - name = remove_prefix(name, 'AXI_') - name = remove_prefix(name, 'AXIS_') + name = remove_prefix(name, "S_") + name = remove_prefix(name, "M_") + name = remove_prefix(name, "AXI_") + name = remove_prefix(name, "AXIS_") + + return name - return name if len(sys.argv) < 2: - print('Usage: {} path/to/*.hwdef'.format(sys.argv[0])) - print(' {} path/to/*.hwh'.format(sys.argv[0])) - sys.exit(1) + print("Usage: {} path/to/*.hwdef".format(sys.argv[0])) + print(" {} path/to/*.hwh".format(sys.argv[0])) + sys.exit(1) try: - # read .hwdef which is actually a zip-file - zip = zipfile.ZipFile(sys.argv[1], 'r') - hwh = zip.read('top.hwh') + # read .hwdef which is actually a zip-file + zip = zipfile.ZipFile(sys.argv[1], "r") + hwh = zip.read("top.hwh") except: - f = open(sys.argv[1], 'r') - hwh = f.read() + f = open(sys.argv[1], "r") + hwh = f.read() # parse .hwh file which is actually XML try: - root = etree.XML(hwh) + root = etree.XML(hwh) except: - print('Bad format of "{}"! Did you choose the right file?'.format(sys.argv[1])) - sys.exit(1) + print('Bad format of "{}"! Did you choose the right file?'.format(sys.argv[1])) + sys.exit(1) ips = {} # find all whitelisted modules -modules = root.find('.//MODULES') +modules = root.find(".//MODULES") for module in modules: - instance = module.get('INSTANCE') - vlnv = module.get('VLNV') + instance = module.get("INSTANCE") + vlnv = module.get("VLNV") - # Ignroing unkown - if not vlnv_match(vlnv, whitelist): - continue + # Ignroing unkown + if not vlnv_match(vlnv, whitelist): + continue - ips[instance] = { - 'vlnv' : vlnv - } + ips[instance] = {"vlnv": vlnv} - # populate parameters - params = module.find('.//PARAMETERS') - if params is not None and instance != "zynq_ultra_ps_e_0": #! Parameters of "zynq" ignored - p = ips[instance].setdefault('parameters', {}) + # populate parameters + params = module.find(".//PARAMETERS") + if ( + params is not None and instance != "zynq_ultra_ps_e_0" + ): #! Parameters of "zynq" ignored + p = ips[instance].setdefault("parameters", {}) - for param in params: - name = param.get('NAME').lower() - value = param.get('VALUE') + for param in params: + name = param.get("NAME").lower() + value = param.get("VALUE") - try: - value = int(value, 0) - except ValueError: - pass + try: + value = int(value, 0) + except ValueError: + pass - p[name] = value + p[name] = value - # populate memory view - mmap = module.find('.//MEMORYMAP') - if mmap is None: - continue + # populate memory view + mmap = module.find(".//MEMORYMAP") + if mmap is None: + continue - mem = ips[instance].setdefault('memory-view', {}) - for mrange in mmap: - mem_interface = mrange.get('MASTERBUSINTERFACE') - mem_instance = mrange.get('INSTANCE') - mem_block = mrange.get('ADDRESSBLOCK') + mem = ips[instance].setdefault("memory-view", {}) + for mrange in mmap: + mem_interface = mrange.get("MASTERBUSINTERFACE") + mem_instance = mrange.get("INSTANCE") + mem_block = mrange.get("ADDRESSBLOCK") - _interface = mem.setdefault(mem_interface, {}) - _instance = _interface.setdefault(mem_instance, {}) - _block = _instance.setdefault(mem_block, {}) + _interface = mem.setdefault(mem_interface, {}) + _instance = _interface.setdefault(mem_instance, {}) + _block = _instance.setdefault(mem_block, {}) - _block['baseaddr'] = int(mrange.get('BASEVALUE'), 16) - _block['highaddr'] = int(mrange.get('HIGHVALUE'), 16) - _block['size'] = _block['highaddr'] - _block['baseaddr'] + 1 + _block["baseaddr"] = int(mrange.get("BASEVALUE"), 16) + _block["highaddr"] = int(mrange.get("HIGHVALUE"), 16) + _block["size"] = _block["highaddr"] - _block["baseaddr"] + 1 # find AXI-Stream switch port mapping switch = root.find('.//MODULE[@MODTYPE="axis_switch"]') -busifs = switch.find('.//BUSINTERFACES') +busifs = switch.find(".//BUSINTERFACES") switch_ports = 0 for busif in busifs: - if busif.get('VLNV') != 'xilinx.com:interface:axis:1.0': - continue + if busif.get("VLNV") != "xilinx.com:interface:axis:1.0": + continue - switch_ports += 1 + switch_ports += 1 - busname = busif.get('BUSNAME') - name = busif.get('NAME') - type = busif.get('TYPE') + busname = busif.get("BUSNAME") + name = busif.get("NAME") + type = busif.get("TYPE") - r = re.compile('(M|S)([0-9]+)_AXIS') - m = r.search(name) + r = re.compile("(M|S)([0-9]+)_AXIS") + m = r.search(name) - port = int(m.group(2)) + port = int(m.group(2)) - switch_ip_ports = ips[switch.get('INSTANCE')].setdefault('ports', []) + switch_ip_ports = ips[switch.get("INSTANCE")].setdefault("ports", []) - ep, busname_ep = bus_trace(root, busname, opponent[type], whitelist) - if ep in ips: + ep, busname_ep = bus_trace(root, busname, opponent[type], whitelist) + if ep in ips: + ports = ips[ep].setdefault("ports", []) + ports.append( + { + "role": opponent[type][0].lower(), + "target": "{}:{}".format(switch.get("INSTANCE"), name), + } + ) - ports = ips[ep].setdefault('ports', []) - ports.append({ - 'role': opponent[type][0].lower(), - 'target': '{}:{}'.format(switch.get('INSTANCE'), name) - }) + module_ep = root.find('.//MODULE[@INSTANCE="{}"]'.format(ep)) + busif_ep = module_ep.find('.//BUSINTERFACE[@BUSNAME="{}"]'.format(busname_ep)) + if busif_ep is None: + print("cannot find businterface: {}".format(busname_ep)) + sys.exit(1) - module_ep = root.find('.//MODULE[@INSTANCE="{}"]'.format(ep)) - busif_ep = module_ep.find('.//BUSINTERFACE[@BUSNAME="{}"]'.format(busname_ep)) - if busif_ep is None: - print("cannot find businterface: {}".format(busname_ep)) - sys.exit(1) + busif_name = ports[-1]["name"] = sanitize_name(busif_ep.get("NAME")) + ports[-1]["name"] = busif_name - busif_name = ports[-1]['name'] = sanitize_name(busif_ep.get('NAME')) - ports[-1]['name'] = busif_name - - switch_ip_ports.append({ - 'role': type.lower(), - 'target': '{}:{}'.format(ep, busif_name), - 'name': name - }) + switch_ip_ports.append( + { + "role": type.lower(), + "target": "{}:{}".format(ep, busif_name), + "name": name, + } + ) # set number of master/slave port pairs for switch -ips[switch.get('INSTANCE')]['num_ports'] = int(switch_ports / 2) +ips[switch.get("INSTANCE")]["num_ports"] = int(switch_ports / 2) # find interrupt assignments intc = root.find('.//MODULE[@MODTYPE="axi_pcie_intc"]') if intc is not None: - intr = intc.xpath('.//PORT[@NAME="intr" and @DIR="I"]')[0] - concat = root.xpath('.//MODULE[@MODTYPE="xlconcat" and .//PORT[@SIGNAME="{}" and @DIR="O"]]'.format(intr.get('SIGNAME')))[0] - ports = concat.xpath('.//PORT[@DIR="I"]') + intr = intc.xpath('.//PORT[@NAME="intr" and @DIR="I"]')[0] + concat = root.xpath( + './/MODULE[@MODTYPE="xlconcat" and .//PORT[@SIGNAME="{}" and @DIR="O"]]'.format( + intr.get("SIGNAME") + ) + )[0] + ports = concat.xpath('.//PORT[@DIR="I"]') - for port in ports: - name = port.get('NAME') - signame = port.get('SIGNAME') + for port in ports: + name = port.get("NAME") + signame = port.get("SIGNAME") - # Skip unconnected IRQs - if not signame: - continue + # Skip unconnected IRQs + if not signame: + continue - r = re.compile('In([0-9+])') - m = r.search(name) + r = re.compile("In([0-9+])") + m = r.search(name) - irq = int(m.group(1)) - ip = root.xpath('.//MODULE[.//PORT[@SIGNAME="{}" and @DIR="O"]]'.format(signame))[0] + irq = int(m.group(1)) + ip = root.xpath( + './/MODULE[.//PORT[@SIGNAME="{}" and @DIR="O"]]'.format(signame) + )[0] - instance = ip.get('INSTANCE') - vlnv = ip.get('VLNV') - modtype = ip.get('MODTYPE') + instance = ip.get("INSTANCE") + vlnv = ip.get("VLNV") + modtype = ip.get("MODTYPE") - originators = [] + originators = [] - # follow one level of OR gates merging interrupts (may be generalized later) - if modtype == 'util_vector_logic': - logic_op = ip.xpath('.//PARAMETER[@NAME="C_OPERATION"]')[0] - if logic_op.get('VALUE') == 'or': - # hardware interrupts sharing the same IRQ at the controller - ports = ip.xpath('.//PORT[@DIR="I"]') - for port in ports: - signame = port.get('SIGNAME') - ip = root.xpath('.//MODULE[.//PORT[@SIGNAME="{}" and @DIR="O"]]'.format(signame))[0] - instance = ip.get('INSTANCE') - originators.append((instance, signame)) - else: - # consider this instance as originator - originators.append((instance, signame)) + # follow one level of OR gates merging interrupts (may be generalized later) + if modtype == "util_vector_logic": + logic_op = ip.xpath('.//PARAMETER[@NAME="C_OPERATION"]')[0] + if logic_op.get("VALUE") == "or": + # hardware interrupts sharing the same IRQ at the controller + ports = ip.xpath('.//PORT[@DIR="I"]') + for port in ports: + signame = port.get("SIGNAME") + ip = root.xpath( + './/MODULE[.//PORT[@SIGNAME="{}" and @DIR="O"]]'.format(signame) + )[0] + instance = ip.get("INSTANCE") + originators.append((instance, signame)) + else: + # consider this instance as originator + originators.append((instance, signame)) + for instance, signame in originators: + ip = root.xpath( + './/MODULE[.//PORT[@SIGNAME="{}" and @DIR="O"]]'.format(signame) + )[0] + port = ip.xpath('.//PORT[@SIGNAME="{}" and @DIR="O"]'.format(signame))[0] + irqname = port.get("NAME") - for instance, signame in originators: - ip = root.xpath('.//MODULE[.//PORT[@SIGNAME="{}" and @DIR="O"]]'.format(signame))[0] - port = ip.xpath('.//PORT[@SIGNAME="{}" and @DIR="O"]'.format(signame))[0] - irqname = port.get('NAME') - - if instance in ips: - irqs = ips[instance].setdefault('irqs', {}) - irqs[irqname] = '{}:{}'.format(intc.get('INSTANCE'), irq) + if instance in ips: + irqs = ips[instance].setdefault("irqs", {}) + irqs[irqname] = "{}:{}".format(intc.get("INSTANCE"), irq) # Find BRAM storage depths (size) brams = root.xpath('.//MODULE[@MODTYPE="axi_bram_ctrl"]') for bram in brams: - instance = bram.get('INSTANCE') + instance = bram.get("INSTANCE") - width = bram.find('.//PARAMETER[@NAME="DATA_WIDTH"]').get('VALUE') - depth = bram.find('.//PARAMETER[@NAME="MEM_DEPTH"]').get('VALUE') + width = bram.find('.//PARAMETER[@NAME="DATA_WIDTH"]').get("VALUE") + depth = bram.find('.//PARAMETER[@NAME="MEM_DEPTH"]').get("VALUE") - size = int(width) * int(depth) / 8 + size = int(width) * int(depth) / 8 - if instance in ips: - ips[instance]['size'] = int(size) + if instance in ips: + ips[instance]["size"] = int(size) pcies = root.xpath('.//MODULE[@MODTYPE="axi_pcie"]') for pcie in pcies: - instance = pcie.get('INSTANCE') - axi_bars = ips[instance].setdefault('axi_bars', {}) - pcie_bars = ips[instance].setdefault('pcie_bars', {}) + instance = pcie.get("INSTANCE") + axi_bars = ips[instance].setdefault("axi_bars", {}) + pcie_bars = ips[instance].setdefault("pcie_bars", {}) - for from_bar, to_bar, from_bars in (('AXIBAR', 'PCIEBAR', axi_bars), ('PCIEBAR', 'AXIBAR', pcie_bars)): - from_bar_num = int(pcie.find('.//PARAMETER[@NAME="C_{}_NUM"]'.format(from_bar)).get('VALUE')) + for from_bar, to_bar, from_bars in ( + ("AXIBAR", "PCIEBAR", axi_bars), + ("PCIEBAR", "AXIBAR", pcie_bars), + ): + from_bar_num = int( + pcie.find('.//PARAMETER[@NAME="C_{}_NUM"]'.format(from_bar)).get("VALUE") + ) - for i in range(0, from_bar_num): - from_bar_to_bar_offset = int(pcie.find('.//PARAMETER[@NAME="C_{}2{}_{}"]'.format(from_bar, to_bar, i)).get('VALUE'), 16) - from_bars['BAR{}'.format(i)] = { 'translation': from_bar_to_bar_offset } + for i in range(0, from_bar_num): + from_bar_to_bar_offset = int( + pcie.find( + './/PARAMETER[@NAME="C_{}2{}_{}"]'.format(from_bar, to_bar, i) + ).get("VALUE"), + 16, + ) + from_bars["BAR{}".format(i)] = {"translation": from_bar_to_bar_offset} - if from_bar == 'AXIBAR': - axi_bar_lo = int(pcie.find('.//PARAMETER[@NAME="C_{}_{}"]'.format(from_bar, i)).get('VALUE'), 16) - axi_bar_hi = int(pcie.find('.//PARAMETER[@NAME="C_{}_HIGHADDR_{}"]'.format(from_bar, i)).get('VALUE'), 16) - axi_bar_size = axi_bar_hi - axi_bar_lo + 1 + if from_bar == "AXIBAR": + axi_bar_lo = int( + pcie.find('.//PARAMETER[@NAME="C_{}_{}"]'.format(from_bar, i)).get( + "VALUE" + ), + 16, + ) + axi_bar_hi = int( + pcie.find( + './/PARAMETER[@NAME="C_{}_HIGHADDR_{}"]'.format(from_bar, i) + ).get("VALUE"), + 16, + ) + axi_bar_size = axi_bar_hi - axi_bar_lo + 1 - axi_bar = from_bars['BAR{}'.format(i)] - axi_bar['baseaddr'] = axi_bar_lo - axi_bar['highaddr'] = axi_bar_hi - axi_bar['size'] = axi_bar_size + axi_bar = from_bars["BAR{}".format(i)] + axi_bar["baseaddr"] = axi_bar_lo + axi_bar["highaddr"] = axi_bar_hi + axi_bar["size"] = axi_bar_size print(json.dumps(ips, indent=2)) -