Source code for FABulous.fabric_generator.parser.parse_switchmatrix

"""Parser functions for switch matrix and list file configurations.

This module provides utilities for parsing switch matrix CSV files and list files used
in fabric definition. It handles expansion of port definitions, connection mappings, and
validation of port configurations.
"""

import re
from pathlib import Path
from typing import Literal, overload

from loguru import logger

from FABulous.custom_exception import (
    InvalidListFileDefinition,
    InvalidPortType,
    InvalidSwitchMatrixDefinition,
)
from FABulous.fabric_definition.define import IO, Direction, Side
from FABulous.fabric_definition.Port import Port

[docs] oppositeDic = {"NORTH": "SOUTH", "SOUTH": "NORTH", "EAST": "WEST", "WEST": "EAST"}
[docs] def parseMatrix(fileName: Path, tileName: str) -> dict[str, list[str]]: """Parse the matrix CSV into a dictionary from destination to source. Parameters ---------- fileName : Path Directory of the matrix CSV file. tileName : str Name of the tile needed to be parsed. Raises ------ ValueError Non matching matrix file content and tile name Returns ------- dict : [str, list[str]] Dictionary from destination to a list of sources. """ connectionsDic = {} with fileName.open() as f: file = f.read() file = re.sub(r"#.*", "", file) file = file.split("\n") if file[0].split(",")[0] != tileName: raise InvalidSwitchMatrixDefinition( f"{fileName} {file[0].split(',')} {tileName}\n" "Tile name (top left element) in csv file does not match tile name " "in tile object" ) destList = file[0].split(",")[1:] for i in file[1:]: i = i.split(",") portName, connections = i[0], i[1:] if portName == "": continue indices = [k for k, v in enumerate(connections) if v == "1"] connectionsDic[portName] = [destList[j] for j in indices] return connectionsDic
@overload
[docs] def parseList( filePath: Path, collect: Literal["pair"] = "pair" ) -> list[tuple[str, str]]: pass
@overload def parseList( filePath: Path, collect: Literal["source", "sink"] ) -> dict[str, list[str]]: pass
[docs] def expandListPorts(port: str, PortList: list[str]) -> None: """Expand the .list file entry into a list of tuples. Parameters ---------- port : str The port entry to expand. If it contains "[", it's split into multiple entries based on "|". PortList : list The list where expanded port entries are appended. Raises ------ ValueError If the port entry contains "[" or "{" without matching closing bracket "]"/"}". """ if port.count("[") != port.count("]") and port.count("{") != port.count("}"): raise ValueError(f"Invalid port entry: {port}, mismatched brackets") # a leading '[' tells us that we have to expand the list if "[" in port: # port.find gives us the first occurrence index in a string left_index = port.find("[") right_index = port.find("]") before_left_index = port[0:left_index] # right_index is the position of the ']' so we need everything after that after_right_index = port[(right_index + 1) :] ExpandList = [] ExpandList = re.split(r"\|", port[left_index + 1 : right_index]) for entry in ExpandList: ExpandListItem = before_left_index + entry + after_right_index expandListPorts(ExpandListItem, PortList) else: # Multiply ports by the number of multipliers, given in the curly braces. # We let all curly braces in the port Expansion to be expanded and # calculate the total number of ports to be added afterward, # based on the number of multipliers. # Also remove the multipliers from port name, before adding it to the list. port = port.replace(" ", "") # remove spaces multipliers = re.findall(r"\{(\d+)\}", port) portMultiplier = sum([int(m) for m in multipliers]) if portMultiplier != 0: port = re.sub(r"\{(\d+)\}", "", port) logger.debug(f"Port {port} has {portMultiplier} multipliers") for _i in range(portMultiplier): PortList.append(port) else: PortList.append(port)
def parseList( filePath: Path, collect: Literal["pair", "source", "sink"] = "pair", ) -> list[tuple[str, str]] | dict[str, list[str]]: """Parse a list file and expand the list file information into a list of tuples. Parameters ---------- fileName : Path "" collect : (Literal["", "source", "sink"], optional) Collect value by source, sink or just as pair. Defaults to "pair". Raises ------ ValueError The file does not exist. ValueError Invalid format in the list file. Returns ------- Union : [list[tuple[str, str]], dict[str, list[str]]] Return either a list of connection pairs or a dictionary of lists which is collected by the specified option, source or sink. """ if not filePath.exists(): raise FileNotFoundError(f"The file {filePath} does not exist.") resultList = [] with filePath.open() as f: file = f.read() file = re.sub(r"#.*", "", file) file = file.split("\n") for i, line in enumerate(file): line = line.replace(" ", "").replace("\t", "").split(",") line = [i for i in line if i != ""] if not line: continue if len(line) != 2: raise InvalidListFileDefinition( f"Invalid list formatting in file: {filePath} at line {i}: {line}" ) left, right = line[0], line[1] if left == "INCLUDE": resultList.extend(parseList(filePath.parent.joinpath(right), "pair")) continue leftList = [] rightList = [] expandListPorts(left, leftList) expandListPorts(right, rightList) if len(leftList) != len(rightList): raise InvalidListFileDefinition( f"List file {filePath} does not have the same number of source and " f"sink ports at line {i}: {line}" ) resultList += list(zip(leftList, rightList, strict=False)) result = list(dict.fromkeys(resultList)) resultDic = {} if collect == "source": for k, v in result: if k not in resultDic: resultDic[k] = [] resultDic[k].append(v) return resultDic if collect == "sink": for k, v in result: for i in v: if i not in resultDic: resultDic[i] = [] resultDic[i].append(k) return resultDic return result
[docs] def parsePortLine(line: str) -> tuple[list[Port], tuple[str, str] | None]: """Parse a single line of the port configuration from the CSV file. Parameters ---------- line : str CSV line containing port configuration data. Returns ------- tuple[list[Port], tuple[str, str] | None] A tuple containing a list of parsed ports and an optional common wire pair. """ ports = [] commonWirePair: tuple[str, str] | None temp: list[str] = line.split(",") if temp[0] in ["NORTH", "SOUTH", "EAST", "WEST"]: ports.append( Port( Direction[temp[0]], temp[1], int(temp[2]), int(temp[3]), temp[4], int(temp[5]), temp[1], IO.OUTPUT, Side[temp[0]], ) ) ports.append( Port( Direction[temp[0]], temp[1], int(temp[2]), int(temp[3]), temp[4], int(temp[5]), temp[4], IO.INPUT, Side[oppositeDic[temp[0]].upper()], ) ) commonWirePair = (f"{temp[1]}", f"{temp[4]}") elif temp[0] == "JUMP": ports.append( Port( Direction.JUMP, temp[1], int(temp[2]), int(temp[3]), temp[4], int(temp[5]), temp[1], IO.OUTPUT, Side.ANY, ) ) ports.append( Port( Direction.JUMP, temp[1], int(temp[2]), int(temp[3]), temp[4], int(temp[5]), temp[4], IO.INPUT, Side.ANY, ) ) commonWirePair = None else: raise InvalidPortType(f"Unknown port type: {temp[0]}") return (ports, commonWirePair)