Skip to content

Registries API

Registries map component names to their source .comp files. mccode-antlr supports local directories, remote (GitHub-hosted) releases, and in-memory registries for testing.

registry_from_specification

The most convenient entry point — accepts several specification formats:

from mccode_antlr.reader import registry_from_specification

# Local directory (format 1): yields a LocalRegistry; the path string doubles as the registry name
reg = registry_from_specification("/path/to/components")

# Local directory with explicit name (format 2): "{name} {folder}"
reg = registry_from_specification("mylib /path/to/components")

# GitHub release (short pip-style form, format 5): "git+{url}@{version}";
# the registry file defaults to pooch-registry.txt when no "#{registry-file}" fragment is given
reg = registry_from_specification("git+https://github.com/mccode-dev/McCode@v3.5.15")

# GitHub release (full form, format 4: name url version registry-file)
reg = registry_from_specification(
    "mccode https://github.com/mccode-dev/McCode v3.5.15 pooch-registry.txt"
)

mccode_antlr.reader.registry_from_specification(spec)

Construct a Local or Remote Registry instance from a specification string

Expected specifications are:

  1. {resolvable folder path}
  2. {name} {resolvable folder path}
  3. {name} {resolvable url} {resolvable file path}
  4. {name} {resolvable url} {version} {registry file name}
  5. git+{url}@{version} or git+{url}@{version}#{registry-file}
  6. {owner}/{repo}@{version} or {owner}/{repo}@{version}#{registry-file}

The first two variants make a LocalRegistry, which searches the provided directory for files. The third makes a ModuleRemoteRegistry using pooch; its resolvable file path should point at a Pooch registry file. The fourth makes a GitHubRegistry, which fetches files from the repository's GitHub raw-file layout ({url}/raw/{version}/...). Formats 5 and 6 are compact git-reference forms that also produce a GitHubRegistry. Format 6 expands {owner}/{repo} to https://github.com/{owner}/{repo}. For formats 5 and 6 the registry file defaults to pooch-registry.txt when the #{registry-file} fragment is omitted.

Source code in src/mccode_antlr/reader/registry.py
def registry_from_specification(spec: str):
    """Build a Registry object from a specification string.

    An argument that already is a ``Registry`` is handed back unchanged.

    Recognised specification layouts:

    1. ``{resolvable folder path}``
    2. ``{name} {resolvable folder path}``
    3. ``{name} {resolvable url} {resolvable file path}``
    4. ``{name} {resolvable url} {version} {registry file name}``
    5. ``git+{url}@{version}`` or ``git+{url}@{version}#{registry-file}``
    6. ``{owner}/{repo}@{version}`` or ``{owner}/{repo}@{version}#{registry-file}``

    Layouts 1 and 2 produce a LocalRegistry rooted at the (resolved) folder.
    Layout 3 produces a ModuleRemoteRegistry using pooch; the file path should point at a Pooch registry file.
    Layout 4 produces a GitHubRegistry, which relies on the specific folder structure of GitHub;
    an optional fifth token is forwarded as its ``registry`` argument.
    Layouts 5 and 6 are compact git-reference forms that also produce a GitHubRegistry.
    Layout 6 expands ``{owner}/{repo}`` to ``https://github.com/{owner}/{repo}``.
    For layouts 5 and 6 the registry file defaults to ``pooch-registry.txt`` when
    the ``#{registry-file}`` fragment is omitted.

    Returns ``None`` when the specification is empty or fits none of the layouts.
    """
    if isinstance(spec, Registry):
        return spec

    # Layouts 5 & 6: compact git-reference specs are recognised by their own parser
    gitref = _parse_gitref_spec(spec)
    if gitref is not None:
        name, url, version, registry_file = gitref
        return GitHubRegistry(name, url, version, registry_file)

    tokens = spec.split()
    if not tokens:
        return None
    if len(tokens) == 1:
        # a lone token is used both as the registry name and as its location
        tokens = tokens * 2

    def unquote(token):
        # turn a double-quoted string literal into the bare string; None passes through
        if token is not None and token.startswith('"') and token.endswith('"'):
            return token[1:-1]
        return token

    # only the first five tokens are meaningful; absent ones become None
    padded = tokens[:5] + [None] * (5 - len(tokens))
    p1, p2, p3, p4, p5 = map(unquote, padded)

    folder = Path(p2)
    if folder.exists() and folder.is_dir():
        return LocalRegistry(p1, str(folder.resolve()))

    # anything that is not a local folder must at least look like a URL
    if not simple_url_validator(p2, file_ok=True):
        return None

    if p3 is not None:
        registry_path = Path(p3)
        if registry_path.exists() and registry_path.is_file():
            return ModuleRemoteRegistry(p1, p2, registry_path.resolve().as_posix())

    if p4 is None:
        return None
    return GitHubRegistry(p1, p2, p3, p4, registry=p5)

Registry (base class)

mccode_antlr.reader.registry.Registry

Source code in src/mccode_antlr/reader/registry.py
class Registry:
    """Base class describing the interface shared by all registries.

    The lookup methods defined here are placeholders which return ``None``;
    concrete registries override the ones they support. Only the string
    conversion helpers, ``search``, ``match`` and ``contents`` carry real logic.
    """
    name = None
    root = None
    pooch = None
    version = None
    priority: int = 0

    def __str__(self):
        """Render the registry via ``to_string`` with a default ``TextWrapper``."""
        from mccode_antlr.common import TextWrapper
        return self.to_string(TextWrapper())

    def __hash__(self):
        """Hash the registry through its string representation."""
        return hash(str(self))

    def to_string(self, wrapper):
        """Return, as a string, whatever ``to_file`` writes for this registry."""
        from io import StringIO
        buffer = StringIO()
        self.to_file(buffer, wrapper)
        return buffer.getvalue()

    def to_file(self, output, wrapper):
        """Write a one-line summary of the registry attributes to ``output``; ``wrapper`` is unused here."""
        print(f'Registry<{self.name=},{self.root=},{self.pooch=},{self.version=},{self.priority=}>', file=output)

    def known(self, name: str, ext: str = None, strict: bool = False):
        """Placeholder (returns ``None``); overridden by concrete registries."""

    def unique(self, name: str):
        """Placeholder (returns ``None``); overridden by concrete registries."""

    def fullname(self, name: str, ext: str = None):
        """Placeholder (returns ``None``); overridden by concrete registries."""

    def is_available(self, name: str, ext: str = None):
        """Placeholder (returns ``None``); overridden by concrete registries."""

    def path(self, name: str, ext: str = None) -> Path:
        """Placeholder (returns ``None``); overridden by concrete registries."""

    def filenames(self) -> list[str]:
        """Placeholder (returns ``None``); overridden by concrete registries."""

    def search(self, regex: Pattern):
        """Return the filenames in which the pattern occurs anywhere (regex *search*)."""
        pattern = ensure_regex_pattern(regex)
        found = []
        for filename in self.filenames():
            if pattern.search(filename) is not None:
                found.append(filename)
        return found

    def match(self, regex: Pattern):
        """Return the filenames which *start* with the pattern (regex *match*)."""
        pattern = ensure_regex_pattern(regex)
        found = []
        for filename in self.filenames():
            if pattern.match(filename) is not None:
                found.append(filename)
        return found

    def contents(self, *args, **kwargs):
        """Return the text of the file located by ``path(*args, **kwargs)``."""
        location = self.path(*args, **kwargs)
        return location.read_text()

search(regex)

Return filenames containing the regex pattern, uses regex search

Source code in src/mccode_antlr/reader/registry.py
def search(self, regex: Pattern):
    """Return the filenames in which the pattern occurs anywhere (regex *search*)."""
    pattern = ensure_regex_pattern(regex)
    found = []
    for filename in self.filenames():
        if pattern.search(filename) is not None:
            found.append(filename)
    return found

match(regex)

Return the registered file names that match the regex pattern at their start (uses regex match)

Source code in src/mccode_antlr/reader/registry.py
def match(self, regex: Pattern):
    """Return the filenames which *start* with the pattern (regex *match*)."""
    pattern = ensure_regex_pattern(regex)
    found = []
    for filename in self.filenames():
        if pattern.match(filename) is not None:
            found.append(filename)
    return found

contents(*args, **kwargs)

Return the text contents of a Registry file

Source code in src/mccode_antlr/reader/registry.py
def contents(self, *args, **kwargs):
    """Return the text of the file located by ``self.path(*args, **kwargs)``."""
    location = self.path(*args, **kwargs)
    return location.read_text()

LocalRegistry

mccode_antlr.reader.registry.LocalRegistry

Bases: Registry

Source code in src/mccode_antlr/reader/registry.py
class LocalRegistry(Registry):
    """Registry backed by a directory tree on the local filesystem.

    Files are located by globbing beneath ``root``; the paths handed back are
    the ``Path`` objects produced by those globs (they already include ``root``).
    """

    def __init__(self, name: str, root: str, priority: int = 10):
        """Store the registry name, its root directory (as a ``Path``) and its priority."""
        self.name = name
        self.root = Path(root)
        self.version = mccode_antlr_version()
        self.priority = priority

    def __repr__(self):
        return f'LocalRegistry({self.name!r}, {self.root!r}, {self.priority!r})'

    def __hash__(self):
        # hashes the text written by to_file, i.e. the name and root
        return hash(str(self))

    def file_contents(self):
        """Return a dict with the name, POSIX-style root and priority of this registry."""
        return {'name': self.name, 'root': self.root.as_posix(), 'priority': self.priority}

    def to_file(self, output, wrapper):
        """Write a ``Registry:`` line holding the name and root URL to ``output``."""
        print(wrapper.line('Registry:', [self.name, wrapper.url(self.root.as_posix())]), file=output)

    def _filetype_iterator(self, filetype: str):
        """Iterate over every file below root with the extension ``filetype``."""
        return self.root.glob(f'**/*.{filetype}')

    def _file_iterator(self, name: str):
        """Iterate over paths called ``name`` at any depth below root."""
        return self.root.glob(f'**/{name}')

    def _exact_file_iterator(self, name: str):
        """Iterate over paths matching ``name`` relative to root itself (no recursion)."""
        return self.root.glob(name)

    def known(self, name: str, ext: str = None, strict: bool = False):
        """Return True if at least one file for ``name`` (with ``ext``) exists below root.

        ``strict`` is accepted for interface compatibility and is not used.
        """
        # presumably appends ext when name lacks it -- helper is defined elsewhere
        compare = _name_plus_suffix(name, ext)
        # stop at the first hit instead of materialising every match
        return any(True for _ in self._file_iterator(compare))

    def unique(self, name: str):
        """Return True if exactly one path below root is called ``name``."""
        return len(list(self._file_iterator(name))) == 1

    def fullname(self, name: str, ext: str = None, exact: bool = False):
        """Return the single path under root that corresponds to ``name``.

        Candidates are tried in order: ``name`` plus ``ext`` relative to root,
        ``name`` relative to root, then (unless ``exact``) ``name`` plus ``ext``
        at any depth, and finally ``name`` at any depth.

        Raises:
            RuntimeError: if no path matches, or the final search is ambiguous.
        """
        compare = _name_plus_suffix(name, ext)
        # Complete match
        is_compare = list(self._exact_file_iterator(compare))
        if len(is_compare) == 1:
            return is_compare[0]
        # Complete match if name happens to be missing the extension
        is_name = list(self._exact_file_iterator(name))
        if len(is_name) == 1:
            return is_name[0]
        if not exact:
            ends_with_compare = list(self._file_iterator(compare))
            if len(ends_with_compare) == 1:
                return ends_with_compare[0]
        # Last resort: name at any depth below root.
        # NOTE(review): this recursive search also runs when exact=True -- confirm that is intended.
        matches = list(self._file_iterator(name))
        if len(matches) == 1:
            return matches[0]
        if len(matches) == 0:
            raise RuntimeError(f'No match for {compare} or {name} under {self.root}')
        raise RuntimeError(f'More than one match for {name}:{ext}, which is required of:\n{matches}')

    def is_available(self, name: str, ext: str = None):
        """Return whether the file is known; local files need no download."""
        return self.known(name, ext)

    def path(self, name: str, ext: str = None, exact: bool = False) -> Path:
        """Return the path of the file found by ``fullname``.

        Glob results already start with ``root``, so they are returned as-is;
        joining them onto ``root`` again would duplicate a relative root.
        """
        return Path(self.fullname(name, ext, exact))

    def filenames(self) -> list[str]:
        """Return the path of every regular file below root, as strings."""
        # glob('**') yields only directories before Python 3.13, so list entries explicitly
        return [str(x) for x in self.root.rglob('*') if x.is_file()]

    def __eq__(self, other):
        """Registries are equal when the other is a Registry with the same name and root."""
        return isinstance(other, Registry) and other.name == self.name and other.root == self.root

GitHubRegistry

mccode_antlr.reader.registry.GitHubRegistry

Bases: RemoteRegistry

Source code in src/mccode_antlr/reader/registry.py
class GitHubRegistry(RemoteRegistry):
    """Remote registry whose files are fetched from ``{url}/raw/{version}/``.

    The list of available files comes from a registry file, which is read from
    the local pooch cache when present and otherwise downloaded (or supplied
    as a dictionary) and then written to the cache for later reuse.
    """

    def __init__(self, name: str, url: str, version: str, filename: str | None = None,
                 registry: str | dict | None = None, priority: int = 0):
        """Set up the pooch instance for this repository version.

        Args:
            name: registry name, also used for the cache folder and default registry filename.
            url: repository URL.
            version: tag or branch that forms part of the raw-file URL.
            filename: registry file name; defaults to ``{name}-registry.txt``.
            registry: either a dictionary of registry entries, or a URL of a
                repository from which the registry file should be fetched instead.
            priority: forwarded to the parent initializer.

        Raises:
            RuntimeError: if the registry file needs to be downloaded and the request fails.
        """
        from os import access, R_OK, W_OK

        def parse_registry_lines(lines):
            # One '{file} {hash ...}' entry per line; blank or whitespace-only
            # lines (trailing newline, CRLF endings) are skipped rather than
            # breaking the two-value unpacking.
            entries = (line.strip() for line in lines)
            return {k: v for k, v in (entry.split(maxsplit=1) for entry in entries if entry)}

        if filename is None:
            filename = f'{name}-registry.txt'
        # presumably stores name, url, version and filename as attributes -- defined in RemoteRegistry
        super().__init__(name, url, version, filename, priority)

        # If registry is a string url, we expect the registry file to be available from _that_ url
        self._stashed_registry = None
        if isinstance(registry, str) and simple_url_validator(registry, file_ok=True):
            self._stashed_registry = registry
            registry = f'{registry}/raw/{self.version}/'

        base_url = f'{self.url}/raw/{self.version}/'
        cache_path = pooch.os_cache(f'mccodeantlr/{self.name}')
        registry_file = self.filename or 'pooch-registry.txt'
        registry_file_path = cache_path.joinpath(self.version, registry_file)
        if registry_file_path.exists() and registry_file_path.is_file() and access(registry_file_path, R_OK):
            # a previously cached registry file takes precedence over any provided registry
            with registry_file_path.open('r') as file:
                registry = parse_registry_lines(file)
        else:
            # We allow a full-dictionary to be provided, otherwise we expect the registry file to be available from the
            # base_url where all subsequent files are also expected to be available
            if not isinstance(registry, dict):
                r = _fetch_registry_with_retry((registry or base_url) + registry_file)
                if not r.ok:
                    raise RuntimeError(f"Could not retrieve {r.url} because {r.reason}")
                registry = parse_registry_lines(r.text.split('\n'))
            # stash-away the registry file to be re-read next time
            check = registry_file_path.parent
            last = Path('/')
            while not check.exists() and check != last:
                last, check = check, check.parent
            # check is now a directory that exists, it may be the root of the filesystem
            if access(check, W_OK):
                registry_file_path.parent.mkdir(parents=True, exist_ok=True)
                with registry_file_path.open('w') as file:
                    file.write('\n'.join(f'{k} {v}' for k, v in registry.items()))
            else:
                logger.warning(f'Can not output {registry_file_path}, you lack write permissions for {check}')

        self.pooch = pooch.create(
            path=cache_path,
            base_url=base_url,
            # only 'v'-prefixed versions are passed to pooch as a version
            version=version if version.startswith('v') else None,
            version_dev="main",
            registry=registry,
        )

    @property
    def registry(self):
        """The registry URL given at construction, or ``None`` if none was given."""
        return self._stashed_registry

    def to_file(self, output, wrapper):
        """Write a ``Registry:`` line with name, url, version, filename and optional registry url."""
        filename = self.filename or f'{self.name}-registry.txt'
        items = [self.name, wrapper.url(self.url or ''), self.version or '', filename]
        if self._stashed_registry:
            items.append(wrapper.url(self._stashed_registry))
        print(wrapper.line('Registry:', items), file=output)

    def file_contents(self) -> dict[str, str]:
        """Return the parent's file contents extended with the ``registry`` url (empty if unset)."""
        fc = super().file_contents()
        fc['registry'] = self._stashed_registry or ''
        return fc

    @classmethod
    def file_keys(cls) -> tuple[str, ...]:
        """Return the parent's file keys followed by ``'registry'``."""
        return super().file_keys() + ('registry',)

InMemoryRegistry

mccode_antlr.reader.registry.InMemoryRegistry

Bases: Registry

Source code in src/mccode_antlr/reader/registry.py
class InMemoryRegistry(Registry):
    """Registry holding component (and instrument) definitions in a dictionary.

    Definitions are stored as text keyed by file name; nothing is read from disk.
    """

    def __init__(self, name, priority: int = 100, **components):
        """Create the registry; keyword arguments become components.

        Every keyword lacking a (case-insensitive) ``.comp`` ending has ``.comp`` appended.
        """
        self.name = name
        self.root = '/proc/memory/'  # Something pathlike is needed?
        self.version = mccode_antlr_version()
        self.components = {}
        for key, definition in components.items():
            if not key.lower().endswith('.comp'):
                key = f'{key}.comp'
            self.components[key] = definition
        self.priority = priority

    def to_file(self, output, wrapper):
        """Write an ``InMemoryRegistry:`` line with the name and component count to ``output``."""
        summary = [self.name, f'({len(self.components)} components)']
        print(wrapper.line('InMemoryRegistry:', summary), file=output)

    def add(self, name: str, definition: str):
        """Store ``definition`` under exactly ``name``, replacing any earlier entry."""
        self.components[name] = definition

    def add_comp(self, name: str, definition: str):
        """Store a component definition, appending ``.comp`` to ``name`` when missing."""
        key = name if name.lower().endswith('.comp') else name + '.comp'
        self.add(key, definition)

    def add_instr(self, name: str, definition: str):
        """Store an instrument definition, appending ``.instr`` to ``name`` when missing."""
        key = name if name.lower().endswith('.instr') else name + '.instr'
        self.add(key, definition)

    def filenames(self) -> list[str]:
        """Return the names of all stored definitions."""
        return [*self.components]

    def fullname(self, name: str, ext: str | None = None):
        """Return ``name`` (plus ``ext`` if given) when stored, otherwise ``None``."""
        candidate = name + ext if ext is not None else name
        if candidate in self.components:
            return candidate
        return None

    def known(self, name: str, ext: str | None = None, strict: bool = False):
        """Return True when a definition for ``name`` (plus ``ext``) is stored; ``strict`` is unused."""
        resolved = self.fullname(name, ext=ext)
        return resolved is not None and resolved in self.components

    def contents(self, name: str, ext: str | None = None):
        """Return the stored definition text.

        Raises:
            KeyError: if no definition is stored for ``name`` (plus ``ext``).
        """
        resolved = self.fullname(name, ext=ext)
        if resolved is None or resolved not in self.components:
            raise KeyError(f'InMemoryRegistry does not know of {name if ext is None else name + ext}')
        return self.components[resolved]