Skip to content

Instr API

Instr is the intermediate representation of a complete instrument. Build one via Assembler or parse one from a .instr file.

Reading instruments

from mccode_antlr.reader import read_mcstas_instr, read_mcxtrace_instr

instr = read_mcstas_instr("my_instrument.instr")

Writing instruments

instr.to_file("output.instr")
text = instr.to_string()

Jupyter display

In a Jupyter notebook, placing instr on the last line of a cell renders an interactive collapsible HTML view of the instrument.

Reference

mccode_antlr.instr.Instr

Bases: Struct

Intermediate representation of a McCode instrument

Read from a .instr file -- possibly including more .comp and .instr file sources For output to a runtime source file

Source code in src/mccode_antlr/instr/instr.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
class Instr(Struct):
    """Intermediate representation of a McCode instrument

    Read from a .instr file -- possibly including more .comp and .instr file sources
    For output to a runtime source file
    """
    name: Optional[str] = None  # Instrument name, e.g. {name}.instr (typically)
    source: Optional[str] = None  # Instrument *file* name
    parameters: tuple[InstrumentParameter, ...] = field(default_factory=tuple)  # runtime-set instrument parameters
    metadata: tuple[MetaData, ...] = field(default_factory=tuple)  # metadata for use by simulation consumers
    components: tuple[Instance, ...] = field(default_factory=tuple)  #
    included: tuple[str, ...] = field(default_factory=tuple)  # names of included instr definition(s)
    user: tuple[RawC, ...] = field(default_factory=tuple)  # struct members for _particle
    declare: tuple[RawC, ...] = field(default_factory=tuple)  # global parameters used in component trace
    initialize: tuple[RawC, ...] = field(default_factory=tuple)  # initialization of global declare parameters
    save: tuple[RawC, ...] = field(default_factory=tuple)  # statements executed after TRACE to save results
    final: tuple[RawC, ...] = field(default_factory=tuple)  # clean-up memory for global declare parameters
    registries: tuple[Registry, ...] = field(default_factory=tuple)  # the registries used by the reader to populate this
    dependency: tuple[str, ...] = field(default_factory=tuple)  # some (C) flags needed for compilation of the (translated) instrument
    flow_edges: tuple = field(default_factory=tuple)  # serialisable particle-flow edge records (FlowEdgeRecord)

    @classmethod
    def from_dict(cls, args: dict):
        from mccode_antlr.reader.registry import SerializableRegistry as SR
        from mccode_antlr.instr.instance import make_independent
        from mccode_antlr.instr.flow import FlowEdgeRecord
        popt = 'name', 'source'
        tpreq = 'included', 'dependency',
        tmtype = {'parameters': InstrumentParameter, 'metadata': MetaData,
                 'instances': DepInstance, 'user': RawC, 'declare': RawC,
                  'initialize': RawC, 'save': RawC, 'final': RawC, 'registries': SR
                 }
        dtype = {'components': Comp}

        data = {}
        data.update({k: args[k] for k in popt if k in args})
        data.update({k: tuple(a for a in args[k]) for k in tpreq})
        data.update({k: tuple(t.from_dict(a) for a in args[k]) for k, t in tmtype.items()})
        data.update({k: {n: t.from_dict(v) for n, v in args[k].items()} for k, t in dtype.items()})

        instances = data.pop('instances')
        components = data.pop('components')
        data['components'] = make_independent(instances, components)
        data['flow_edges'] = tuple(FlowEdgeRecord.from_dict(a) for a in args.get('flow_edges', []))
        return cls(**data)

    def to_dict(self):
        from msgspec.structs import fields
        from mccode_antlr.reader.registry import SerializableRegistry as SR
        data = {k.name: getattr(self, k.name) for k in fields(self)}
        instances = tuple(DepInstance.from_independent(inst) for inst in self.components)
        components = {inst.type.name: inst.type for inst in self.components}

        data['registries'] = [SR.from_registry(r) for r in self.registries]
        data['instances'] = instances
        data['components'] = components
        return data

    def __eq__(self, other):
        if not isinstance(other, Instr):
            return NotImplemented
        from msgspec.structs import fields
        for name in [f.name for f in fields(self)]:
            if getattr(self, name) != getattr(other, name):
                return False
        return True

    def __hash__(self):
        return hash((
            self.name, self.source, self.parameters, self.metadata, self.components,
            self.included, self.user, self.declare, self.initialize, self.save,
            self.final, self.registries, self.dependency
        ))

    def to_file(self, output=None, wrapper=None):
        if output is None:
            output = StringIO()
        if wrapper is None:
            from mccode_antlr.common import TextWrapper
            wrapper = TextWrapper(width=120)
        print(wrapper.start_block_comment(f'Instrument {self.name}'), file=output)
        print(wrapper.line('Instrument:', [self.name or 'None']), file=output)
        print(wrapper.line('Source:', [self.source or 'None']), file=output)
        print(wrapper.line('Contains:', [f'"%include {include}"' for include in self.included]), file=output)
        print(wrapper.line('Registries:', [registry.name for registry in self.registries]), file=output)
        for registry in self.registries:
            registry.to_file(output=output, wrapper=wrapper)
        print(wrapper.end_block_comment(), file=output)

        instr_parameters = wrapper.hide(', '.join(p.to_string(wrapper=wrapper) for p in self.parameters))
        first_line = wrapper.line('DEFINE INSTRUMENT', [f'{self.name}({instr_parameters})'])
        print(first_line, file=output)

        for metadata in self.metadata:
            metadata.to_file(output=output, wrapper=wrapper)
        # Print only the .instr-added DEPENDENCY line(s) here -- .comp DEPENDENCY excluded
        if self.dependency:
            print(wrapper.quoted_line('DEPENDENCY ', list(self.dependency)), file=output)

        if self.declare:
            print(wrapper.block('DECLARE', _join_raw_tuple(self.declare)), file=output)
        if self.user:
            print(wrapper.block('USERVARS', _join_raw_tuple(self.user)), file=output)
        if self.initialize:
            print(wrapper.block('INITIALIZE', _join_raw_tuple(self.initialize)), file=output)

        print(wrapper.start_list('TRACE'), file=output)
        for instance in self.components:
            print(wrapper.start_list_item(), file=output)
            instance.to_file(output, wrapper)
            print(wrapper.end_list_item(), file=output)
        if self.save:
            print(wrapper.block('SAVE', _join_raw_tuple(self.save)), file=output)
        if self.final:
            print(wrapper.block('FINALLY', _join_raw_tuple(self.final)), file=output)
        print(wrapper.end_list('END'), file=output)

    def to_string(self, wrapper):
        from io import StringIO
        output = StringIO()
        self.to_file(output, wrapper)
        return output.getvalue()

    def __str__(self):
        from mccode_antlr.common import TextWrapper
        return self.to_string(TextWrapper())

    def _repr_html_(self):
        from mccode_antlr.common import HTMLWrapper
        wrapper = HTMLWrapper()
        output = StringIO()
        self.to_file(output=output, wrapper=wrapper)
        body = output.getvalue()
        return f'<div class="mccode-instr">{body}</div>'

    def add_component(self, a: Instance):
        if any(x.name == a.name for x in self.components):
            raise RuntimeError(f"A component instance named {a.name} is already present in the instrument")
        prev = self.components[-1] if self.components else None
        self.components += (a,)
        if prev is not None:
            self._add_sequential_or_group_edge(prev, a)

    def add_flow_edge(self, src: str, dst: str, edge) -> None:
        """Append a single :class:`~mccode_antlr.instr.FlowEdgeRecord` to ``flow_edges``.

        Args:
            src: Source component instance name.
            dst: Destination component instance name.
            edge: Any :class:`~mccode_antlr.instr.flow.FlowEdge` subclass instance.
        """
        from .flow import FlowEdgeRecord
        self.flow_edges += (FlowEdgeRecord(src=src, dst=dst, edge=edge),)

    def _add_sequential_or_group_edge(self, prev: Instance, curr: Instance) -> None:
        """Add the correct sequential or group flow edge when *curr* is appended after *prev*."""
        from .flow import FlowEdgeRecord, SequentialEdge, GroupEdge, GroupEdgeKind
        same_group = prev.group is not None and prev.group == curr.group
        prev_exits_group = prev.group is not None and prev.group != (curr.group or '')

        if same_group:
            self.flow_edges += (FlowEdgeRecord(
                src=prev.name, dst=curr.name,
                edge=GroupEdge(group_name=prev.group, kind=GroupEdgeKind.TRY_NEXT),
            ),)
        elif prev_exits_group:
            # prev is the last member of its group; add group-exit edges from all group members to curr
            group_name = prev.group
            members = [inst for inst in self.components if inst.group == group_name]
            for member in members:
                self.flow_edges += (FlowEdgeRecord(
                    src=member.name, dst=curr.name,
                    edge=GroupEdge(group_name=group_name, kind=GroupEdgeKind.SCATTER_EXIT),
                ),)
            self.flow_edges += (FlowEdgeRecord(
                src=prev.name, dst=curr.name,
                edge=GroupEdge(group_name=group_name, kind=GroupEdgeKind.PASS_THROUGH),
            ),)
        else:
            self.flow_edges += (FlowEdgeRecord(
                src=prev.name, dst=curr.name,
                edge=SequentialEdge(when=curr.when),
            ),)

    def finalize_flow_edges(self) -> None:
        """Add JUMP edges to ``flow_edges`` (deferred because forward targets are unknown during parsing)."""
        from .flow import FlowEdgeRecord, JumpEdge
        components = self.components
        n = len(components)
        name_to_idx = {inst.name: idx for idx, inst in enumerate(components)}
        for inst in components:
            for jmp in inst.jump:
                target_idx = jmp.absolute_target
                if target_idx < 0:
                    target_idx = name_to_idx.get(jmp.target, -1)
                if 0 <= target_idx < n:
                    self.flow_edges += (FlowEdgeRecord(
                        src=inst.name, dst=components[target_idx].name,
                        edge=JumpEdge(
                            condition=jmp.condition,
                            iterate=jmp.iterate,
                            absolute_target=target_idx,
                        ),
                    ),)

    def build_flow_graph(self):
        """Rebuild ``flow_edges`` from scratch and return the derived :class:`networkx.MultiDiGraph`.

        This is idempotent; it replaces any existing ``flow_edges`` content.
        """
        from .flow import _build_flow_edge_records, flow_graph_from_records
        self.flow_edges = _build_flow_edge_records(self.components)
        return flow_graph_from_records(self.components, self.flow_edges)

    @property
    def flow_graph(self):
        """Derive a :class:`networkx.MultiDiGraph` from the current ``flow_edges`` (read-only view).

        Call :meth:`build_flow_graph` to (re)build ``flow_edges`` from the component list,
        or use :meth:`finalize_flow_edges` to add JUMP edges after incremental construction.
        """
        from .flow import flow_graph_from_records
        return flow_graph_from_records(self.components, self.flow_edges)

    def insert_component(
        self,
        name: str,
        component,
        *,
        before: Optional[str] = None,
        after: Optional[str] = None,
        at_relative: Optional[tuple] = None,
        rotate_relative: Optional[tuple] = None,
        parameters: tuple = (),
        group: Optional[str] = None,
        removable: bool = False,
    ) -> Instance:
        """Insert a new component instance before or after an existing one.

        Exactly one of *before* or *after* must be specified.

        The ``components`` tuple is updated in-place and ``flow_edges`` is
        updated consistently: the connecting sequential or TRY_NEXT edge
        between the two surrounding components is split into two edges.  All
        ``Jump.absolute_target`` fields in all existing instances are reset to
        ``-1``; the translator's ``set_jump_absolute_targets()`` (called during
        C generation) will re-resolve them by name.

        Parameters
        ----------
        name:
            Instance name for the new component (must be unique).
        component:
            Component type: a :class:`~mccode_antlr.instr.Comp` object or a
            string that will be looked up from the instrument's registries.
        before:
            Name of the existing component to insert *before*.
        after:
            Name of the existing component to insert *after*.
        at_relative:
            ``(Vector | tuple[float,float,float], Instance | str | None)``
            position and reference.  The reference may name a component that
            comes *after* the new instance in the final order; it will be
            re-expressed relative to the predecessor component.  If ``None``,
            the position is computed as the midpoint between the surrounding
            components (falls back to the origin of the predecessor when
            orientations are unavailable or non-constant).
        rotate_relative:
            ``(Angles | tuple[float,float,float], Instance | str | None)``
            rotation and reference.  Same forward-reference fixing applies.
            If ``None``, the absolute orientation of the downstream component
            is used (falls back to zero-rotation RELATIVE predecessor).
        parameters:
            Component instance parameters.
        group:
            GROUP name for the new instance, if any.
        removable:
            Whether the instance is marked removable.

        Returns
        -------
        Instance
            The newly created instance (already inserted into
            ``self.components``).

        Raises
        ------
        ValueError
            If neither or both of *before*/*after* are given, or if *name*
            is already present, or if the reference component is not found.
        """
        from .flow import FlowEdgeRecord, SequentialEdge, GroupEdge, GroupEdgeKind

        # ── 0. Validate ────────────────────────────────────────────────────────
        if (before is None) == (after is None):
            raise ValueError("Exactly one of 'before' or 'after' must be specified.")
        if any(x.name == name for x in self.components):
            raise ValueError(f"A component instance named {name!r} is already present.")

        # ── 1. Resolve reference component and insertion index ─────────────────
        comp_names = [c.name for c in self.components]
        if before is not None:
            ref_name = before if isinstance(before, str) else before.name
            if ref_name not in comp_names:
                raise ValueError(f"Component {ref_name!r} not found.")
            insert_idx = comp_names.index(ref_name)
            pred_inst = self.components[insert_idx - 1] if insert_idx > 0 else None
            succ_inst = self.components[insert_idx]
        else:
            ref_name = after if isinstance(after, str) else after.name
            if ref_name not in comp_names:
                raise ValueError(f"Component {ref_name!r} not found.")
            target_idx = comp_names.index(ref_name)
            insert_idx = target_idx + 1
            pred_inst = self.components[target_idx]
            succ_inst = self.components[insert_idx] if insert_idx < len(self.components) else None

        # ── 2. Resolve component type ──────────────────────────────────────────
        if isinstance(component, str):
            from ..reader import Reader
            reader = Reader(registries=list(self.registries))
            component = reader.get_component(component)

        # ── 3. Auto-compute position / rotation if not provided ────────────────
        if at_relative is None:
            at_relative = _midpoint_at_relative(pred_inst, succ_inst)
        if rotate_relative is None:
            rotate_relative = _downstream_rotate_relative(pred_inst, succ_inst)

        # ── 4. Normalise vector/angles in at_relative / rotate_relative ────────
        from .orientation import Vector, Angles
        at_relative = _normalise_vr(at_relative, self.components, Vector)
        rotate_relative = _normalise_vr(rotate_relative, self.components, Angles)

        # ── 5. Fix forward references (ref must come before the new instance) ──
        at_relative = _fix_forward_ref(at_relative, insert_idx, pred_inst)
        rotate_relative = _fix_forward_ref_rotation(rotate_relative, insert_idx, pred_inst)

        # ── 6. Reject insertion that would break a contiguous group ───────────
        if (pred_inst is not None and succ_inst is not None
                and pred_inst.group is not None
                and pred_inst.group == succ_inst.group
                and group != pred_inst.group):
            raise ValueError(
                f"Cannot insert component {name!r} (group={group!r}) between group members "
                f"{pred_inst.name!r} and {succ_inst.name!r} "
                f"(group {pred_inst.group!r}): McCode requires groups to be contiguous. "
                f"Insert before {pred_inst.name!r}, after the last member of the group, "
                f"or pass group={pred_inst.group!r} to join the group."
            )

        # ── 7. Create the instance (orientation computed in __post_init__) ──────
        new_inst = Instance(name, component, at_relative, rotate_relative,
                            parameters=parameters, group=group, removable=removable)

        # ── 8. Insert into components tuple ────────────────────────────────────
        comps = list(self.components)
        comps.insert(insert_idx, new_inst)
        self.components = tuple(comps)

        # ── 9. Update flow_edges: split the connecting positional edge ─────────
        updated_edges = list(self.flow_edges)
        if pred_inst is not None and succ_inst is not None:
            pred_name, succ_name = pred_inst.name, succ_inst.name
            # Find the single "positional" edge: SequentialEdge or TRY_NEXT between pred and succ
            connecting_idx = next(
                (i for i, r in enumerate(updated_edges)
                 if r.src == pred_name and r.dst == succ_name
                 and isinstance(r.edge, (SequentialEdge, GroupEdge))
                 and not (isinstance(r.edge, GroupEdge)
                          and r.edge.kind in (GroupEdgeKind.SCATTER_EXIT,
                                              GroupEdgeKind.PASS_THROUGH))),
                None,
            )
            if connecting_idx is not None:
                old_edge = updated_edges.pop(connecting_idx).edge
                if isinstance(old_edge, GroupEdge) and old_edge.kind == GroupEdgeKind.TRY_NEXT:
                    if group == pred_inst.group:
                        e1 = GroupEdge(group_name=pred_inst.group, kind=GroupEdgeKind.TRY_NEXT)
                        e2 = GroupEdge(group_name=pred_inst.group, kind=GroupEdgeKind.TRY_NEXT)
                    else:
                        e1 = SequentialEdge(when=new_inst.when)
                        e2 = SequentialEdge(when=succ_inst.when)
                else:
                    e1 = SequentialEdge(when=new_inst.when)
                    e2 = SequentialEdge(when=succ_inst.when)
                updated_edges.append(FlowEdgeRecord(src=pred_name, dst=name, edge=e1))
                updated_edges.append(FlowEdgeRecord(src=name, dst=succ_name, edge=e2))
        elif pred_inst is None and succ_inst is not None:
            updated_edges.append(FlowEdgeRecord(
                src=name, dst=succ_inst.name, edge=SequentialEdge(when=succ_inst.when),
            ))
        elif pred_inst is not None and succ_inst is None:
            updated_edges.append(FlowEdgeRecord(
                src=pred_inst.name, dst=name, edge=SequentialEdge(when=new_inst.when),
            ))
        self.flow_edges = tuple(updated_edges)

        # ── 10. Invalidate all Jump.absolute_target fields ─────────────────────
        # The translator's set_jump_absolute_targets() re-resolves these by name.
        for inst in self.components:
            for jmp in inst.jump:
                jmp.absolute_target = -1

        return new_inst

    def add_parameter(self, a: InstrumentParameter, ignore_repeated=False):
        if not parameter_name_present(self.parameters, a.name):
            self.parameters += (a,)
        elif not ignore_repeated:
            raise RuntimeError(f"An instrument parameter named {a.name} is already present in the instrument")

    def get_parameter(self, name, default=None):
        if parameter_name_present(self.parameters, name):
            for parameter in self.parameters:
                if name == parameter.name:
                    return parameter
        return default

    def has_parameter(self, name):
        return parameter_name_present(self.parameters, name)

    def last_component(self, count: int = 1, removable_ok: bool = True):
        if len(self.components) < count:
            raise RuntimeError(f"Only {len(self.components)} components defined -- can not go back {count}.")
        if removable_ok:
            return self.components[-count]
        fixed = [comp for comp in self.components if not comp.removable]
        if len(fixed) < count:
            for comp in self.components:
                logger.info(f'{comp.name}')
            raise RuntimeError(f"Only {len(fixed)} fixed components defined -- can not go back {count}.")
        return fixed[-count]

    def get_component(self, name: str):
        if name == 'PREVIOUS':
            return self.components[-1]
        for comp in self.components:
            if comp.name == name:
                return comp
        raise RuntimeError(f"No component instance named {name} defined.")

    def has_component_named(self, name: str):
        return any(comp.name == name for comp in self.components)

    def get_component_names_by_category(self, category: str):
        """Find all component instance names for a given category.

        Note:
            The category of an instance is determined by its type, and is either
            - set inside the .comp file by a 'CATEGORY <category>' directive, or
            - *guessed* by the reader based on the path to the .comp file.
            The second behaviour is to match McStasScript/McCode-3, which does not work for user-defined components.
        """
        return [inst.name for inst in self.components if category in inst.type.category]

    def add_included(self, name: str):
        self.included += (name,)

    def DEPENDENCY(self, *strings):
        self.dependency += strings

    def USERVARS(self, *blocks):
        self.user += blocks_to_raw_c(*blocks)

    def DECLARE(self, *blocks):
        self.declare += blocks_to_raw_c(*blocks)

    def INITIALIZE(self, *blocks):
        self.initialize += blocks_to_raw_c(*blocks)

    def SAVE(self, *blocks):
        self.save += blocks_to_raw_c(*blocks)

    def FINALLY(self, *blocks):
        self.final += blocks_to_raw_c(*blocks)

    def add_metadata(self, m: MetaData):
        if any([x.name == m.name for x in self.metadata]):
            self.metadata = tuple([x for x in self.metadata if x.name != m.name])
        self.metadata += (m,)

    @property
    def groups(self):
        return determine_groups(self.components)

    def component_types(self):
        # # If component order is unimportant, we can use a set:
        # return set(inst.type for inst in self.components)
        # For comparison with the C code generator, we must keep the order of component definitions
        return list(dict.fromkeys([inst.type for inst in self.components]))

    def collect_metadata(self):
        """Component definitions and instances can define metadata too, collect it all together here"""
        # Metadata defined in an instance overrides that defined in a component.
        # Metadata defined for an instrument is added to the collected list
        return tuple(m for inst in self.components for m in inst.collect_metadata()) + self.metadata

    def _getpath(self, filename: str):
        from pathlib import Path
        for registry in self.registries:
            if registry.known(filename):
                return registry.path(filename).absolute().resolve()
        return Path()

    def _replace_env_getpath_cmd(self, flags: str):
        """Replace CMD, ENV, and GETPATH directives from a flag string"""

        # Mimics McCode-3/tools/Python/mccodelib/cflags.py:evaluate_dependency_str
        #
        def getpath(chars):
            return self._getpath(chars).as_posix()

        def eval_cmd(chars):
            from mccode_antlr.utils import run_prog_message_output
            from shlex import split
            message, output = run_prog_message_output(split(chars))
            if message:
                raise RuntimeError(f"Calling {chars} resulted in error {message}")
            output = [line.strip() for line in output.splitlines() if line.strip()]
            if len(output) > 1:
                raise RuntimeError(f"Calling {chars} produced more than one line of output")
            return output[0] if output else ''

        def eval_env(chars):
            from os import environ
            return environ.get(chars, '')

        def replace(chars, start, replacer):
            if start not in chars:
                return chars
            before, after = chars.split(start, 1)
            if '(' != after[0]:
                raise ValueError(f'Missing opening parenthesis in dependency string after {start}')
            if ')' not in after:
                raise ValueError(f'Missing closing parenthesis in dependency string after {start}')
            dep, after = after[1:].split(')', 1)
            if start in dep:
                raise ValueError(f'Nested {start} in dependency string')
            return before + replacer(dep) + replace(after, start, replacer)

        for key, worker in zip(['ENV', 'GETPATH', 'CMD'], [eval_env, getpath, eval_cmd]):
            flags = replace(flags, key, worker)

        return flags

    def _replace_keywords(self, flag):
        from mccode_antlr.config import config
        from mccode_antlr.config.fallback import regex_sanitized_config_fallback
        from re import sub, findall
        if '@NEXUSFLAGS@' in flag:
            flag = sub(r'@NEXUSFLAGS@', config['flags']['nexus'].as_str_expanded(), flag)
        if '@MCCODE_LIB@' in flag:
            print(f'The instrument {self.name} uses @MCCODE_LIB@ dependencies which no longer work.')
            print('Expect problems at compilation.')
            flag = sub('@MCCODE_LIB@', '.', flag)
        general_re = r'@(\w+)@'
        for replace in findall(general_re, flag):
            # Is this replacement something like XXXFLAGS?
            if replace.lower().endswith('flags'):
                replacement = regex_sanitized_config_fallback(config['flags'], replace.lower()[:-5])
                flag = sub(f'@{replace}@', replacement, flag)
            else:
                logger.warning(f'Unknown keyword @{replace}@ in dependency string')
        return flag

    @property
    def dependencies(self) -> set[str]:
        # Each 'flag' in self.flags is from a single instrument component DEPENDENCY,
        # and might contain duplicates: If we accept that white space differences
        # matter, we can deduplicate the strings 'easily'
        uf = set(self.dependency)
        uf.update(inst.dependency for inst in self.components if inst.dependency is not None)
        if any(inst.cpu for inst in self.components):
            uf.add('-DFUNNEL')
        return uf

    def decoded_flags(self) -> list[str]:
        # The dependency strings are allowed to contain any of
        #       '@NEXUSFLAGS@', @MCCODE_LIB@, CMD(...), ENV(...), GETPATH(...)
        # each of which should be replaced by ... something. Start by replacing the 'static' (old-style) keywords
        replaced_flags = [self._replace_keywords(flag) for flag in self.dependencies]
        # Then use the above decoder method to replace any instances of CMD, ENV, or GETPATH
        return [self._replace_env_getpath_cmd(flag) for flag in replaced_flags]

    def copy(self, first=0, last=-1):
        """Return a copy of this instrument, optionally with only a subset of components"""
        from copy import deepcopy
        copy = Instr(self.name, self.source)
        copy.parameters = tuple(x for x in self.parameters)
        copy.metadata = tuple(x.copy() for x in self.metadata)
        if last < 0:
            last += 1 + len(self.components)
        copy.components = tuple(x.copy() for x in self.components[first:last])
        copy.included = tuple(x for x in self.included)
        copy.user = tuple(x.copy() for x in self.user)
        copy.declare = tuple(x.copy() for x in self.declare)
        copy.initialize = tuple(x.copy() for x in self.initialize)
        copy.save = tuple(x.copy() for x in self.save)
        copy.final = tuple(x.copy() for x in self.final)
        # copy.groups = {k: v.copy() for k, v in self.groups.items()}
        copy.dependency = tuple(x for x in self.dependency)
        copy.registries = tuple(x for x in self.registries)
        return copy

    def split(self, at, remove_unused_parameters=False):
        """Produces two instruments, both containing the indicated component

        Parameters:
        -----------
        after: Union[Instance, str]
            A component instance or the _name_ of a component instance at which to split the instrument.
            The instance or one with a matching name _must_ be in the instrument, and should probably be an Arm.
        remove_unused_parameters: bool
            If True, any Instrument parameters which do not appear in instance definitions or code blocks is not
            included in the output instruments

        Return:
        -------
        tuple[Instr, Instr]
            The first Instr has components up to and including the split-point.
            The second Instr has components starting from the split-point.
        """
        if isinstance(at, Instance):
            index = self.components.index(at)
        elif isinstance(at, str):
            index = [i for i, x in enumerate(self.components) if x.name == at]
            if len(index) != 1:
                raise RuntimeError(f'Can only split an instrument after a single component, "{at}" matches {index}')
            index = index[0]
        else:
            raise RuntimeError('Can only split an instrument after a component or component name')
        first = self.copy(last=index + 1)
        first.name = self.name + '_first'
        if first.check_instrument_parameters(remove=remove_unused_parameters) and not remove_unused_parameters:
            logger.warning(f'Instrument {first.name} has unused instrument parameters')

        second = self.copy(first=index)
        second.name = self.name + '_second'
        # remove any dangling component references and re-reference into the new instrument's components:
        for instance in second.components:
            at_rel = instance.at_relative[1]
            rot_rel = instance.rotate_relative[1]
            if isinstance(at_rel, Instance):
                if second.has_component_named(at_rel.name):
                    instance.at_relative = instance.at_relative[0], second.get_component(at_rel.name)
                else:
                    instance.at_relative = instance.orientation.position(), None
            if isinstance(rot_rel, Instance):
                if second.has_component_named(rot_rel.name):
                    instance.rotate_relative = instance.rotate_relative[0], second.get_component(rot_rel.name)
                else:
                    instance.rotate_relative = instance.orientation.angles(), None
        if second.check_instrument_parameters(remove=remove_unused_parameters) and not remove_unused_parameters:
            logger.info(f'Instrument {second.name} has unused instrument parameters')

        return first, second

    def make_instance(self, name, component, at_relative=None, rotate_relative=None, orientation=None,
                      parameters=None, group=None, removable=False):
        if parameters is None:
            parameters = tuple()
        if any(x.name == name for x in self.components):
            raise RuntimeError(f"An instance named {name} is already present in the instrument")
        if isinstance(component, str):
            from ..reader import Reader
            reader = Reader(registries=list(self.registries))
            component = reader.get_component(component)
        self.components += (Instance(name, component, at_relative, rotate_relative, orientation,
                                     parameters, group, removable),)

    def mcpl_split(self,
                   after,
                   filename=None,
                   output_component=None,
                   output_parameters=None,
                   input_component=None,
                   input_parameters=None,
                   remove_unused_parameters=False
                   ):
        from ..common import ComponentParameter
        from ..common import Expr, Value, ObjectType
        from .orientation import Vector, Angles
        if filename is None:
            filename = self.name + '.mcpl'
        if filename[0] != '"' or filename[-1] != '"':
            filename = '"' + filename + '"'

        filename_parameter = ComponentParameter('filename', Expr(Value('mcpl_filename', _object=ObjectType.parameter)))
        first, second = self.split(after, remove_unused_parameters=remove_unused_parameters)
        mcpl_filename = InstrumentParameter.parse(f'string mcpl_filename = {filename}')
        first.add_parameter(mcpl_filename)
        second.add_parameter(mcpl_filename)

        fc = first.components[-1]
        if fc.type.name != 'Arm':
            logger.info(f'Component {after} is a {fc.type.name} instead of an Arm -- using MCPL file may cause problems')

        if output_component is None:
            output_component = 'MCPL_output'
        if output_parameters is None:
            output_parameters = (filename_parameter,)
        elif not any(p.name == 'filename' for p in output_parameters):
            output_parameters = (filename_parameter,) + output_parameters
        # remove the last component, since we're going to re-use its name:
        first.components = first.components[:-1]
        # automatically adds the component at the end of the list:
        first.make_instance(fc.name, output_component, fc.at_relative, fc.rotate_relative, fc.orientation,
                            output_parameters)

        if input_component is None:
            input_component = 'MCPL_input'
        if input_parameters is None:
            input_parameters = (filename_parameter,)
        elif not any(p.name == 'filename' for p in input_parameters):
            input_parameters = (filename_parameter,) + input_parameters
        if not any(p.name == 'verbose' for p in input_parameters):
            input_parameters = (ComponentParameter('verbose', Expr.float(0)),) + input_parameters
        # # the MCPL input component _is_ the origin of its simulation, but must be placed relative to other components.
        # # so we need the *absolute* position and orientation of the removed component:
        # abs_at_rel = fc.orientation.position(), None
        # abs_rot_rel = fc.orientation.angles(), None

        # the split at component in the second instrument should have already been converted to absolute-positioning:
        sc = second.components[0]
        if sc.at_relative[1] is not None or sc.rotate_relative[1] is not None:
            logger.error("The split-at point should be positioned absolutely in the second instrument")
        # remove the first component before adding an-equal named one:
        second.components = second.components[1:]
        second.make_instance(sc.name, input_component, sc.at_relative, sc.rotate_relative, parameters=input_parameters)
        # move the newly added component to the front of the list:
        second.components = (second.components[-1],) + second.components[:-1]

        return first, second

    def parameter_used(self, name: str):
        """Check that an instrument parameter is used in the instrument"""
        for instance in self.components:
            if instance.parameter_used(name):
                return True
        for section in (self.declare, self.initialize, self.save, self.final):
            for block in section:
                # A more complex check would see if the use itself leads to a parameter being used, but
                # that would be language dependent and probably not worth the effort.
                if name in block:
                    return True
        return False

    def check_instrument_parameters(self, remove=False):
        """Check that all instrument parameters are used in the instrument, and optionally remove any that are not

        Returns
        -------
        int
            The number of unused instrument parameters
        """
        names = [p.name for p in self.parameters]
        used = [self.parameter_used(p.name) for p in self.parameters]
        if not all(used):
            logger.info(f'The following instrument parameters are not used in the instrument: '
                     f'{", ".join([n for n, u in zip(names, used) if not u])}')
            if remove:
                self.parameters = tuple(p for i, p in enumerate(self.parameters) if used[i])
                logger.info(f'Removed unused instrument parameters; {len(self.parameters)} remain')
        return len(used) - sum(used)

    def verify_instance_parameters(self):
        """Check that all instance parameters are of the expected type, and that identifiers which match
        instrument parameter names are flagged as such"""
        for instance in self.components:
            instance.verify_parameters(self.parameters)

    def check_expr(self, expr: int | float | str | Expr | Value):
        if not isinstance(expr, Expr):
            expr = Expr.best(expr)
        # check whether the expression contains any identifiers which are actually InstrumentParameters
        expr.verify_parameters([x.name for x in self.parameters])
        # We then verify that no as-of-yet undefined identifiers exist, but can't in case they're defined in
        # an initalize or share block
        return expr

flow_graph property

Derive a :class:networkx.MultiDiGraph from the current flow_edges (read-only view).

Call :meth:build_flow_graph to (re)build flow_edges from the component list, or use :meth:finalize_flow_edges to add JUMP edges after incremental construction.

add_flow_edge(src, dst, edge)

Append a single :class:~mccode_antlr.instr.FlowEdgeRecord to flow_edges.

Parameters:

Name Type Description Default
src str

Source component instance name.

required
dst str

Destination component instance name.

required
edge

Any :class:~mccode_antlr.instr.flow.FlowEdge subclass instance.

required
Source code in src/mccode_antlr/instr/instr.py
def add_flow_edge(self, src: str, dst: str, edge) -> None:
    """Append a single :class:`~mccode_antlr.instr.FlowEdgeRecord` to ``flow_edges``.

    Args:
        src: Source component instance name.
        dst: Destination component instance name.
        edge: Any :class:`~mccode_antlr.instr.flow.FlowEdge` subclass instance.
    """
    from .flow import FlowEdgeRecord
    self.flow_edges += (FlowEdgeRecord(src=src, dst=dst, edge=edge),)

finalize_flow_edges()

Add JUMP edges to flow_edges (deferred because forward targets are unknown during parsing).

Source code in src/mccode_antlr/instr/instr.py
def finalize_flow_edges(self) -> None:
    """Add JUMP edges to ``flow_edges`` (deferred because forward targets are unknown during parsing)."""
    from .flow import FlowEdgeRecord, JumpEdge
    components = self.components
    n = len(components)
    name_to_idx = {inst.name: idx for idx, inst in enumerate(components)}
    for inst in components:
        for jmp in inst.jump:
            target_idx = jmp.absolute_target
            if target_idx < 0:
                target_idx = name_to_idx.get(jmp.target, -1)
            if 0 <= target_idx < n:
                self.flow_edges += (FlowEdgeRecord(
                    src=inst.name, dst=components[target_idx].name,
                    edge=JumpEdge(
                        condition=jmp.condition,
                        iterate=jmp.iterate,
                        absolute_target=target_idx,
                    ),
                ),)

build_flow_graph()

Rebuild flow_edges from scratch and return the derived :class:networkx.MultiDiGraph.

This is idempotent; it replaces any existing flow_edges content.

Source code in src/mccode_antlr/instr/instr.py
def build_flow_graph(self):
    """Rebuild ``flow_edges`` from scratch and return the derived :class:`networkx.MultiDiGraph`.

    This is idempotent; it replaces any existing ``flow_edges`` content.
    """
    from .flow import _build_flow_edge_records, flow_graph_from_records
    self.flow_edges = _build_flow_edge_records(self.components)
    return flow_graph_from_records(self.components, self.flow_edges)

insert_component(name, component, *, before=None, after=None, at_relative=None, rotate_relative=None, parameters=(), group=None, removable=False)

Insert a new component instance before or after an existing one.

Exactly one of before or after must be specified.

The components tuple is updated in-place and flow_edges is updated consistently: the connecting sequential or TRY_NEXT edge between the two surrounding components is split into two edges. All Jump.absolute_target fields in all existing instances are reset to -1; the translator's set_jump_absolute_targets() (called during C generation) will re-resolve them by name.

Parameters

name: Instance name for the new component (must be unique). component: Component type: a :class:~mccode_antlr.instr.Comp object or a string that will be looked up from the instrument's registries. before: Name of the existing component to insert before. after: Name of the existing component to insert after. at_relative: (Vector | tuple[float,float,float], Instance | str | None) position and reference. The reference may name a component that comes after the new instance in the final order; it will be re-expressed relative to the predecessor component. If None, the position is computed as the midpoint between the surrounding components (falls back to the origin of the predecessor when orientations are unavailable or non-constant). rotate_relative: (Angles | tuple[float,float,float], Instance | str | None) rotation and reference. Same forward-reference fixing applies. If None, the absolute orientation of the downstream component is used (falls back to zero-rotation RELATIVE predecessor). parameters: Component instance parameters. group: GROUP name for the new instance, if any. removable: Whether the instance is marked removable.

Returns

Instance The newly created instance (already inserted into self.components).

Raises

ValueError If neither or both of before/after are given, or if name is already present, or if the reference component is not found.

Source code in src/mccode_antlr/instr/instr.py
def insert_component(
    self,
    name: str,
    component,
    *,
    before: Optional[str] = None,
    after: Optional[str] = None,
    at_relative: Optional[tuple] = None,
    rotate_relative: Optional[tuple] = None,
    parameters: tuple = (),
    group: Optional[str] = None,
    removable: bool = False,
) -> Instance:
    """Insert a new component instance before or after an existing one.

    Exactly one of *before* or *after* must be specified.

    The ``components`` tuple is updated in-place and ``flow_edges`` is
    updated consistently: the connecting sequential or TRY_NEXT edge
    between the two surrounding components is split into two edges.  All
    ``Jump.absolute_target`` fields in all existing instances are reset to
    ``-1``; the translator's ``set_jump_absolute_targets()`` (called during
    C generation) will re-resolve them by name.

    Parameters
    ----------
    name:
        Instance name for the new component (must be unique).
    component:
        Component type: a :class:`~mccode_antlr.instr.Comp` object or a
        string that will be looked up from the instrument's registries.
    before:
        Name of the existing component to insert *before*.
    after:
        Name of the existing component to insert *after*.
    at_relative:
        ``(Vector | tuple[float,float,float], Instance | str | None)``
        position and reference.  The reference may name a component that
        comes *after* the new instance in the final order; it will be
        re-expressed relative to the predecessor component.  If ``None``,
        the position is computed as the midpoint between the surrounding
        components (falls back to the origin of the predecessor when
        orientations are unavailable or non-constant).
    rotate_relative:
        ``(Angles | tuple[float,float,float], Instance | str | None)``
        rotation and reference.  Same forward-reference fixing applies.
        If ``None``, the absolute orientation of the downstream component
        is used (falls back to zero-rotation RELATIVE predecessor).
    parameters:
        Component instance parameters.
    group:
        GROUP name for the new instance, if any.
    removable:
        Whether the instance is marked removable.

    Returns
    -------
    Instance
        The newly created instance (already inserted into
        ``self.components``).

    Raises
    ------
    ValueError
        If neither or both of *before*/*after* are given, or if *name*
        is already present, or if the reference component is not found.
    """
    from .flow import FlowEdgeRecord, SequentialEdge, GroupEdge, GroupEdgeKind

    # ── 0. Validate ────────────────────────────────────────────────────────
    if (before is None) == (after is None):
        raise ValueError("Exactly one of 'before' or 'after' must be specified.")
    if any(x.name == name for x in self.components):
        raise ValueError(f"A component instance named {name!r} is already present.")

    # ── 1. Resolve reference component and insertion index ─────────────────
    comp_names = [c.name for c in self.components]
    if before is not None:
        ref_name = before if isinstance(before, str) else before.name
        if ref_name not in comp_names:
            raise ValueError(f"Component {ref_name!r} not found.")
        insert_idx = comp_names.index(ref_name)
        pred_inst = self.components[insert_idx - 1] if insert_idx > 0 else None
        succ_inst = self.components[insert_idx]
    else:
        ref_name = after if isinstance(after, str) else after.name
        if ref_name not in comp_names:
            raise ValueError(f"Component {ref_name!r} not found.")
        target_idx = comp_names.index(ref_name)
        insert_idx = target_idx + 1
        pred_inst = self.components[target_idx]
        succ_inst = self.components[insert_idx] if insert_idx < len(self.components) else None

    # ── 2. Resolve component type ──────────────────────────────────────────
    if isinstance(component, str):
        from ..reader import Reader
        reader = Reader(registries=list(self.registries))
        component = reader.get_component(component)

    # ── 3. Auto-compute position / rotation if not provided ────────────────
    if at_relative is None:
        at_relative = _midpoint_at_relative(pred_inst, succ_inst)
    if rotate_relative is None:
        rotate_relative = _downstream_rotate_relative(pred_inst, succ_inst)

    # ── 4. Normalise vector/angles in at_relative / rotate_relative ────────
    from .orientation import Vector, Angles
    at_relative = _normalise_vr(at_relative, self.components, Vector)
    rotate_relative = _normalise_vr(rotate_relative, self.components, Angles)

    # ── 5. Fix forward references (ref must come before the new instance) ──
    at_relative = _fix_forward_ref(at_relative, insert_idx, pred_inst)
    rotate_relative = _fix_forward_ref_rotation(rotate_relative, insert_idx, pred_inst)

    # ── 6. Reject insertion that would break a contiguous group ───────────
    if (pred_inst is not None and succ_inst is not None
            and pred_inst.group is not None
            and pred_inst.group == succ_inst.group
            and group != pred_inst.group):
        raise ValueError(
            f"Cannot insert component {name!r} (group={group!r}) between group members "
            f"{pred_inst.name!r} and {succ_inst.name!r} "
            f"(group {pred_inst.group!r}): McCode requires groups to be contiguous. "
            f"Insert before {pred_inst.name!r}, after the last member of the group, "
            f"or pass group={pred_inst.group!r} to join the group."
        )

    # ── 7. Create the instance (orientation computed in __post_init__) ──────
    new_inst = Instance(name, component, at_relative, rotate_relative,
                        parameters=parameters, group=group, removable=removable)

    # ── 8. Insert into components tuple ────────────────────────────────────
    comps = list(self.components)
    comps.insert(insert_idx, new_inst)
    self.components = tuple(comps)

    # ── 9. Update flow_edges: split the connecting positional edge ─────────
    updated_edges = list(self.flow_edges)
    if pred_inst is not None and succ_inst is not None:
        pred_name, succ_name = pred_inst.name, succ_inst.name
        # Find the single "positional" edge: SequentialEdge or TRY_NEXT between pred and succ
        connecting_idx = next(
            (i for i, r in enumerate(updated_edges)
             if r.src == pred_name and r.dst == succ_name
             and isinstance(r.edge, (SequentialEdge, GroupEdge))
             and not (isinstance(r.edge, GroupEdge)
                      and r.edge.kind in (GroupEdgeKind.SCATTER_EXIT,
                                          GroupEdgeKind.PASS_THROUGH))),
            None,
        )
        if connecting_idx is not None:
            old_edge = updated_edges.pop(connecting_idx).edge
            if isinstance(old_edge, GroupEdge) and old_edge.kind == GroupEdgeKind.TRY_NEXT:
                if group == pred_inst.group:
                    e1 = GroupEdge(group_name=pred_inst.group, kind=GroupEdgeKind.TRY_NEXT)
                    e2 = GroupEdge(group_name=pred_inst.group, kind=GroupEdgeKind.TRY_NEXT)
                else:
                    e1 = SequentialEdge(when=new_inst.when)
                    e2 = SequentialEdge(when=succ_inst.when)
            else:
                e1 = SequentialEdge(when=new_inst.when)
                e2 = SequentialEdge(when=succ_inst.when)
            updated_edges.append(FlowEdgeRecord(src=pred_name, dst=name, edge=e1))
            updated_edges.append(FlowEdgeRecord(src=name, dst=succ_name, edge=e2))
    elif pred_inst is None and succ_inst is not None:
        updated_edges.append(FlowEdgeRecord(
            src=name, dst=succ_inst.name, edge=SequentialEdge(when=succ_inst.when),
        ))
    elif pred_inst is not None and succ_inst is None:
        updated_edges.append(FlowEdgeRecord(
            src=pred_inst.name, dst=name, edge=SequentialEdge(when=new_inst.when),
        ))
    self.flow_edges = tuple(updated_edges)

    # ── 10. Invalidate all Jump.absolute_target fields ─────────────────────
    # The translator's set_jump_absolute_targets() re-resolves these by name.
    for inst in self.components:
        for jmp in inst.jump:
            jmp.absolute_target = -1

    return new_inst

get_component_names_by_category(category)

Find all component instance names for a given category.

Note

The category of an instance is determined by its type, and is either - set inside the .comp file by a 'CATEGORY ' directive, or - guessed by the reader based on the path to the .comp file. The second behaviour is to match McStasScript/McCode-3, which does not work for user-defined components.

Source code in src/mccode_antlr/instr/instr.py
def get_component_names_by_category(self, category: str):
    """Find all component instance names for a given category.

    Note:
        The category of an instance is determined by its type, and is either
        - set inside the .comp file by a 'CATEGORY <category>' directive, or
        - *guessed* by the reader based on the path to the .comp file.
        The second behaviour is to match McStasScript/McCode-3, which does not work for user-defined components.
    """
    return [inst.name for inst in self.components if category in inst.type.category]

collect_metadata()

Component definitions and instances can define metadata too, collect it all together here

Source code in src/mccode_antlr/instr/instr.py
def collect_metadata(self):
    """Component definitions and instances can define metadata too, collect it all together here"""
    # Metadata defined in an instance overrides that defined in a component.
    # Metadata defined for an instrument is added to the collected list
    return tuple(m for inst in self.components for m in inst.collect_metadata()) + self.metadata

copy(first=0, last=-1)

Return a copy of this instrument, optionally with only a subset of components

Source code in src/mccode_antlr/instr/instr.py
def copy(self, first=0, last=-1):
    """Return a copy of this instrument, optionally with only a subset of components"""
    from copy import deepcopy
    copy = Instr(self.name, self.source)
    copy.parameters = tuple(x for x in self.parameters)
    copy.metadata = tuple(x.copy() for x in self.metadata)
    if last < 0:
        last += 1 + len(self.components)
    copy.components = tuple(x.copy() for x in self.components[first:last])
    copy.included = tuple(x for x in self.included)
    copy.user = tuple(x.copy() for x in self.user)
    copy.declare = tuple(x.copy() for x in self.declare)
    copy.initialize = tuple(x.copy() for x in self.initialize)
    copy.save = tuple(x.copy() for x in self.save)
    copy.final = tuple(x.copy() for x in self.final)
    # copy.groups = {k: v.copy() for k, v in self.groups.items()}
    copy.dependency = tuple(x for x in self.dependency)
    copy.registries = tuple(x for x in self.registries)
    return copy

split(at, remove_unused_parameters=False)

Produces two instruments, both containing the indicated component

Parameters:

after: Union[Instance, str] A component instance or the name of a component instance at which to split the instrument. The instance or one with a matching name must be in the instrument, and should probably be an Arm. remove_unused_parameters: bool If True, any Instrument parameters which do not appear in instance definitions or code blocks is not included in the output instruments

Return:

tuple[Instr, Instr] The first Instr has components up to and including the split-point. The second Instr has components starting from the split-point.

Source code in src/mccode_antlr/instr/instr.py
def split(self, at, remove_unused_parameters=False):
    """Produces two instruments, both containing the indicated component

    Parameters:
    -----------
    after: Union[Instance, str]
        A component instance or the _name_ of a component instance at which to split the instrument.
        The instance or one with a matching name _must_ be in the instrument, and should probably be an Arm.
    remove_unused_parameters: bool
        If True, any Instrument parameters which do not appear in instance definitions or code blocks is not
        included in the output instruments

    Return:
    -------
    tuple[Instr, Instr]
        The first Instr has components up to and including the split-point.
        The second Instr has components starting from the split-point.
    """
    if isinstance(at, Instance):
        index = self.components.index(at)
    elif isinstance(at, str):
        index = [i for i, x in enumerate(self.components) if x.name == at]
        if len(index) != 1:
            raise RuntimeError(f'Can only split an instrument after a single component, "{at}" matches {index}')
        index = index[0]
    else:
        raise RuntimeError('Can only split an instrument after a component or component name')
    first = self.copy(last=index + 1)
    first.name = self.name + '_first'
    if first.check_instrument_parameters(remove=remove_unused_parameters) and not remove_unused_parameters:
        logger.warning(f'Instrument {first.name} has unused instrument parameters')

    second = self.copy(first=index)
    second.name = self.name + '_second'
    # remove any dangling component references and re-reference into the new instrument's components:
    for instance in second.components:
        at_rel = instance.at_relative[1]
        rot_rel = instance.rotate_relative[1]
        if isinstance(at_rel, Instance):
            if second.has_component_named(at_rel.name):
                instance.at_relative = instance.at_relative[0], second.get_component(at_rel.name)
            else:
                instance.at_relative = instance.orientation.position(), None
        if isinstance(rot_rel, Instance):
            if second.has_component_named(rot_rel.name):
                instance.rotate_relative = instance.rotate_relative[0], second.get_component(rot_rel.name)
            else:
                instance.rotate_relative = instance.orientation.angles(), None
    if second.check_instrument_parameters(remove=remove_unused_parameters) and not remove_unused_parameters:
        logger.info(f'Instrument {second.name} has unused instrument parameters')

    return first, second

parameter_used(name)

Check that an instrument parameter is used in the instrument

Source code in src/mccode_antlr/instr/instr.py
def parameter_used(self, name: str):
    """Check that an instrument parameter is used in the instrument"""
    for instance in self.components:
        if instance.parameter_used(name):
            return True
    for section in (self.declare, self.initialize, self.save, self.final):
        for block in section:
            # A more complex check would see if the use itself leads to a parameter being used, but
            # that would be language dependent and probably not worth the effort.
            if name in block:
                return True
    return False

check_instrument_parameters(remove=False)

Check that all instrument parameters are used in the instrument, and optionally remove any that are not

Returns

int The number of unused instrument parameters

Source code in src/mccode_antlr/instr/instr.py
def check_instrument_parameters(self, remove=False):
    """Check that all instrument parameters are used in the instrument, and optionally remove any that are not

    Returns
    -------
    int
        The number of unused instrument parameters
    """
    names = [p.name for p in self.parameters]
    used = [self.parameter_used(p.name) for p in self.parameters]
    if not all(used):
        logger.info(f'The following instrument parameters are not used in the instrument: '
                 f'{", ".join([n for n, u in zip(names, used) if not u])}')
        if remove:
            self.parameters = tuple(p for i, p in enumerate(self.parameters) if used[i])
            logger.info(f'Removed unused instrument parameters; {len(self.parameters)} remain')
    return len(used) - sum(used)

verify_instance_parameters()

Check that all instance parameters are of the expected type, and that identifiers which match instrument parameter names are flagged as such

Source code in src/mccode_antlr/instr/instr.py
def verify_instance_parameters(self):
    """Check that all instance parameters are of the expected type, and that identifiers which match
    instrument parameter names are flagged as such"""
    for instance in self.components:
        instance.verify_parameters(self.parameters)

Instance

mccode_antlr.instr.instance.Instance

Bases: Struct

Intermediate representation of a McCode component instance

Read from a .instr file TRACE section, using one or more .comp sources For output to a runtime source file

Source code in src/mccode_antlr/instr/instance.py
class Instance(Struct):
    """Intermediate representation of a McCode component instance

    Read from a .instr file TRACE section, using one or more .comp sources
    For output to a runtime source file
    """
    name: str
    type: Comp
    at_relative: VectorReference
    rotate_relative: AnglesReference
    orientation: Optional[Orient] = None
    parameters: tuple[ComponentParameter, ...] = field(default_factory=tuple)
    removable: bool = False
    cpu: bool = False
    split: Optional[Expr] = None
    when: Optional[Expr] = None
    group: Optional[str] = None
    extend: tuple[RawC, ...] = field(default_factory=tuple)
    jump: tuple[Jump, ...] = field(default_factory=tuple)
    metadata: tuple[MetaData, ...] = field(default_factory=tuple)
    mode: Mode = Mode.normal

    def __eq__(self, other: Instance) -> bool:
        if not isinstance(other, Instance):
            return NotImplemented
        from msgspec.structs import fields
        for name in [field.name for field in fields(self)]:
            if getattr(self, name) != getattr(other, name):
                return False
        return True

    def __hash__(self):
        return hash((
            self.name, self.type, self.at_relative, self.rotate_relative,
            self.orientation, self.parameters, self.removable, self.cpu, self.split,
            self.when, self.group, self.extend, self.jump, self.metadata, self.mode
        ))

    def __repr__(self):
        return f'Instance({self.name}, {self.type.name})'

    def to_file(self, output, wrapper=None, full=True):
        if self.cpu:
            print(wrapper.line('CPU', []), file=output, end='')

        instance_parameters = wrapper.hide(', '.join(p.to_string(wrapper=wrapper) for p in self.parameters))
        line = wrapper.bold('COMPONENT') + f' {self.name} = {self.type.name}({instance_parameters}) '

        if self.when is not None:
            line += wrapper.bold('WHEN') + ' ' + wrapper.escape(str(self.when)) + ' '

        def rf(which, x, required=False):
            absolute = wrapper.bold('ABSOLUTE')
            relative = wrapper.bold('RELATIVE')
            return _triplet_ref_str(wrapper.bold(which), x, absolute, relative, required)

        # The "AT ..." statement is required even when it is "AT (0, 0, 0) ABSOLUTE"
        line += rf('AT', self.at_relative, required=True) + ' '
        line += rf('ROTATED', self.rotate_relative) + wrapper.br()
        print(line, file=output, end='')

        if not full:
            return  # Skip the rest of the output

        if self.group is not None:
            print(wrapper.line('GROUP', [self.group]), file=output)
        if self.extend:
            extends = '\n'.join(str(ext) for ext in self.extend)
            print(wrapper.block('EXTEND', extends), file=output)
        for jump in self.jump:
            jump.to_file(output, wrapper)
        for metadata in self.metadata:
            metadata.to_file(output, wrapper)

    def to_string(self, wrapper, full=True):
        from io import StringIO
        output = StringIO()
        self.to_file(output, wrapper=wrapper, full=full)
        return output.getvalue()

    def __str__(self):
        from mccode_antlr.common import TextWrapper
        return self.to_string(TextWrapper())

    def partial_str(self):
        from mccode_antlr.common import TextWrapper
        return self.to_string(TextWrapper(), full=False)

    @classmethod
    def from_instance(cls, name: str, ref: InstanceReference, at: VectorReference, rotate: AnglesReference):
        # from copy import deepcopy
        # copy each of: parameters, extend, group, jump, when, metadata
        return cls(name, ref.type, at, rotate,
                   parameters=tuple([par for par in ref.parameters]),
                   when=ref.when, group=ref.group,
                   extend=tuple([ext for ext in ref.extend]),
                   jump=tuple([jmp for jmp in ref.jump]),
                   metadata=tuple([md for md in ref.metadata]),
                   mode=ref.mode)

    def __post_init__(self):
        # TODO: Enhancement idea. Instead of building independent orientation chains
        #       for each instance, as is done in the non-minimal Mode. The Instr
        #       could hold a directed graph defining the chains, with nodes as instances
        #       or instance names, and edges as the the chained operations linking
        #       the dependency. This should make it easier to re-use relative
        #       orientation information in other tools, e.g., moreniius.
        if self.mode != Mode.minimal and self.orientation is None:
            ar, rr = self.at_relative, self.rotate_relative
            if not isinstance(ar[0], Vector) or not isinstance(rr[0], Angles):
                logger.warning(f'Expected {ar=} and {rr=} to be Vector and Angles respectively')
            if rr[1] is None and ar[1] is not None:
                logger.warning(f'Expected rotation reference to be specified when at reference is specified')
            at = ar[0] if isinstance(ar[0], Vector) else Vector(*ar[0])
            an, ar = (ar[1].name, ar[1].orientation) if ar[1] else ("ABSOLUTE", None)
            rt = rr[0] if isinstance(rr[0], Angles) else Angles(*rr[0])
            rn, rr = (rr[1].name, rr[1].orientation) if rr[1] else (an, ar)
            self.orientation = Orient.from_dependent_orientations(ar, at, rr, rt)
        # check if the defining component is marked noacc, in which case this _is_ cpu only
        if not self.type.acc:
            self.cpu = True

    def set_parameter(self, name: str, value, overwrite=False, allow_repeated=True):
        if not parameter_name_present(self.type.define, name) and not parameter_name_present(self.type.setting, name):
            raise RuntimeError(f"Unknown parameter {name} for component type {self.type.name}")
        if parameter_name_present(self.parameters, name):
            if overwrite:
                self.parameters = tuple(x for x in self.parameters if name != x.name)
            elif allow_repeated:
                par = [p for p in self.parameters if name == p.name][0]
                logger.info(f'Multiple definitions of {name} in component instance {self.name}')
                if par.value != value:
                    logger.info(f'  first-encountered value {par.value} retained')
                    logger.info(f'  newly-encountered value {value} dropped')
            else:
                raise RuntimeError(f"Multiple definitions of {name} in component instance {self.name}")
        p = self.type.get_parameter(name)

        if not p.compatible_value(value):
            logger.debug(f'{p=}, {name=}, {value=}')
            raise RuntimeError(f"Provided value for parameter {name} is not compatible with {self.type.name}")

        if p.value.is_vector and isinstance(value, str):
            # FIXME can this be more general? Do we _need_ to treat vectors differently?
            value = Expr(Value(value, p.value.data_type, _shape=p.value.shape_type))
        elif isinstance(value, str):
            value = Expr.parse(value)
        elif not isinstance(value, Expr):
            # Copy the data_type of the component definition parameter
            # -- thus if value is a str but an int or float is expected, we will know it is an identifier
            value = Expr(Value(value, p.value.data_type))

        # 2023-09-14 This did nothing. Why was this here?
        # # is this parameter value *actually* an instrument parameter *name*
        # if value.is_id:
        #     pass
        # # If a parameter is set to an instrument parameter name, we need to keep track of that here:
        # TODO: Either add a reference to the containing instrument (and carry that around always)
        #       Or perform this check when it comes time to translate the whole instrument :/

        self.parameters += (ComponentParameter(p.name, value), )

    def verify_parameters(self, instrument_parameters: tuple[InstrumentParameter, ...]):
        """Check for instance parameters which are identifiers that match instrument parameter names,
        and flag them as parameter objects"""
        instrument_parameter_names = [x.name for x in instrument_parameters]
        for par in self.parameters:
            par.value.verify_parameters(instrument_parameter_names)

    def get_parameter(self, name: str):
        for par in self.parameters:
            if par.name == name:
                return par
        return self.type.get_parameter(name)

    def defines_parameter(self, name: str):
        """Check whether this instance has defined the named parameter"""
        return parameter_name_present(self.parameters, name)

    def set_parameters(self, **kwargs):
        for name, value in kwargs.items():
            self.set_parameter(name, value)
        return self

    def REMOVABLE(self):
        self.removable = True
        return self

    def CPU(self):
        self.cpu = True
        return self

    def SPLIT(self, count):
        if isinstance(count, str):
            count = Expr.parse(count)
        if not isinstance(count, Expr):
            raise ValueError(f'Expected provided SPLIT expression to be an Expr not a {type(count)}')
        self.split = count
        return self

    def WHEN(self, expr):
        if isinstance(expr, str):
            expr = Expr.parse(expr)
        if not isinstance(expr, Expr):
            raise ValueError(f'Expected provided WHEN expression to be an Expr not a {type(expr)}')
        if expr.is_constant:
            raise RuntimeError(f'Evaluated WHEN statement {expr} would be constant at runtime!')
        self.when = expr
        return self

    def GROUP(self, name: str):
        self.group = name
        return self

    def EXTEND(self, *blocks):
        # copy vanilla overwrite-COPY behavior, issue 85
        #
        #self.extend += blocks_to_raw_c(*blocks)
        if len(blocks):
            self.extend = blocks_to_raw_c(*blocks)
        return self

    def JUMP(self, *jumps):
        # copy vanilla overwrite-COPY behavior, issue 85
        #
        # self.jump += jumps
        if len(jumps):
            self.jump = jumps
        return self

    def add_metadata(self, m: MetaData):
        if any([x.name == m.name for x in self.metadata]):
            self.metadata = tuple([x for x in self.metadata if x.name != m.name])
        self.metadata += (m, )

    def collect_metadata(self):
        # A component declaration and instance can define metadata with the same name
        # When they do, the metadata from *the instance* should take precedence
        md = {m.name: m for m in self.type.collect_metadata()}
        md.update({m.name: m for m in self.metadata})
        return tuple(md.values())

    def copy(self):
        return Instance(self.name, self.type, self.at_relative, self.rotate_relative,
                        orientation=self.orientation, parameters=self.parameters,
                        removable=self.removable, cpu=self.cpu, split=self.split, when=self.when,
                        group=self.group, extend=self.extend, jump=self.jump, metadata=self.metadata)

    def parameter_used(self, name: str):
        if any([name in par.value for par in self.parameters]):
            return True
        if name in self.at_relative[0] or name in self.rotate_relative[0] or name in self.orientation:
            return True
        if name in (self.split or []) or name in (self.when or []):
            return True
        for block in self.extend:
            if name in block:
                return True
        for jump in self.jump:
            if name in jump:
                return True
        return False

    @property
    def dependency(self):
        return self.type.dependency

verify_parameters(instrument_parameters)

Check for instance parameters which are identifiers that match instrument parameter names, and flag them as parameter objects

Source code in src/mccode_antlr/instr/instance.py
def verify_parameters(self, instrument_parameters: tuple[InstrumentParameter, ...]):
    """Check for instance parameters which are identifiers that match instrument parameter names,
    and flag them as parameter objects"""
    instrument_parameter_names = [x.name for x in instrument_parameters]
    for par in self.parameters:
        par.value.verify_parameters(instrument_parameter_names)

defines_parameter(name)

Check whether this instance has defined the named parameter

Source code in src/mccode_antlr/instr/instance.py
def defines_parameter(self, name: str):
    """Check whether this instance has defined the named parameter"""
    return parameter_name_present(self.parameters, name)