Source code for FABulous.fabric_generator.parser.parse_csv

"""Contains functions for parsing CSV files related to the fabric definition."""

import re
from copy import deepcopy
from pathlib import Path
from typing import TYPE_CHECKING

from loguru import logger

from FABulous.custom_exception import (
    InvalidFabricDefinition,
    InvalidFabricParameter,
    InvalidFileType,
    InvalidSupertileDefinition,
    InvalidTileDefinition,
)
from FABulous.fabric_definition.define import IO, ConfigBitMode, MultiplexerStyle
from FABulous.fabric_definition.Fabric import Fabric
from FABulous.fabric_definition.Gen_IO import Gen_IO
from FABulous.fabric_definition.SuperTile import SuperTile
from FABulous.fabric_definition.Tile import Tile
from FABulous.fabric_generator.gen_fabric.fabric_automation import (
    addBelsToPrim,
    generateCustomTileConfig,
    generateSwitchmatrixList,
)
from FABulous.fabric_generator.parser.parse_hdl import parseBelFile
from FABulous.fabric_generator.parser.parse_switchmatrix import (
    parseList,
    parseMatrix,
    parsePortLine,
)
from FABulous.FABulous_settings import get_context

if TYPE_CHECKING:
    from FABulous.fabric_definition.Bel import Bel
    from FABulous.fabric_definition.Port import Port


[docs] def parseTilesCSV(fileName: Path) -> tuple[list[Tile], list[tuple[str, str]]]: """Parse a CSV tile configuration file and returns all tile objects. Parameters ---------- fileName : str The path to the CSV file. Returns ------- Tuple[List[Tile], List[Tuple[str, str]]] A tuple containing a list of Tile objects and a list of common wire pairs. Raises ------ ValueError : If the input csv file is not a CSV file. ValueError : If the input csv file does not exist. ValueError : If the BEL file is not a VHDL or Verilog file. ValueError : If GEN_IO pins are lower than 1 ValueError : If a GEN_IO is a CONFIGACCESS and not an OUTPUT. ValueError : If a GEN_IO with the same prefix already exists in the tile. ValueError : If a GEN_IO is CLOCKED and CLOCKED_COMB at the same time. ValueError : If the MATRIX file is not a .list, .csv, .v, or .vhdl file. """ logger.info(f"Reading tile configuration: {fileName}") if fileName.suffix != ".csv": raise InvalidFileType("File must be a CSV file.") if not fileName.exists(): raise FileExistsError(f"File {fileName} does not exist.") filePathParent = fileName.parent with fileName.open() as f: file = f.read() file = re.sub(r"#.*", "", file) tilesData = re.findall(r"TILE(.*?)EndTILE", file, re.MULTILINE | re.DOTALL) new_tiles = [] commonWirePairs = [] proj_dir = get_context().proj_dir # Parse each tile config for t in tilesData: t = t.split("\n") tileName = t[0].split(",")[1].strip() ports: list[Port] = [] bels: list[Bel] = [] matrixDir: Path | None = None gen_ios: list[Gen_IO] = [] withUserCLK = False configBit = 0 genMatrixList = False tileCarry: dict[str, dict[IO, str]] = {} localSharedPorts: dict[str, list[Port]] = {} for item in t: temp: list[str] = item.split(",") temp = [i.strip() for i in temp] if not temp or temp[0] == "": continue if temp[0] in ["NORTH", "SOUTH", "EAST", "WEST", "JUMP"]: port, commonWirePair = parsePortLine(item) if "CARRY" in temp[6]: # For prefix after carry carryPrefix = re.search(r'CARRY="([^"]+)"', temp[6]) if not carryPrefix: carryPrefix = "FABulous_default" else: carryPrefix = carryPrefix.group(1) if carryPrefix not in tileCarry: tileCarry[carryPrefix] = {} tileCarry[carryPrefix][IO.OUTPUT] = f"{temp[1]}0" tileCarry[carryPrefix][IO.INPUT] = f"{temp[4]}0" else: raise ValueError( "There is already a carrychain with the prefix " f"{carryPrefix}" ) if "SHARED_" in temp[6]: if "JUMP" not in temp[0]: raise InvalidTileDefinition( "LOCAL SHARED_ Ports can only be used with JUMP ports." ) localShared = temp[6].split("_")[1] if localShared is None or localShared == "": raise InvalidTileDefinition("SHARED_ cannot be empty.") if localShared not in ["RESET", "ENABLE"]: raise InvalidTileDefinition( f"LOCAL SHARED_ port {localShared} is not supported. " "Only SHARED_RESET and SHARED_ENABLE are supported." ) if localShared not in localSharedPorts: localSharedPorts[localShared] = port else: raise InvalidTileDefinition( f"LOCAL SHARED_ port {localShared} already exists." ) ports.extend(port) if commonWirePair: commonWirePairs.append(commonWirePair) elif temp[0] == "BEL": belFilePath = filePathParent.joinpath(temp[1]) bel_prefix = temp[2] if len(temp) > 2 else "" if ( temp[1].endswith(".vhdl") or temp[1].endswith(".v") or temp[1].endswith(".sv") ): bels.append(parseBelFile(belFilePath, bel_prefix)) else: raise InvalidFileType( f"File {belFilePath} is not a .vhdl or .v file. " "Please check the BEL file." ) if "ADD_AS_CUSTOM_PRIM" in temp[3:]: primsFile = proj_dir.joinpath("user_design/custom_prims.v") logger.info(f"Adding bels to custom prims file: {primsFile}") addBelsToPrim(primsFile, [bels[-1]]) elif temp[0] == "GEN_IO": configBit = 0 configAccess = False inverted = False clocked = False clockedComb = False clockedMux = False pins = int(temp[1]) if pins <= 0: raise InvalidTileDefinition( f"GEN_IO pins must be greater than 0, but is {pins}" ) # Additional params can be added for param in temp[4:]: param = param.strip() param = param.upper() if param == "CONFIGACCESS": if temp[2] != "OUTPUT": raise InvalidTileDefinition( "CONFIGACCESS GEN_IO can only be used with OUTPUT, " f"but is {temp[2]}" ) if not configAccess and temp[2] != "OUTPUT": raise InvalidTileDefinition( "CONFIGACCESS GEN_IO can only be used with OUTPUT, " f"but is {temp[2]}" ) configAccess = True configBit = int(temp[1]) elif param == "INVERTED": inverted = True elif param == "CLOCKED": clocked = True elif param == "CLOCKED_COMB": clockedComb = True elif param == "CLOCKED_MUX": clockedMux = True configBit = int(temp[1]) elif param is None or param == "": continue else: raise InvalidTileDefinition( f"Unknown parameter {param} in GEN_IO. " "Valid parameters are CONFIGACCESS, INVERTED, CLOCKED, " "CLOCKED_COMB, CLOCKED_MUX." ) if configAccess and (clocked or clockedComb or clockedMux): raise InvalidTileDefinition( "CONFIGACCESS GEN_IO can not be clocked" ) if sum([clocked, clockedComb, clockedMux]) > 1: raise InvalidTileDefinition( "CLOCKED, CLOCKED_COMB or CLOCKED_MUX can not be combined " "for one GEN_IO" ) if temp[3] not in (gio.prefix for gio in gen_ios): gen_ios.append( Gen_IO( temp[3], int(temp[1]), IO[temp[2]], configBit, configAccess, inverted, clocked, clockedComb, clockedMux, ) ) else: raise InvalidTileDefinition( f"GEN_IO with prefix {temp[3]} already exists in tile " f"{tileName}." ) elif temp[0] == "MATRIX": configBit = 0 if "GENERATE" in temp: logger.info(f"Generating switch matrix list for tile {tileName}") genMatrixList = True if len(temp) <= 2: # only MATRIX, GENERATE in csv matrixDir = fileName.parent else: matrixDir = fileName.parent.joinpath(temp[2]) if matrixDir.is_file() and matrixDir.suffix == ".list": logger.warning( f"Matrix file {matrixDir} already exists and will be " "overwritten." ) elif matrixDir.parent == proj_dir.joinpath("Tile"): matrixDir = matrixDir.joinpath( f"{tileName}_generated_switch_matrix.list" ) logger.info(f"Generating matrix file {matrixDir}") else: matrixDir = proj_dir.joinpath( f"./Tile/{tileName}/{tileName}_generated_switch_matrix.list" ) logger.warning( "No destination directory for matrix file sepicified, " f"using default path {matrixDir}." ) if not matrixDir.parent.exists(): matrixDir.parent.mkdir(parents=True) logger.warning(f"Creating directory {matrixDir.parent}.") else: matrixDir = fileName.parent.joinpath(temp[1]) match matrixDir.suffix: case ".list": for _, v in parseList(matrixDir, "source").items(): muxSize = len(v) if muxSize >= 2: configBit += (muxSize - 1).bit_length() case "_matrix.csv": for _, v in parseMatrix(matrixDir, tileName).items(): muxSize = len(v) if muxSize >= 2: configBit += (muxSize - 1).bit_length() case ".vhdl" | ".v": with matrixDir.open() as f: f = f.read() if configBit := re.search( r"NumberOfConfigBits: (\d+)", f ): configBit = int(configBit.group(1)) else: configBit = 0 logger.warning( "Cannot find NumberOfConfigBits in " f"{matrixDir} assume 0 config bits." ) case _: raise InvalidFileType("Unknown file extension for matrix.") elif temp[0] == "INCLUDE": p = fileName.parent.joinpath(temp[1]) if not p.exists(): raise InvalidTileDefinition( f"Cannot find {str(p)} in tile {tileName}" ) with p.open() as f: iFile = f.read() iFile = re.sub(r"#.*", "", iFile) for line in iFile.split("\n"): lineItem = line.split(",") if not lineItem[0]: continue port, commonWirePair = parsePortLine(line) ports.extend(port) if commonWirePair: commonWirePairs.append(commonWirePair) else: raise InvalidTileDefinition( f"Unknown tile description {temp[0]} in tile {tileName}. " "Valid descriptions are NORTH, SOUTH, EAST, WEST, JUMP, BEL, " "GEN_IO, MATRIX, and INCLUDE." ) withUserCLK = any(bel.withUserCLK for bel in bels) if genMatrixList: generateSwitchmatrixList( tileName, bels, matrixDir, tileCarry, localSharedPorts ) for _, v in parseList(matrixDir, "source").items(): muxSize = len(v) if muxSize >= 2: configBit += (muxSize - 1).bit_length() new_tiles.append( Tile( name=tileName, ports=ports, bels=bels, tileDir=fileName, matrixDir=matrixDir, gen_ios=gen_ios, userCLK=withUserCLK, configBit=configBit, ) ) return (new_tiles, commonWirePairs)
[docs] def parseSupertilesCSV(fileName: Path, tileDic: dict[str, Tile]) -> list[SuperTile]: """Parse a CSV supertile configuration file and returns all SuperTile objects. Parameters ---------- fileName : str The path to the CSV file. tileDic : Dict[str, Tile] Dict of tiles. Returns ------- List[SuperTile] List of SuperTile objects. """ logger.info(f"Reading supertile configuration: {fileName}") if not fileName.suffix == ".csv": raise InvalidFileType("File must be a csv file.") if not fileName.exists(): raise FileNotFoundError(f"File {fileName} does not exist.") filePath = fileName.parent with fileName.open() as f: file = f.read() file = re.sub(r"#.*", "", file) superTilesData = re.findall( r"SuperTILE(.*?)EndSuperTILE", file, re.MULTILINE | re.DOTALL ) new_supertiles = [] # Parse each supertile config for t in superTilesData: description = t.split("\n") name = description[0].split(",")[1] tileMap = [] tiles = [] bels = [] withUserCLK = False for i in description[1:-1]: line = i.split(",") line = [i for i in line if i != "" and i != " "] row = [] if line[0] == "BEL": belFilePath = filePath.joinpath(line[1]) bels.append(parseBelFile(belFilePath, line[2])) for j in line: if j in tileDic: # mark the tile as part of super tile tileDic[j].partOfSuperTile = True t = deepcopy(tileDic[j]) row.append(t) if t not in tiles: tiles.append(t) elif j == "Null" or j == "NULL" or j == "None": row.append(None) else: raise InvalidSupertileDefinition( f"The super tile {name} contains definitions that are not " "tiles or Null." ) tileMap.append(row) withUserCLK = any(bel.withUserCLK for bel in bels) new_supertiles.append(SuperTile(name, tiles, tileMap, bels, withUserCLK)) return new_supertiles
[docs] def parseFabricCSV(fileName: str) -> Fabric: """Parse a CSV file and returns a fabric object. Parameters ---------- fileName : str Directory of the CSV file. Raises ------ ValueError - File provided is not a CSV file. - CSV file does not exist. - FabricBegin and FabricEnd regions cannot be found. - ParametersBegin and ParametersEnd regions cannot be found. - Bel entry extension is not ".v" or ".vhdl". - Matrix entry extension is not ".list", ".csv", ".v", or ".vhdl". - Unknown tile description entry is found in the CSV file. - Unknown tile is found in the fabric entry in the CSV file. - Unknown super tile is found in the super tile entry in the CSV file. - Invalid ConfigBitMode is found in the parameter entry in the CSV file. - Invalid MultiplexerStyle is found in the parameter entry in the CSV file. - Invalid parameter entry is found in the CSV file. Returns ------- Fabric The fabric object. """ fName = Path(fileName) if fName.suffix != ".csv": raise InvalidFileType("File must be a csv file") if not fName.exists(): raise FileNotFoundError(f"File {fName} does not exist.") filePath = fName.parent with fName.open() as f: file = f.read() file = re.sub(r"#.*", "", file) # read in the csv file and part them if fabricDescription := re.search( r"FabricBegin(.*?)FabricEnd", file, re.MULTILINE | re.DOTALL ): fabricDescription = fabricDescription.group(1) else: raise InvalidFabricDefinition( "Cannot find FabricBegin and FabricEnd in csv file." ) if parameters := re.search( r"ParametersBegin(.*?)ParametersEnd", file, re.MULTILINE | re.DOTALL ): parameters = parameters.group(1) else: raise InvalidFabricDefinition( "Cannot find ParametersBegin and ParametersEnd in csv file." ) fabricDescription = fabricDescription.split("\n") parameters = parameters.split("\n") # Lists for tiles tileTypes = [] tileDefs = [] commonWirePair: list[tuple[str, str]] = [] fabricTiles = [] tileDic = {} unusedTileDic = {} # list for supertiles superTileDic = {} unusedSuperTileDic = {} # For backwards compatibility parse tiles in fabric config new_tiles, new_commonWirePair = parseTilesCSV(fName) tileTypes += [new_tile.name for new_tile in new_tiles] tileDefs += new_tiles commonWirePair += new_commonWirePair tileDic = dict(zip(tileTypes, tileDefs, strict=False)) new_supertiles = parseSupertilesCSV(fName, tileDic) for new_supertile in new_supertiles: superTileDic[new_supertile.name] = new_supertile if new_tiles or new_supertiles: logger.warning( f"Deprecation warning: {fName} should not contain tile descriptions." ) # parse the parameters height = 0 width = 0 configBitMode = ConfigBitMode.FRAME_BASED frameBitsPerRow = 32 maxFramesPerCol = 20 package = "use work.my_package.all;" generateDelayInSwitchMatrix = 80 multiplexerStyle = MultiplexerStyle.CUSTOM superTileEnable = True for i in parameters: i = i.split(",") i = [j for j in i if j != ""] i = [i.strip() for i in i] if not i: continue if i[0].startswith("Tile"): if "GENERATE" in i: # we generate the tile right before we parse everything i[1] = str(generateCustomTileConfig(filePath.joinpath(i[1]))) new_tiles, new_commonWirePair = parseTilesCSV(filePath.joinpath(i[1])) tileTypes += [new_tile.name for new_tile in new_tiles] tileDefs += new_tiles commonWirePair += new_commonWirePair tileDic = dict(zip(tileTypes, tileDefs, strict=False)) elif i[0].startswith("Supertile"): new_supertiles = parseSupertilesCSV(filePath.joinpath(i[1]), tileDic) for new_supertile in new_supertiles: superTileDic[new_supertile.name] = new_supertile elif i[0].startswith("ConfigBitMode"): if i[1] == "frame_based": configBitMode = ConfigBitMode.FRAME_BASED elif i[1] == "FlipFlopChain": configBitMode = ConfigBitMode.FLIPFLOP_CHAIN else: raise InvalidFabricParameter( f"Invalid config bit mode {i[1]} in parameters. " "Valid options are frame_based and FlipFlopChain." ) elif i[0].startswith("FrameBitsPerRow"): frameBitsPerRow = int(i[1]) elif i[0].startswith("MaxFramesPerCol"): maxFramesPerCol = int(i[1]) elif i[0].startswith("Package"): package = i[1] elif i[0].startswith("GenerateDelayInSwitchMatrix"): generateDelayInSwitchMatrix = int(i[1]) elif i[0].startswith("MultiplexerStyle"): if i[1] == "custom": multiplexerStyle = MultiplexerStyle.CUSTOM elif i[1] == "generic": multiplexerStyle = MultiplexerStyle.GENERIC else: raise InvalidFabricParameter( f"Invalid multiplexer style {i[1]} in parameters. " "Valid options are custom and generic." ) elif i[0].startswith("SuperTileEnable"): superTileEnable = i[1] == "TRUE" else: raise InvalidFabricParameter(f"The following parameter is not valid: {i}") # form the fabric data structure usedTile = set() for f in fabricDescription: fabricLineTmp = f.split(",") fabricLineTmp = [i for i in fabricLineTmp if i != ""] fabricLineTmp = [i.strip() for i in fabricLineTmp] if not fabricLineTmp: continue fabricLine = [] for i in fabricLineTmp: if i in tileDic: fabricLine.append(deepcopy(tileDic[i])) usedTile.add(i) elif i == "Null" or i == "NULL" or i == "None": fabricLine.append(None) else: raise InvalidFabricDefinition( f"Unknown tile {i} in fabric description. " "Please check the tile definitions." ) fabricTiles.append(fabricLine) for i in list(tileDic.keys()): if i not in usedTile: logger.info( f"Tile {i} is not used in the fabric. Removing from tile dictionary." ) unusedTileDic[i] = tileDic[i] del tileDic[i] for i in list(superTileDic.keys()): if any(j.name not in usedTile for j in superTileDic[i].tiles): logger.info( f"Supertile {i} is not used in the fabric. " "Removing from tile dictionary." ) unusedSuperTileDic[i] = superTileDic[i] del superTileDic[i] height = len(fabricTiles) width = len(fabricTiles[0]) commonWirePair = list(dict.fromkeys(commonWirePair)) commonWirePair = [ (i, j) for (i, j) in commonWirePair if "NULL" not in i and "NULL" not in j ] return Fabric( tile=fabricTiles, numberOfColumns=width, numberOfRows=height, configBitMode=configBitMode, frameBitsPerRow=frameBitsPerRow, maxFramesPerCol=maxFramesPerCol, package=package, generateDelayInSwitchMatrix=generateDelayInSwitchMatrix, multiplexerStyle=multiplexerStyle, numberOfBRAMs=int(height / 2), superTileEnable=superTileEnable, tileDic=tileDic, superTileDic=superTileDic, unusedTileDic=unusedTileDic, unusedSuperTileDic=unusedSuperTileDic, commonWirePair=commonWirePair, )