Skip to content

World API

Central coordination service for entity lifecycle and system execution.

Overview

The World is the main entry point for AgentECS. It coordinates:

- Entity lifecycle: Spawn, destroy, and query entities
- Component management: Get, set, and query components
- System execution: Register and execute systems with proper isolation
- Entity operations: Merge and split entities dynamically

Key Features:

- Snapshot isolation: Systems see their own writes immediately
- Atomic updates: Changes applied at tick boundaries
- Parallel execution: Non-conflicting systems run concurrently


World Class

The central coordinator for your ECS world.

World

Central world state and system execution coordinator.

Owns storage backend and execution strategy. Systems interact via ScopedAccess, not World directly. Execution strategy handles all system registration and orchestration.

Source code in src/agentecs/world/world.py
class World:
    """Central world state and system execution coordinator.

    Owns storage backend and execution strategy. Systems interact via
    ScopedAccess, not World directly. Execution strategy handles all
    system registration and orchestration.

    The public methods without a leading underscore are intended for code
    running outside of systems; the underscore-prefixed accessors are thin
    pass-throughs to the storage backend for internal use.
    """

    def __init__(
        self,
        storage: Storage | None = None,
        execution: ExecutionStrategy | None = None,
    ):
        """Wire up storage and execution strategy, then ensure reserved entities.

        Args:
            storage: Storage backend to use. A new ``LocalStorage`` is created
                only when this is ``None``.
            execution: Execution strategy to use. A new ``SimpleScheduler`` is
                created only when this is ``None``.
        """
        # Compare against None explicitly: a valid storage object that happens
        # to be falsy (e.g. it defines __len__ and is currently empty) must not
        # be silently replaced by a fresh LocalStorage.
        self._storage = storage if storage is not None else LocalStorage()
        # Import here to avoid circular dependency at module level
        if execution is None:
            from agentecs.scheduling import SimpleScheduler

            execution = SimpleScheduler()
        self._execution = execution
        self._ensure_system_entities()

    def _ensure_system_entities(self) -> None:
        """Create reserved singleton entities if not present."""
        for entity in [SystemEntity.WORLD, SystemEntity.CLOCK]:
            if not self._storage.entity_exists(entity):
                # Bypass allocator by writing straight into the backend's
                # private component table.
                # NOTE(review): this relies on the storage exposing a
                # `_components` mapping -- confirm for non-local backends.
                self._storage._components[entity] = {}  # type: ignore

    def spawn(self, *components: Any) -> EntityId:
        """Create entity with components. For use outside systems.

        Args:
            *components: Component instances to attach to the new entity.
                If several share a type, a warning is emitted and each one
                is still passed to the storage in order.

        Returns:
            The ID of the newly created entity.
        """
        entity = self._storage.create_entity()
        seen_types: set[type] = set()
        for comp in components:
            comp_type = get_type(comp)
            if comp_type in seen_types:
                warnings.warn(
                    f"spawn() received multiple components of type {comp_type.__name__}. "
                    f"Only the last one will be kept.",
                    stacklevel=2,
                )
            seen_types.add(comp_type)
            self._storage.set_component(entity, comp)
        return entity

    def destroy(self, entity: EntityId) -> None:
        """Destroy entity. For use outside systems."""
        self._storage.destroy_entity(entity)

    def get_copy(
        self, entity: EntityId, component_type: type[ComponentT]
    ) -> Copy[ComponentT] | None:
        """Get component copy. For use outside systems.

        The storage is asked for a copy (``copy=True``) to prevent accidental
        mutation of world state. Modifications must be written back via
        ``world.set()``.
        """
        return self._storage.get_component(entity, component_type, copy=True)

    def set(self, entity: EntityId, component: Any) -> None:
        """Set component. For use outside systems."""
        self._storage.set_component(entity, component)

    def singleton_copy(self, component_type: type[ComponentT]) -> Copy[ComponentT] | None:
        """Get a copy of a singleton component from the WORLD entity."""
        return self.get_copy(SystemEntity.WORLD, component_type)

    def set_singleton(self, component: Any) -> None:
        """Set singleton component on WORLD entity."""
        self.set(SystemEntity.WORLD, component)

    def query_copies(self, *component_types: type) -> Iterator[tuple[Any, ...]]:
        """Query entities with specified component types. For use outside systems.

        Yields flat tuples ``(entity, component1, component2, ...)``; the
        storage is queried with ``copy=True``.

        Example:
            >>> for entity, pos, vel in world.query_copies(Position, Velocity):
            ...     # Process entities with both Position and Velocity
            ...     pass
        """
        for entity, components in self._storage.query(*component_types, copy=True):
            yield (entity, *components)

    def merge_entities(
        self,
        entity1: EntityId,
        entity2: EntityId,
    ) -> EntityId:
        """Merge two entities into a single new entity.

        Component types present on both entities are resolved through
        ``combine_protocol_or_fallback``; types present on only one entity
        are carried over unchanged. Both originals are destroyed afterwards.

        Args:
            entity1: First entity to merge.
            entity2: Second entity to merge.

        Returns:
            EntityId of the newly created merged entity.

        Raises:
            ValueError: If either entity doesn't exist, or if ``entity1`` and
                ``entity2`` are the same entity.

        Example:
            >>> merged = world.merge_entities(agent1, agent2)
            >>> # agent1 and agent2 are destroyed
            >>> # merged has combined components
        """
        if not self._storage.entity_exists(entity1):
            raise ValueError(f"Entity {entity1} does not exist")
        if not self._storage.entity_exists(entity2):
            raise ValueError(f"Entity {entity2} does not exist")
        # A self-merge would combine every component with itself and then
        # destroy the same entity twice, so reject it up front.
        if entity1 == entity2:
            raise ValueError(f"Cannot merge entity {entity1} with itself")

        # Collect all component types from both entities
        types1 = self._storage.get_component_types(entity1)
        types2 = self._storage.get_component_types(entity2)
        all_types = types1 | types2

        # Combine components using protocol or fallback
        merged_components: list[Any] = []

        for comp_type in all_types:
            comp1 = self._storage.get_component(entity1, comp_type)
            comp2 = self._storage.get_component(entity2, comp_type)

            if comp1 is not None and comp2 is not None:
                merged = combine_protocol_or_fallback(comp1, comp2)

                # A None result means the component is dropped from the merge.
                if merged is not None:
                    merged_components.append(merged)

            elif comp1 is not None:
                # Only entity1 has this component
                merged_components.append(comp1)
            elif comp2 is not None:
                # Only entity2 has this component
                merged_components.append(comp2)

        # Create merged entity and destroy originals
        merged_entity = self.spawn(*merged_components)
        self.destroy(entity1)
        self.destroy(entity2)

        return merged_entity

    def split_entity(
        self,
        entity: EntityId,
    ) -> tuple[EntityId, EntityId]:
        """Split one entity into two new entities.

        Every component is passed through ``split_protocol_or_fallback``,
        which returns a ``(left, right)`` pair; non-None halves go to the
        first and second new entity respectively. The original entity is
        destroyed afterwards.

        Args:
            entity: Entity to split.

        Returns:
            Tuple of (first_entity, second_entity) IDs.

        Raises:
            ValueError: If entity doesn't exist

        Example:
            >>> left, right = world.split_entity(agent)
            >>> # original agent is destroyed
            >>> # left and right have split components
        """
        if not self._storage.entity_exists(entity):
            raise ValueError(f"Entity {entity} does not exist")

        # Collect all components and split using protocol or fallback
        comp_types = self._storage.get_component_types(entity)
        first_components: list[Any] = []
        second_components: list[Any] = []

        for comp_type in comp_types:
            comp = self._storage.get_component(entity, comp_type)
            if comp is None:
                continue

            # Split using protocol or fallback
            left, right = split_protocol_or_fallback(comp)
            if left is not None:
                first_components.append(left)
            if right is not None:
                second_components.append(right)

        # Create new entities and destroy original
        first_entity = self.spawn(*first_components)
        second_entity = self.spawn(*second_components)
        self.destroy(entity)

        return first_entity, second_entity

    # Internal pass-through accessors to the storage backend.

    def _get_component(
        self, entity: EntityId, component_type: type[ComponentT]
    ) -> ComponentT | None:
        """Fetch a component from storage using the storage's default copy behavior."""
        return self._storage.get_component(entity, component_type)

    def _has_component(self, entity: EntityId, component_type: type) -> bool:
        """Report whether ``entity`` has a component of ``component_type``."""
        return self._storage.has_component(entity, component_type)

    def _query_components(
        self,
        *component_types: type,
    ) -> Iterator[tuple[EntityId, tuple[Any, ...]]]:
        """Return the storage's query iterator of ``(entity, components)`` pairs."""
        return self._storage.query(*component_types)

    def _all_entities(self) -> Iterator[EntityId]:
        """Return the storage's iterator over all entity IDs."""
        return self._storage.all_entities()

    def _get_component_types(self, entity: EntityId) -> frozenset[type]:
        """Return the set of component types stored on ``entity``."""
        return self._storage.get_component_types(entity)

    # Async variants for internal use (enables async ScopedAccess methods)

    async def _get_component_async(
        self, entity: EntityId, component_type: type[ComponentT]
    ) -> ComponentT | None:
        """Async counterpart of ``_get_component``.

        NOTE(review): this passes ``copy=True`` while the sync variant does
        not -- confirm the difference is intentional.
        """
        return await self._storage.get_component_async(entity, component_type, copy=True)

    async def _query_components_async(
        self,
        *component_types: type,
    ) -> AsyncIterator[tuple[EntityId, tuple[Any, ...]]]:
        """Re-yield every item produced by the storage's async query."""
        async for x in self._storage.query_async(*component_types):
            yield x

    def register_system(self, descriptor: SystemDescriptor) -> None:
        """Register system for execution.

        Delegates to the injected execution strategy.
        """
        self._execution.register_system(descriptor)

    def register_systems(self, *descriptors: SystemDescriptor) -> None:
        """Register multiple systems, in the order given."""
        for d in descriptors:
            self.register_system(d)

    async def execute_system_async(self, descriptor: SystemDescriptor) -> SystemResult:
        """Execute single system asynchronously, returning collected changes.

        Handles both sync and async systems automatically based on descriptor.is_async.
        The system's return value (if any) is normalized and merged into the
        result buffer, which is then validated against ``descriptor.writes``.
        """
        result_buffer = SystemResult()
        access = ScopedAccess(world=self, descriptor=descriptor, buffer=result_buffer)

        # Run system (async if needed)
        if descriptor.is_async:
            returned = await descriptor.run(access)
        else:
            returned = descriptor.run(access)

        # Merge return value into buffer
        if returned is not None:
            normalized = normalize_result(returned)
            result_buffer.merge(normalized)

        validate_result_access(
            result_buffer,
            descriptor.writes,
            descriptor.name,
        )

        return result_buffer

    def execute_system(self, descriptor: SystemDescriptor) -> SystemResult:
        """Execute single system synchronously (wrapper for execute_system_async).

        For backward compatibility. Prefer execute_system_async() in async contexts:
        this uses ``asyncio.run``, which cannot be called while another event
        loop is running in the same thread.
        """
        return asyncio.run(self.execute_system_async(descriptor))

    async def apply_result_async(self, result: SystemResult) -> list[EntityId]:
        """Apply system result to world state asynchronously.

        Operations are applied one by one in ``result.ops`` order. An invalid
        operation raises after the preceding operations have already been
        applied.

        Args:
            result: System execution result containing all changes.

        Returns:
            List of newly created entity IDs from spawns.

        Raises:
            ValueError: If an operation matches none of the supported shapes.
        """
        # Pending value per (entity, component type) written during this call;
        # used to combine repeated writes to the same slot.
        written: dict[tuple[EntityId, type], Any] = {}
        new_entities: list[EntityId] = []

        for op in result.ops:
            if op.kind == OpKind.SPAWN and op.spawn_components is not None:
                # Spawn new entity with components
                new_entity = self.spawn(*op.spawn_components)
                new_entities.append(new_entity)
                # Record the spawned components under the new entity's ID.
                for comp in op.spawn_components:
                    comp_type = get_type(comp)
                    written[(new_entity, comp_type)] = comp
            elif (
                op.kind in (OpKind.UPDATE, OpKind.INSERT)
                and op.component is not None
                and op.component_type is not None
                and op.entity is not None
            ):
                key = (op.entity, op.component_type)
                # A repeated write of a Combinable component is combined with
                # the earlier pending value; otherwise the latest write wins.
                if key in written and isinstance(op.component, Combinable):
                    written[key] = combine_protocol_or_fallback(written[key], op.component)
                else:
                    written[key] = op.component
                self._storage.set_component(op.entity, component=written[key])

            elif (
                op.kind == OpKind.REMOVE and op.component_type is not None and op.entity is not None
            ):
                self._storage.remove_component(op.entity, op.component_type)
                deleted_key = (op.entity, op.component_type)
                if deleted_key in written:
                    del written[deleted_key]
            elif op.kind == OpKind.DESTROY and op.entity is not None:
                self._storage.destroy_entity(op.entity)
                # Remove all pending writes for this entity
                keys_to_delete = [k for k in written if k[0] == op.entity]
                for k in keys_to_delete:
                    del written[k]
            else:
                raise ValueError(f"Invalid operation in system result: {op}")
        return new_entities

    def apply_result(self, result: SystemResult) -> list[EntityId]:
        """Apply system result to world state (sync wrapper for backward compatibility).

        Prefer apply_result_async() in async contexts: this uses
        ``asyncio.run``, which cannot be called while another event loop is
        running in the same thread.

        Args:
            result: System execution result containing all changes.

        Returns:
            List of newly created entity IDs from spawns.
        """
        return asyncio.run(self.apply_result_async(result))

    async def tick_async(self) -> None:
        """Execute all registered systems once asynchronously.

        Delegates to the injected execution strategy, which handles
        parallelization, conflict detection, or other orchestration logic.
        """
        await self._execution.tick_async(self)

    def tick(self) -> None:
        """Execute all registered systems once synchronously (wrapper for tick_async).

        For backward compatibility and simple scripts. Prefer tick_async() in
        async contexts: this uses ``asyncio.run``, which cannot be called while
        another event loop is running in the same thread.
        """
        asyncio.run(self.tick_async())

    def snapshot(self) -> bytes:
        """Serialize world state by delegating to the storage backend."""
        return self._storage.snapshot()

    def restore(self, data: bytes) -> None:
        """Restore from a snapshot by delegating to the storage backend."""
        self._storage.restore(data)

__init__(storage=None, execution=None)

Source code in src/agentecs/world/world.py
def __init__(
    self,
    storage: Storage | None = None,
    execution: ExecutionStrategy | None = None,
):
    """Wire up storage and execution strategy, then ensure reserved entities.

    Args:
        storage: Storage backend to use. A new ``LocalStorage`` is created
            only when this is ``None``.
        execution: Execution strategy to use. A new ``SimpleScheduler`` is
            created only when this is ``None``.
    """
    # Compare against None explicitly: a valid storage object that happens to
    # be falsy (e.g. it defines __len__ and is currently empty) must not be
    # silently replaced by a fresh LocalStorage.
    self._storage = storage if storage is not None else LocalStorage()
    # Import here to avoid circular dependency at module level
    if execution is None:
        from agentecs.scheduling import SimpleScheduler

        execution = SimpleScheduler()
    self._execution = execution
    self._ensure_system_entities()

spawn(*components)

Create entity with components. For use outside systems.

Source code in src/agentecs/world/world.py
def spawn(self, *components: Any) -> EntityId:
    """Allocate a new entity and attach the given components to it.

    Intended for callers outside of systems. When two components share a
    type, a warning is emitted; every component is still handed to the
    storage in the order given.

    Returns:
        The ID of the newly created entity.
    """
    new_entity = self._storage.create_entity()
    types_so_far: set[type] = set()
    for component in components:
        kind = get_type(component)
        if kind in types_so_far:
            warnings.warn(
                f"spawn() received multiple components of type {kind.__name__}. "
                f"Only the last one will be kept.",
                stacklevel=2,
            )
        else:
            types_so_far.add(kind)
        self._storage.set_component(new_entity, component)
    return new_entity

destroy(entity)

Destroy entity. For use outside systems.

Source code in src/agentecs/world/world.py
def destroy(self, entity: EntityId) -> None:
    """Remove ``entity`` via the storage backend.

    Intended for callers outside of systems.
    """
    backend = self._storage
    backend.destroy_entity(entity)

set(entity, component)

Set component. For use outside systems.

Source code in src/agentecs/world/world.py
def set(self, entity: EntityId, component: Any) -> None:
    """Store ``component`` on ``entity`` via the storage backend.

    Intended for callers outside of systems.
    """
    backend = self._storage
    backend.set_component(entity, component)

set_singleton(component)

Set singleton component on WORLD entity.

Source code in src/agentecs/world/world.py
def set_singleton(self, component: Any) -> None:
    """Attach ``component`` to the reserved WORLD entity via ``set``."""
    world_entity = SystemEntity.WORLD
    self.set(world_entity, component)

merge_entities(entity1, entity2)

Merge two entities into a single new entity.

Components implementing Combinable protocol are merged via `__combine__`. In other cases, entity2's component takes precedence.

Parameters:

Name Type Description Default
entity1 EntityId

First entity to merge.

required
entity2 EntityId

Second entity to merge.

required

Returns:

Type Description
EntityId

EntityId of the newly created merged entity.

Raises:

Type Description
ValueError

If either entity doesn't exist.

Example

merged = world.merge_entities(agent1, agent2)

agent1 and agent2 are destroyed

merged has combined components

Source code in src/agentecs/world/world.py
def merge_entities(
    self,
    entity1: EntityId,
    entity2: EntityId,
) -> EntityId:
    """Merge two entities into a single new entity.

    Component types present on both entities are resolved through
    ``combine_protocol_or_fallback``; types present on only one entity are
    carried over unchanged. Both originals are destroyed afterwards.

    Args:
        entity1: First entity to merge.
        entity2: Second entity to merge.

    Returns:
        EntityId of the newly created merged entity.

    Raises:
        ValueError: If either entity doesn't exist, or if ``entity1`` and
            ``entity2`` are the same entity.

    Example:
        >>> merged = world.merge_entities(agent1, agent2)
        >>> # agent1 and agent2 are destroyed
        >>> # merged has combined components
    """
    if not self._storage.entity_exists(entity1):
        raise ValueError(f"Entity {entity1} does not exist")
    if not self._storage.entity_exists(entity2):
        raise ValueError(f"Entity {entity2} does not exist")
    # A self-merge would combine every component with itself and then destroy
    # the same entity twice, so reject it up front.
    if entity1 == entity2:
        raise ValueError(f"Cannot merge entity {entity1} with itself")

    # Collect all component types from both entities
    types1 = self._storage.get_component_types(entity1)
    types2 = self._storage.get_component_types(entity2)
    all_types = types1 | types2

    # Combine components using protocol or fallback
    merged_components: list[Any] = []

    for comp_type in all_types:
        comp1 = self._storage.get_component(entity1, comp_type)
        comp2 = self._storage.get_component(entity2, comp_type)

        if comp1 is not None and comp2 is not None:
            merged = combine_protocol_or_fallback(comp1, comp2)

            # A None result means the component is dropped from the merge.
            if merged is not None:
                merged_components.append(merged)

        elif comp1 is not None:
            # Only entity1 has this component
            merged_components.append(comp1)
        elif comp2 is not None:
            # Only entity2 has this component
            merged_components.append(comp2)

    # Create merged entity and destroy originals
    merged_entity = self.spawn(*merged_components)
    self.destroy(entity1)
    self.destroy(entity2)

    return merged_entity

split_entity(entity)

Split one entity into two new entities.

Components implementing Splittable protocol are split via `__split__`. Non-splittable components are duplicated on a component-by-component basis.

Parameters:

Name Type Description Default
entity EntityId

Entity to split.

required

Returns:

Type Description
tuple[EntityId, EntityId]

Tuple of (first_entity, second_entity) IDs.

Raises:

Type Description
ValueError

If entity doesn't exist

Example

left, right = world.split_entity(agent)

original agent is destroyed

left and right have split components

Source code in src/agentecs/world/world.py
def split_entity(
    self,
    entity: EntityId,
) -> tuple[EntityId, EntityId]:
    """Replace one entity with two freshly spawned entities.

    Each component of ``entity`` goes through ``split_protocol_or_fallback``,
    which yields a ``(left, right)`` pair. Non-None left halves are given to
    the first new entity, non-None right halves to the second. The source
    entity is destroyed once both new entities exist.

    Args:
        entity: Entity to split.

    Returns:
        Tuple of (first_entity, second_entity) IDs.

    Raises:
        ValueError: If entity doesn't exist

    Example:
        >>> left, right = world.split_entity(agent)
        >>> # original agent is destroyed
        >>> # left and right have split components
    """
    storage = self._storage
    if not storage.entity_exists(entity):
        raise ValueError(f"Entity {entity} does not exist")

    left_parts: list[Any] = []
    right_parts: list[Any] = []

    for kind in storage.get_component_types(entity):
        component = storage.get_component(entity, kind)
        if component is not None:
            # Delegate the actual split to the protocol/fallback helper.
            left_half, right_half = split_protocol_or_fallback(component)
            if left_half is not None:
                left_parts.append(left_half)
            if right_half is not None:
                right_parts.append(right_half)

    # Spawn both halves before removing the source entity.
    first_entity = self.spawn(*left_parts)
    second_entity = self.spawn(*right_parts)
    self.destroy(entity)

    return (first_entity, second_entity)

register_system(descriptor)

Register system for execution.

Delegates to the injected execution strategy.

Source code in src/agentecs/world/world.py
def register_system(self, descriptor: SystemDescriptor) -> None:
    """Hand ``descriptor`` to the injected execution strategy for registration."""
    strategy = self._execution
    strategy.register_system(descriptor)

execute_system(descriptor)

Execute single system synchronously (wrapper for execute_system_async).

For backward compatibility. Prefer execute_system_async() in async contexts.

Source code in src/agentecs/world/world.py
def execute_system(self, descriptor: SystemDescriptor) -> SystemResult:
    """Run one system to completion from synchronous code.

    Blocking wrapper that drives ``execute_system_async`` with
    ``asyncio.run``; kept for backward compatibility. Use
    ``execute_system_async()`` directly in async contexts.
    """
    pending = self.execute_system_async(descriptor)
    return asyncio.run(pending)

execute_system_async(descriptor) async

Execute single system asynchronously, returning collected changes.

Handles both sync and async systems automatically based on descriptor.is_async.

Source code in src/agentecs/world/world.py
async def execute_system_async(self, descriptor: SystemDescriptor) -> SystemResult:
    """Run one system and return the changes it produced.

    The system is invoked with a ``ScopedAccess`` bound to a fresh
    ``SystemResult`` buffer. The call is awaited when
    ``descriptor.is_async`` is true. A non-None return value is normalized
    and merged into the buffer, and the buffer is validated against
    ``descriptor.writes`` before it is returned.
    """
    buffer = SystemResult()
    scoped = ScopedAccess(world=self, descriptor=descriptor, buffer=buffer)

    # Read the flag first so attribute access order matches the sync/async branch.
    needs_await = descriptor.is_async
    outcome = descriptor.run(scoped)
    if needs_await:
        outcome = await outcome

    # Fold whatever the system returned into the collected changes.
    if outcome is not None:
        buffer.merge(normalize_result(outcome))

    validate_result_access(buffer, descriptor.writes, descriptor.name)
    return buffer

tick()

Execute all registered systems once synchronously (wrapper for tick_async).

For backward compatibility and simple scripts. Prefer tick_async() in async contexts.

Source code in src/agentecs/world/world.py
def tick(self) -> None:
    """Run every registered system once from synchronous code.

    Blocking wrapper that drives ``tick_async`` with ``asyncio.run``; kept
    for backward compatibility and simple scripts. Use ``tick_async()``
    directly in async contexts.
    """
    pending = self.tick_async()
    asyncio.run(pending)

tick_async() async

Execute all registered systems once asynchronously.

Delegates to the injected execution strategy, which handles parallelization, conflict detection, or other orchestration logic.

Source code in src/agentecs/world/world.py
async def tick_async(self) -> None:
    """Run every registered system once.

    All orchestration (parallelization, conflict detection, and so on) is
    left to the injected execution strategy, which receives this world.
    """
    strategy = self._execution
    await strategy.tick_async(self)

apply_result(result)

Apply system result to world state (sync wrapper for backward compatibility).

Prefer apply_result_async() in async contexts.

Parameters:

Name Type Description Default
result SystemResult

System execution result containing all changes.

required

Returns:

Type Description
list[EntityId]

List of newly created entity IDs from spawns.

Source code in src/agentecs/world/world.py
def apply_result(self, result: SystemResult) -> list[EntityId]:
    """Apply a system result to world state from synchronous code.

    Blocking wrapper that drives ``apply_result_async`` with
    ``asyncio.run``; kept for backward compatibility. Use
    ``apply_result_async()`` directly in async contexts.

    Args:
        result: System execution result containing all changes.

    Returns:
        List of newly created entity IDs from spawns.
    """
    pending = self.apply_result_async(result)
    return asyncio.run(pending)

Access Control

Systems access the world through scoped interfaces that enforce access patterns.

ScopedAccess

ScopedAccess

World access scoped to system's declared patterns with magic methods.

Provides snapshot isolation: sees own writes immediately, others' writes only after tick boundary.

Gotcha: For PURE mode systems, write methods raise AccessViolationError.

Source code in src/agentecs/world/access.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
class ScopedAccess:
    """World access scoped to a system's declared patterns, with magic methods.

    Provides snapshot isolation: a system sees its own buffered writes
    immediately, others' writes only after the tick boundary. Every mutation
    is recorded into the system's ``SystemResult`` buffer instead of being
    applied to storage directly.

    Gotcha: outside dev mode, write methods raise AccessViolationError for
    READONLY systems and for component types the system did not declare as
    writable.
    """

    def __init__(
        self,
        world: World,  # World instance (avoid circular import)
        descriptor: SystemDescriptor,
        buffer: SystemResult,
    ):
        """Bind the access object to a world, a system descriptor and a buffer.

        Args:
            world: World whose storage is read for values not in the buffer.
            descriptor: Descriptor of the system; consulted for access checks.
            buffer: SystemResult that records every mutation made here.
        """
        self._world = world
        self._descriptor = descriptor
        self._buffer = buffer
        self._sync_runner: SyncRunner = SyncRunner.get()

    def _check_readable(self, *types: type | Any) -> None:
        """Raise AccessViolationError if any of ``types`` is not readable.

        Accepts component types or component instances (instances are mapped
        to their type via ``get_type``). Dev-mode systems skip the check.
        """
        if self._descriptor.is_dev_mode():
            return
        for t in types:
            component_type = get_type(t) if not isinstance(t, type) else t
            if not self._descriptor.can_read_type(component_type):
                raise AccessViolationError(
                    f"System '{self._descriptor.name}' cannot read {component_type.__name__}: "
                    f"not in readable types"
                )

    def _check_writable(self, component: type | Any) -> None:
        """Raise AccessViolationError if ``component`` may not be written.

        Accepts a component type or instance. Dev-mode systems skip the
        check; READONLY systems are rejected for every type.
        """
        from ..core.system import SystemMode

        if self._descriptor.is_dev_mode():
            return

        component_type: type = get_type(component) if not isinstance(component, type) else component

        # READONLY mode cannot write at all
        if self._descriptor.mode == SystemMode.READONLY:
            raise AccessViolationError(
                f"System '{self._descriptor.name}' is READONLY"
                f" and cannot write {component_type.__name__}"
            )

        if self._descriptor.can_write_type(component_type):
            return
        # Distinguish "readable but not writable" from "not declared at all"
        # so the error message points at the right declaration.
        if self._descriptor.can_read_type(component_type):
            raise AccessViolationError(
                f"System '{self._descriptor.name}' cannot"
                f" write {component_type.__name__}: "
                f"declared as read-only"
            )
        raise AccessViolationError(
            f"System '{self._descriptor.name}': {component_type.__name__}: not in writable types"
        )

    def __getitem__(self, key: tuple[EntityId, type[T]]) -> Copy[T]:
        """Get directly the component T for the entity in key.

        Example: world[entity, Position] -> Position
        """
        entity, component_type = key
        return self.get(entity, component_type)

    def __setitem__(self, key: tuple[EntityId, type], value: Any) -> None:
        """Set directly the component T for entity in key.

        The type in ``key`` is not consulted; the recorded component type is
        derived from ``value`` by the buffer.

        Example: world[entity, Position] = new_pos.
        """
        entity, _ = key
        self.update(entity, value)

    def __delitem__(self, key: tuple[EntityId, type]) -> None:
        """Delete component.

        Example: del world[entity, Position].
        """
        entity, component_type = key
        self.remove(entity, component_type)

    def __contains__(self, key: tuple[EntityId, type]) -> bool:
        """(entity, Position) in world."""
        entity, component_type = key
        return self.has(entity, component_type)

    def __call__(self, *component_types: type) -> QueryResult:
        """Create a query for component types.

        Example: world(Position, Velocity) -> QueryResult iterable.
        """
        self._check_readable(*component_types)
        return QueryResult(self, component_types)

    def __iter__(self) -> Iterator[EntityId]:
        """Iterate over entities.

        Example: for entity in world: ...
        """
        return self.entities()

    def get(self, entity: EntityId, component_type: type[T]) -> Copy[T]:
        """Get component copy (from buffer or storage).

        ALWAYS returns a copy to prevent accidental mutation of world state.
        Modifications must be written back explicitly via world[entity, Type] = component.

        Raises:
            KeyError: If the entity does not have the component.
        """
        return cast(T, self._sync_runner.run(self.get_async(entity, component_type)))

    def _buffered_component_types(self, entity: EntityId) -> frozenset[type]:
        """Component types for entity, accounting for buffered inserts/removes."""
        base = set(self._world._storage.get_component_types(entity))
        inserts = self._buffer.inserts
        removes = self._buffer.removes
        updates = self._buffer.updates
        if entity in inserts:
            for comp in inserts[entity]:
                base.add(get_type(comp))
        if entity in updates:
            for comp_type, comp in updates[entity].items():
                # Presence, not truthiness: a falsy component value
                # (e.g. an empty container) is still a component.
                if comp is not None:
                    base.add(comp_type)
        if entity in removes:
            base -= set(removes[entity])
        return frozenset(base)

    def has(self, entity: EntityId, component_type: type) -> bool:
        """Check if entity has a specific component type.

        Buffered inserts and updates count as present, matching what
        ``get()`` would return; a buffered remove hides a stored component.
        """
        self._check_readable(component_type)
        inserts = self._buffer.inserts
        updates = self._buffer.updates
        removes = self._buffer.removes

        if entity in inserts and any(get_type(c) is component_type for c in inserts[entity]):
            return True
        # get() serves buffered updates first, so has() must agree with it.
        if entity in updates and component_type in updates[entity]:
            return True
        if entity in removes and component_type in removes[entity]:
            return False

        return self._world._has_component(entity, component_type)

    def _query_raw(
        self,
        *component_types: type,
    ) -> Iterator[tuple[EntityId, tuple[Any, ...]]]:
        """Internal query returning (entity, (comp1, comp2, ...))."""
        return self._sync_runner.iterate(self._query_raw_async(*component_types))

    def query(
        self,
        *component_types: type,
    ) -> Iterator[tuple[EntityId, tuple[Any, ...]]]:
        """Query entities with components. Returns (entity, (comp1, comp2, ...))."""
        self._check_readable(*component_types)
        return self._query_raw(*component_types)

    async def _query_raw_async(
        self,
        *component_types: type,
    ) -> AsyncIterator[tuple[EntityId, tuple[Any, ...]]]:
        """Internal async query returning (entity, (comp1, comp2, ...)).

        Performs no access check. Storage matches are yielded first, with
        buffered updates overlaid; entities that match only because of
        buffered updates/inserts follow. Every yielded component is a deep copy.
        """
        yielded_entities: set[EntityId] = set()

        # Cached views of the buffer. The buffer properties rebuild their
        # result on every access, so they are refreshed only when the
        # buffer's op counter has moved (the consumer may write between yields).
        buffer_version = -1
        buffer_updates: dict[EntityId, dict[type, Any]] = {}
        buffer_inserts: dict[EntityId, list[Any]] = {}
        buffer_removes: dict[EntityId, list[type]] = {}
        buffer_destroys: set[EntityId] = set()

        def refresh_buffer_views() -> None:
            nonlocal buffer_version, buffer_updates, buffer_inserts, buffer_removes, buffer_destroys
            if buffer_version == self._buffer._next_op_seq:
                return

            buffer_updates = self._buffer.updates
            buffer_inserts = self._buffer.inserts
            buffer_removes = self._buffer.removes
            buffer_destroys = set(self._buffer.destroys)
            buffer_version = self._buffer._next_op_seq

        async for entity, components in self._world._query_components_async(*component_types):
            refresh_buffer_views()
            should_skip = False
            if entity in buffer_destroys:
                continue
            for comp_type in component_types:
                if entity in buffer_removes and comp_type in buffer_removes[entity]:
                    should_skip = True
                    break
            if should_skip:
                continue

            result = []
            for comp_type, comp in zip(component_types, components, strict=False):
                if entity in buffer_updates and comp_type in buffer_updates[entity]:
                    result.append(copy.deepcopy(get_component(buffer_updates[entity][comp_type])))
                else:
                    result.append(copy.deepcopy(get_component(comp)))
            yielded_entities.add(entity)
            # Yield either from storage or updated from buffer
            yield entity, tuple(result)

        # At this point every entity whose components match in storage has been yielded.
        # Now check entities that match only through inserted or updated components.
        refresh_buffer_views()
        for entity in list(buffer_updates.keys()) + list(buffer_inserts.keys()):
            refresh_buffer_views()

            if entity in yielded_entities or entity in buffer_destroys:
                continue

            has_all = True
            result = []
            for comp_type in component_types:
                comp = None

                if entity in buffer_updates and comp_type in buffer_updates[entity]:
                    comp = buffer_updates[entity][comp_type]
                elif entity in buffer_inserts:
                    for inserted_comp in buffer_inserts[entity]:
                        if get_type(inserted_comp) is comp_type:
                            comp = inserted_comp
                            break
                if comp is None:
                    # Not buffered for this type: fall back to storage.
                    comp = await self._world._get_component_async(entity, comp_type)

                if entity in buffer_removes and comp_type in buffer_removes[entity]:
                    has_all = False
                    break

                if comp is None:
                    has_all = False
                    break

                result.append(copy.deepcopy(get_component(comp)))

            if has_all:
                yielded_entities.add(entity)
                yield entity, tuple(result)

    async def query_async(
        self,
        *component_types: type,
    ) -> AsyncIterator[tuple[EntityId, tuple[Any, ...]]]:
        """Query entities with components asynchronously. Returns (entity, (comp1, comp2, ...)).

        Async variant for use in async systems. For distributed storage, this enables
        efficient remote queries.

        Args:
            *component_types: Component types to query for.

        Yields:
            Tuples of (entity, (component1, component2, ...)) for each match.
        """
        self._check_readable(*component_types)
        async for entity, components in self._query_raw_async(*component_types):
            yield entity, components

    async def get_async(self, entity: EntityId, component_type: type[T]) -> Copy[T]:
        """Get component asynchronously, checking buffer first (read own writes).

        Async variant for use in async systems. For distributed storage, this enables
        efficient remote component fetches.

        Lookup order: buffered updates, buffered inserts, buffered removes
        (which hide the stored value), then storage.

        Args:
            entity: Entity to query.
            component_type: Type of component to retrieve.

        Returns:
            Deep copy of the component instance.

        Raises:
            KeyError: If entity does not have the component.
        """
        self._check_readable(component_type)
        updates = self._buffer.updates
        inserts = self._buffer.inserts
        removes = self._buffer.removes
        destroys = self._buffer.destroys

        if entity in updates and component_type in updates[entity]:
            return cast(T, copy.deepcopy(get_component(updates[entity][component_type])))

        if entity in inserts:
            for comp in inserts[entity]:
                if get_type(comp) is component_type:
                    return cast(T, copy.deepcopy(get_component(comp)))

        # A component this system already removed must not be served from
        # storage, otherwise get() would contradict has() for the same key.
        if entity in removes and component_type in removes[entity]:
            raise KeyError(f"Entity {entity} has no component {component_type.__name__}")

        component = await self._world._get_component_async(entity, component_type)
        # Compare against None explicitly: a falsy component (0, "", an
        # empty container, ...) is a valid stored value.
        if component is not None:
            return copy.deepcopy(component)
        if entity in destroys:
            raise KeyError(f"Entity {entity} has been destroyed")
        raise KeyError(f"Entity {entity} has no component {component_type.__name__}")

    def singleton(self, component_type: type[T]) -> Copy[T]:
        """Get singleton component from WORLD entity.

        Raises:
            KeyError: If no singleton of ``component_type`` is registered.
        """
        try:
            return self.get(SystemEntity.WORLD, component_type)
        except KeyError:
            # get() signals absence by raising, never by returning None;
            # re-raise with the singleton-specific message.
            raise KeyError(f"No singleton {component_type.__name__} registered") from None

    def entities(self) -> Iterator[EntityId]:
        """Iterate all entities.

        Delegates to the world directly; buffered spawns and destroys are
        not reflected here.
        """
        return self._world._all_entities()

    def entity(self, entity_id: EntityId) -> EntityHandle:
        """Get handle for convenient single-entity operations."""
        return EntityHandle(self, entity_id)

    def update(self, entity: EntityId, component: Any) -> None:
        """Update/set component on entity (buffered)."""
        self._check_writable(component)
        self._buffer.record_update(entity=entity, component=component)

    def update_singleton(self, component: Any) -> None:
        """Update/set singleton component on WORLD entity (buffered)."""
        self._check_writable(component)
        self._buffer.record_update(entity=SystemEntity.WORLD, component=component)

    def insert(self, entity: EntityId, component: Any) -> None:
        """Add new component to entity (buffered)."""
        self._check_writable(component)
        self._buffer.record_insert(entity=entity, component=component)

    def remove(self, entity: EntityId, component_type: type) -> None:
        """Remove component from entity (buffered)."""
        self._check_writable(component_type)
        self._buffer.record_remove(entity=entity, component_type=component_type)

    def spawn(self, *components: Any) -> EntityId:
        """Spawn new entity with components. Returns provisional ID.

        Every component is checked for write access. A warning is emitted
        when the same component type is passed more than once.
        """
        seen_types: set[type] = set()
        for comp in components:
            comp_type = get_type(comp)
            self._check_writable(comp)
            if comp_type in seen_types:
                warnings.warn(
                    f"spawn() received multiple components of type {comp_type.__name__}. "
                    f"Only the last one will be kept.",
                    stacklevel=2,
                )
            seen_types.add(comp_type)

        self._buffer.record_spawn(*components)
        # Return provisional ID - actual ID assigned at apply time.
        # The index is the negated count of spawns queued so far.
        return EntityId(shard=0, index=-self._buffer.spawn_count, generation=0)

    def destroy(self, entity: EntityId) -> None:
        """Queue entity for destruction."""
        self._buffer.record_destroy(entity=entity)

    def merge_entities(
        self,
        entity1: EntityId,
        entity2: EntityId,
    ) -> EntityId:
        """Merge two entities into one new provisional entity.

        Component types present on both entities are combined via
        ``combine_protocol_or_fallback``; the rest are copied over. Both
        source entities are queued for destruction.
        """
        types1 = self._buffered_component_types(entity1)
        types2 = self._buffered_component_types(entity2)
        all_types = types1 | types2

        merged_components: list[Any] = []
        for comp_type in all_types:
            if comp_type in types1 and comp_type in types2:
                comp1: Any = self.get(entity1, comp_type)
                comp2: Any = self.get(entity2, comp_type)
                merged_components.append(combine_protocol_or_fallback(comp1, comp2))
            elif comp_type in types1:
                merged_components.append(self.get(entity1, comp_type))
            else:
                merged_components.append(self.get(entity2, comp_type))

        merged_entity = self.spawn(*merged_components)
        self.destroy(entity1)
        self.destroy(entity2)
        return merged_entity

    def split_entity(
        self,
        entity: EntityId,
    ) -> tuple[EntityId, EntityId]:
        """Split one entity into two new provisional entities.

        Each component is divided via ``split_protocol_or_fallback``; the
        source entity is queued for destruction.
        """
        comp_types = self._buffered_component_types(entity)
        first_components: list[Any] = []
        second_components: list[Any] = []

        for comp_type in comp_types:
            comp: Any = self.get(entity, comp_type)
            first, second = split_protocol_or_fallback(comp)
            first_components.append(first)
            second_components.append(second)

        first_entity = self.spawn(*first_components)
        second_entity = self.spawn(*second_components)
        self.destroy(entity)
        return first_entity, second_entity

    def get_copy(self, entity: EntityId, component_type: type[T]) -> Copy[T]:
        """Get component copy. Alias for get() with explicit naming."""
        return self.get(entity, component_type)

    async def get_copy_async(self, entity: EntityId, component_type: type[T]) -> Copy[T]:
        """Get component copy asynchronously. Alias for get_async()."""
        return await self.get_async(entity, component_type)

    def query_copies(
        self,
        *component_types: type,
    ) -> Iterator[tuple[EntityId, tuple[Any, ...]]]:
        """Query entities returning component copies. Alias for query()."""
        return self.query(*component_types)

    async def query_copies_async(
        self,
        *component_types: type,
    ) -> AsyncIterator[tuple[EntityId, tuple[Any, ...]]]:
        """Query entities asynchronously returning copies. Alias for query_async()."""
        async for item in self.query_async(*component_types):
            yield item

__getitem__(key)

Get directly the component T for the entity in key.

Example: world[entity, Position] -> Position

Source code in src/agentecs/world/access.py
def __getitem__(self, key: tuple[EntityId, type[T]]) -> Copy[T]:
    """Subscript read: ``world[entity, Position]`` yields the Position component.

    The key must be a two-item ``(entity, component_type)`` pair; the lookup
    itself is delegated to ``self.get``.
    """
    target, wanted_type = key
    return self.get(target, wanted_type)

__setitem__(key, value)

Set directly the component T for entity in key.

Example: world[entity, Position] = new_pos.

Source code in src/agentecs/world/access.py
def __setitem__(self, key: tuple[EntityId, type], value: Any) -> None:
    """Subscript write: ``world[entity, Position] = new_pos``.

    Only the entity half of the two-item key is used; the type half is
    discarded and ``value`` is handed to ``self.update`` as-is.
    """
    target, _unused_type = key
    self.update(target, value)

__delitem__(key)

Delete component.

Example: del world[entity, Position].

Source code in src/agentecs/world/access.py
def __delitem__(self, key: tuple[EntityId, type]) -> None:
    """Subscript delete: ``del world[entity, Position]``.

    Unpacks the two-item ``(entity, component_type)`` key and forwards it to
    ``self.remove``.
    """
    target, doomed_type = key
    self.remove(target, doomed_type)

__contains__(key)

(entity, Position) in world.

Source code in src/agentecs/world/access.py
def __contains__(self, key: tuple[EntityId, type]) -> bool:
    """Membership test: ``(entity, Position) in world``.

    Unpacks the two-item key and returns whatever ``self.has`` reports.
    """
    target, probed_type = key
    return self.has(target, probed_type)

__call__(*component_types)

Create a query for component types.

Example: world(Position, Velocity) -> QueryResult iterable.

Source code in src/agentecs/world/access.py
def __call__(self, *component_types: type) -> QueryResult:
    """Call syntax for queries: ``world(Position, Velocity)``.

    Read access to every requested type is checked first; the returned
    QueryResult is built from this access object and the requested types.
    """
    requested = component_types
    self._check_readable(*requested)
    return QueryResult(self, requested)

spawn(*components)

Spawn new entity with components. Returns provisional ID.

Source code in src/agentecs/world/access.py
def spawn(self, *components: Any) -> EntityId:
    """Queue creation of a new entity carrying ``components``.

    Each component is checked for write access. Passing the same component
    type more than once emits a warning. The returned ID is provisional:
    its index is the negated number of spawns queued in the buffer so far.
    """
    encountered: set[type] = set()
    for comp in components:
        comp_type = get_type(comp)
        self._check_writable(comp)
        if comp_type not in encountered:
            encountered.add(comp_type)
            continue
        warnings.warn(
            f"spawn() received multiple components of type {comp_type.__name__}. "
            f"Only the last one will be kept.",
            stacklevel=2,
        )

    buffer = self._buffer
    buffer.record_spawn(*components)
    # The real ID is assigned when the buffer is applied.
    provisional_index = -buffer.spawn_count
    return EntityId(shard=0, index=provisional_index, generation=0)

destroy(entity)

Queue entity for destruction.

Source code in src/agentecs/world/access.py
def destroy(self, entity: EntityId) -> None:
    """Record a destroy operation for ``entity`` in the system's buffer."""
    pending = self._buffer
    pending.record_destroy(entity=entity)

ReadOnlyAccess

ReadOnlyAccess

Bases: Protocol

Read-only world view for PURE and READONLY systems.

Source code in src/agentecs/world/access.py
class ReadOnlyAccess(Protocol):
    """Structural type for the read-only world view given to PURE and READONLY systems.

    Declares only the read operations; the method bodies are placeholders.
    """

    def query(self, *component_types: type) -> Iterator[tuple[EntityId, tuple[Any, ...]]]:
        """Iterate ``(entity, components)`` pairs for entities matching the given types."""
        ...

    def get(self, entity: EntityId, component_type: type[T]) -> T | None:
        """Look up one component of ``component_type`` on ``entity``."""
        ...

    def has(self, entity: EntityId, component_type: type) -> bool:
        """Report whether ``entity`` carries a component of ``component_type``."""
        ...

    def singleton(self, component_type: type[T]) -> T:
        """Look up the singleton component of ``component_type``."""
        ...

    def entities(self) -> Iterator[EntityId]:
        """Iterate over every entity."""
        ...

query(*component_types)

Query entities by component types.

Source code in src/agentecs/world/access.py
def query(self, *component_types: type) -> Iterator[tuple[EntityId, tuple[Any, ...]]]:
    """Iterate ``(entity, components)`` pairs for entities matching the given types."""
    ...

get(entity, component_type)

Get a component from an entity.

Source code in src/agentecs/world/access.py
def get(self, entity: EntityId, component_type: type[T]) -> T | None:
    """Look up one component of ``component_type`` on ``entity``."""
    ...

has(entity, component_type)

Check if entity has a component.

Source code in src/agentecs/world/access.py
def has(self, entity: EntityId, component_type: type) -> bool:
    """Report whether ``entity`` carries a component of ``component_type``."""
    ...

singleton(component_type)

Get a singleton component.

Source code in src/agentecs/world/access.py
def singleton(self, component_type: type[T]) -> T:
    """Look up the singleton component of ``component_type``."""
    ...

entities()

Iterate all entities.

Source code in src/agentecs/world/access.py
def entities(self) -> Iterator[EntityId]:
    """Iterate over every entity."""
    ...

EntityHandle

Convenient wrapper for single-entity operations.

EntityHandle

Convenient wrapper for repeated single-entity operations.

Provides dict-style access to an entity's components with syntax like: e[Position] = pos, del e[Velocity], Position in e.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| access | ScopedAccess | ScopedAccess instance. | required |
| entity | EntityId | EntityId to wrap. | required |

Source code in src/agentecs/world/access.py
class EntityHandle:
    """Pairs one entity with a ScopedAccess for dict-style component access.

    Supports ``e[Position]``, ``e[Position] = pos``, ``del e[Velocity]`` and
    ``Position in e``; each operation is forwarded to the access object with
    the wrapped entity filled in.

    Args:
        access: ScopedAccess instance that performs the operations.
        entity: EntityId every operation is applied to.
    """

    def __init__(self, access: ScopedAccess, entity: EntityId):
        """Store the access object and the entity this handle stands for.

        Args:
            access: ScopedAccess instance that performs the operations.
            entity: EntityId every operation is applied to.
        """
        self._access, self._entity = access, entity

    @property
    def id(self) -> EntityId:
        """EntityId wrapped by this handle."""
        wrapped = self._entity
        return wrapped

    def __getitem__(self, component_type: type[T]) -> T | None:
        """Read a component: ``e[Position]``.

        Args:
            component_type: Component type to retrieve.

        Returns:
            Whatever ``access.get`` returns for the wrapped entity. Note that
            ``ScopedAccess.get`` raises KeyError for a missing component
            rather than returning None.
        """
        owner = self._access
        return owner.get(self._entity, component_type)

    def __setitem__(self, component_type: type, value: Any) -> None:
        """Write a component: ``e[Position] = new_pos``.

        Args:
            component_type: Ignored; only ``value`` is forwarded.
            value: Component instance passed to ``access.update``.
        """
        owner = self._access
        owner.update(self._entity, value)

    def __delitem__(self, component_type: type) -> None:
        """Remove a component: ``del e[Position]``.

        Args:
            component_type: Component type passed to ``access.remove``.
        """
        owner = self._access
        owner.remove(self._entity, component_type)

    def __contains__(self, component_type: type) -> bool:
        """Membership test: ``Position in e``.

        Args:
            component_type: Component type to look for.

        Returns:
            The result of ``access.has`` for the wrapped entity.
        """
        owner = self._access
        return owner.has(self._entity, component_type)

System Results

Systems can return results describing changes to apply.

SystemResult dataclass

Accumulated changes from system execution.

Source code in src/agentecs/world/result.py
@dataclass(slots=True)
class SystemResult:
    """Accumulated changes from system execution."""

    _ops: list[MutationOp] = field(default_factory=list)
    _next_op_seq: int = 0
    _update_indices: list[int] = field(default_factory=list)
    _insert_indices: list[int] = field(default_factory=list)
    _remove_indices: list[int] = field(default_factory=list)
    _spawn_indices: list[int] = field(default_factory=list)
    _destroy_indices: list[int] = field(default_factory=list)

    def record_update(self, entity: EntityId, component: Any) -> None:
        """Record an update operation for an entity's component."""
        if entity is None:
            raise ValueError("record_update requires a non-None entity")
        if component is None:
            raise ValueError("record_update requires a non-None component")

        op = MutationOp(
            op_seq=self._next_op_seq,
            kind=OpKind.UPDATE,
            entity=entity,
            component=component,
            component_type=get_type(component),
        )
        self._ops.append(op)
        self._update_indices.append(self._next_op_seq)
        self._next_op_seq += 1

    def record_insert(self, entity: EntityId, component: Any) -> None:
        """Record an insert operation for an entity's component."""
        if entity is None:
            raise ValueError("record_insert requires a non-None entity")
        if component is None:
            raise ValueError("record_insert requires a non-None component")

        op = MutationOp(
            op_seq=self._next_op_seq,
            kind=OpKind.INSERT,
            entity=entity,
            component=component,
            component_type=get_type(component),
        )
        self._ops.append(op)
        self._insert_indices.append(self._next_op_seq)
        self._next_op_seq += 1

    def record_remove(self, entity: EntityId, component_type: type) -> None:
        """Record a remove operation for an entity's component type."""
        if entity is None:
            raise ValueError("record_remove requires a non-None entity")
        if component_type is None:
            raise ValueError("record_remove requires a non-None component_type")

        op = MutationOp(
            op_seq=self._next_op_seq,
            kind=OpKind.REMOVE,
            entity=entity,
            component_type=component_type,
        )
        self._ops.append(op)
        self._remove_indices.append(self._next_op_seq)
        self._next_op_seq += 1

    def record_spawn(self, *components: Any) -> None:
        """Record a spawn operation for a new entity with given components."""
        op = MutationOp(
            op_seq=self._next_op_seq,
            kind=OpKind.SPAWN,
            spawn_components=components,
        )
        self._ops.append(op)
        self._spawn_indices.append(self._next_op_seq)
        self._next_op_seq += 1

    def record_destroy(self, entity: EntityId) -> None:
        """Record a destroy operation for an entity."""
        if entity is None:
            raise ValueError("record_destroy requires a non-None entity")

        op = MutationOp(
            op_seq=self._next_op_seq,
            kind=OpKind.DESTROY,
            entity=entity,
        )
        self._ops.append(op)
        self._destroy_indices.append(self._next_op_seq)
        self._next_op_seq += 1

    @property
    def ops(self) -> tuple[MutationOp, ...]:
        """Returns all ops."""
        return tuple(self._ops)

    @property
    def updates(self) -> dict[EntityId, dict[type, Any]]:
        """Returns all updates as {entity: {Type: component}}."""
        result: dict[EntityId, dict[type, Any]] = {}
        for i in self._update_indices:
            op = self._ops[i]
            if op.entity is not None and op.component is not None:
                if op.entity not in result:
                    result[op.entity] = {}
                result[op.entity][get_type(op.component)] = op.component
        return result

    @property
    def inserts(self) -> dict[EntityId, list[Any]]:
        """Returns all inserts as {entity: [components]}."""
        result: dict[EntityId, list[Any]] = {}
        for i in self._insert_indices:
            op = self._ops[i]
            if op.entity is not None and op.component is not None:
                if op.entity not in result:
                    result[op.entity] = []
                result[op.entity].append(op.component)
        return result

    @property
    def removes(self) -> dict[EntityId, list[type]]:
        """Returns all removes as {entity: [component types]}."""
        result: dict[EntityId, list[type]] = {}
        for i in self._remove_indices:
            op = self._ops[i]
            if op.entity is not None and op.component_type is not None:
                if op.entity not in result:
                    result[op.entity] = []
                result[op.entity].append(op.component_type)
        return result

    @property
    def spawns(self) -> list[tuple[Any, ...]]:
        """Returns all spawns as list of component tuples."""
        return [
            spawn_components
            for i in self._spawn_indices
            if (spawn_components := self._ops[i].spawn_components) is not None
        ]

    @property
    def spawn_count(self) -> int:
        """Returns total number of queued spawn operations."""
        return len(self._spawn_indices)

    @property
    def destroys(self) -> list[EntityId]:
        """Returns all destroys as list of entity IDs."""
        return [
            entity for i in self._destroy_indices if (entity := self._ops[i].entity) is not None
        ]

    def is_empty(self) -> bool:
        """Check if this result contains no changes.

        Returns:
            True if result has no updates, inserts, removes, spawns, or destroys.
        """
        return not self._ops

    def merge(self, other: SystemResult) -> None:
        """Merge other result into this one.

        ADDS all ops from other into this result.
        Does NOT check for conflicts or merge individual ops
        caller is responsible for ensuring this is safe.

        Args:
            other: SystemResult to merge into this one.
        """
        for op in other._ops:
            if op.kind == OpKind.UPDATE:
                if op.entity is not None and op.component is not None:
                    self.record_update(op.entity, op.component)
            elif op.kind == OpKind.INSERT:
                if op.entity is not None and op.component is not None:
                    self.record_insert(op.entity, op.component)
            elif op.kind == OpKind.REMOVE:
                if op.entity is not None and op.component_type is not None:
                    self.record_remove(op.entity, op.component_type)
            elif op.kind == OpKind.SPAWN:
                if op.spawn_components is not None:
                    self.record_spawn(*op.spawn_components)
            elif op.kind == OpKind.DESTROY:
                if op.entity is not None:
                    self.record_destroy(op.entity)
            else:
                raise ValueError(f"Unknown op kind: {op.kind}")

ops property

Returns all ops.

updates property

Returns all updates as {entity: {Type: component}}.

inserts property

Returns all inserts as {entity: [components]}.

removes property

Returns all removes as {entity: [component types]}.

spawns property

Returns all spawns as list of component tuples.

spawn_count property

Returns total number of queued spawn operations.

destroys property

Returns all destroys as list of entity IDs.

record_update(entity, component)

Record an update operation for an entity's component.

Source code in src/agentecs/world/result.py
def record_update(self, entity: EntityId, component: Any) -> None:
    """Queue an update operation for one component of an entity.

    The new op takes this result's next sequence number; the same number is
    added to the update index list before the counter advances.

    Args:
        entity: Entity the update applies to; must not be None.
        component: Component instance to record; must not be None.

    Raises:
        ValueError: If ``entity`` or ``component`` is None.
    """
    if entity is None:
        raise ValueError("record_update requires a non-None entity")
    if component is None:
        raise ValueError("record_update requires a non-None component")

    seq = self._next_op_seq
    self._ops.append(
        MutationOp(
            op_seq=seq,
            kind=OpKind.UPDATE,
            entity=entity,
            component=component,
            component_type=get_type(component),
        )
    )
    self._update_indices.append(seq)
    self._next_op_seq = seq + 1

record_insert(entity, component)

Record an insert operation for an entity's component.

Source code in src/agentecs/world/result.py
def record_insert(self, entity: EntityId, component: Any) -> None:
    """Queue an insert operation adding a component to an entity.

    The new op takes this result's next sequence number; the same number is
    added to the insert index list before the counter advances.

    Args:
        entity: Entity the insert applies to; must not be None.
        component: Component instance to record; must not be None.

    Raises:
        ValueError: If ``entity`` or ``component`` is None.
    """
    if entity is None:
        raise ValueError("record_insert requires a non-None entity")
    if component is None:
        raise ValueError("record_insert requires a non-None component")

    seq = self._next_op_seq
    self._ops.append(
        MutationOp(
            op_seq=seq,
            kind=OpKind.INSERT,
            entity=entity,
            component=component,
            component_type=get_type(component),
        )
    )
    self._insert_indices.append(seq)
    self._next_op_seq = seq + 1

record_remove(entity, component_type)

Record a remove operation for an entity's component type.

Source code in src/agentecs/world/result.py
def record_remove(self, entity: EntityId, component_type: type) -> None:
    """Queue a remove operation for one component type of an entity.

    The new op takes this result's next sequence number; the same number is
    added to the remove index list before the counter advances.

    Args:
        entity: Entity the removal applies to; must not be None.
        component_type: Component type to record for removal; must not be None.

    Raises:
        ValueError: If ``entity`` or ``component_type`` is None.
    """
    if entity is None:
        raise ValueError("record_remove requires a non-None entity")
    if component_type is None:
        raise ValueError("record_remove requires a non-None component_type")

    seq = self._next_op_seq
    self._ops.append(
        MutationOp(
            op_seq=seq,
            kind=OpKind.REMOVE,
            entity=entity,
            component_type=component_type,
        )
    )
    self._remove_indices.append(seq)
    self._next_op_seq = seq + 1

record_spawn(*components)

Record a spawn operation for a new entity with given components.

Source code in src/agentecs/world/result.py
def record_spawn(self, *components: Any) -> None:
    """Queue a spawn operation for a new entity.

    The positional components are stored on the op as a single tuple. The op
    takes this result's next sequence number; the same number is added to the
    spawn index list before the counter advances.

    Args:
        *components: Components recorded for the entity to be spawned.
    """
    seq = self._next_op_seq
    spawn_op = MutationOp(
        op_seq=seq,
        kind=OpKind.SPAWN,
        spawn_components=components,
    )
    self._ops.append(spawn_op)
    self._spawn_indices.append(seq)
    self._next_op_seq = seq + 1

record_destroy(entity)

Record a destroy operation for an entity.

Source code in src/agentecs/world/result.py
def record_destroy(self, entity: EntityId) -> None:
    """Queue a destroy operation for an entity.

    The new op takes this result's next sequence number; the same number is
    added to the destroy index list before the counter advances.

    Args:
        entity: Entity to record for destruction; must not be None.

    Raises:
        ValueError: If ``entity`` is None.
    """
    if entity is None:
        raise ValueError("record_destroy requires a non-None entity")

    seq = self._next_op_seq
    self._ops.append(
        MutationOp(
            op_seq=seq,
            kind=OpKind.DESTROY,
            entity=entity,
        )
    )
    self._destroy_indices.append(seq)
    self._next_op_seq = seq + 1

is_empty()

Check if this result contains no changes.

Returns:

Type Description
bool

True if result has no updates, inserts, removes, spawns, or destroys.

Source code in src/agentecs/world/result.py
def is_empty(self) -> bool:
    """Report whether no operations have been recorded on this result.

    Returns:
        True when the op list is empty (no updates, inserts, removes,
        spawns, or destroys); False otherwise.
    """
    if self._ops:
        return False
    return True

merge(other)

Merge other result into this one.

ADDS all ops from other into this result. Does NOT check for conflicts or merge individual ops; the caller is responsible for ensuring this is safe.

Parameters:

Name Type Description Default
other SystemResult

SystemResult to merge into this one.

required
Source code in src/agentecs/world/result.py
def merge(self, other: SystemResult) -> None:
    """Merge other result into this one.

    Re-records every op from ``other`` onto this result, in order, through
    the matching ``record_*`` method, so each copied op is assigned a new
    sequence number from this result. Does NOT check for conflicts or
    coalesce individual ops; the caller is responsible for ensuring this
    is safe.

    Ops whose required fields are ``None`` are skipped without error.

    Args:
        other: SystemResult to merge into this one. May be this same
            instance, in which case its current ops are duplicated once.

    Raises:
        ValueError: If an op has an unrecognized kind.
    """
    # Iterate over a snapshot: the record_* calls below append to
    # self._ops, so walking other._ops directly would never terminate
    # when ``other is self``.
    for op in tuple(other._ops):
        if op.kind == OpKind.UPDATE:
            if op.entity is not None and op.component is not None:
                self.record_update(op.entity, op.component)
        elif op.kind == OpKind.INSERT:
            if op.entity is not None and op.component is not None:
                self.record_insert(op.entity, op.component)
        elif op.kind == OpKind.REMOVE:
            if op.entity is not None and op.component_type is not None:
                self.record_remove(op.entity, op.component_type)
        elif op.kind == OpKind.SPAWN:
            if op.spawn_components is not None:
                self.record_spawn(*op.spawn_components)
        elif op.kind == OpKind.DESTROY:
            if op.entity is not None:
                self.record_destroy(op.entity)
        else:
            raise ValueError(f"Unknown op kind: {op.kind}")

normalize_result(raw)

Convert any valid system return format to SystemResult.

Supports multiple return formats for convenience:

- None: No changes
- SystemResult: Direct passthrough
- Dict[EntityId, Dict[type, Any]]: Entity to component dict
- Dict[EntityId, Any]: Entity to single component
- List[Tuple[EntityId, Any]]: List of (entity, component) pairs

Parameters:

Name Type Description Default
raw SystemReturn

System return value in any supported format.

required

Returns:

Type Description
SystemResult

Normalized SystemResult.

Raises:

Type Description
TypeError

If return value is not a recognized format.

Source code in src/agentecs/world/result.py
def normalize_result(raw: SystemReturn) -> SystemResult:
    """Turn any accepted system return value into a SystemResult.

    Accepted shapes:
    - None: yields an empty SystemResult
    - SystemResult: returned unchanged (same object)
    - Dict[EntityId, Dict[type, Any]]: every value of the inner dict is
      recorded as an update for that entity (inner keys are not consulted)
    - Dict[EntityId, Any]: the value is recorded as a single update
    - List[Tuple[EntityId, Any]]: each (entity, component) pair is recorded
      as an update

    Args:
        raw: System return value in any supported format.

    Returns:
        Normalized SystemResult.

    Raises:
        TypeError: If the value, a dict key, or a list item is not in a
            recognized format.
    """
    if raw is None:
        return SystemResult()
    if isinstance(raw, SystemResult):
        return raw

    # Reject unsupported containers before allocating a result.
    if not isinstance(raw, (dict, list)):
        raise TypeError(f"Invalid system return type: {type(raw)}")

    normalized = SystemResult()

    if isinstance(raw, dict):
        for entity, payload in raw.items():
            if not isinstance(entity, EntityId):
                raise TypeError(f"Expected EntityId key, got {type(entity)}")

            # A nested dict contributes each of its values; anything else is
            # treated as one component.
            components = payload.values() if isinstance(payload, dict) else (payload,)
            for component in components:
                normalized.record_update(entity, component)
        return normalized

    for item in raw:
        if not (isinstance(item, tuple) and len(item) == 2):
            raise TypeError(f"Invalid list item format: {item}")
        entity, component = item
        if not isinstance(entity, EntityId):
            raise TypeError(f"Expected EntityId, got {type(entity)}")
        normalized.record_update(entity, component)
    return normalized

Entity Allocator

Manages entity ID allocation with generational indices.

EntityAllocator

Allocates entity IDs with generation tracking for recycling.

Maintains a free list of deallocated entity indices with incremented generations to safely reuse entity IDs. Starts allocation after reserved system entities.

Parameters:

Name Type Description Default
shard int

Shard number for this allocator (default 0 for local).

0
Source code in src/agentecs/storage/allocator.py
class EntityAllocator:
    """Allocates entity IDs with generation tracking for recycling.

    Maintains a free list of deallocated entity indices with incremented generations
    to safely reuse entity IDs. Starts allocation after reserved system entities.

    Args:
        shard: Shard number for this allocator (default 0 for local).
    """

    def __init__(self, shard: int = 0):
        """Initialize entity allocator for a specific shard.

        Args:
            shard: Shard number for this allocator (default 0 for local).
        """
        self._shard = shard
        # Fresh indices begin after the reserved system-entity range.
        self._next_index = SystemEntity._RESERVED_COUNT
        self._free_list: list[tuple[int, int]] = []  # (index, generation)
        # Current generation for every index this allocator has seen.
        self._generations: dict[int, int] = {}

    def allocate(self) -> EntityId:
        """Allocate new entity ID, reusing recycled slots when available.

        Prioritizes reusing freed entity IDs from the free list before allocating
        new indices. Reused IDs have incremented generation numbers. The most
        recently freed slot is reused first.

        Returns:
            Newly allocated EntityId.
        """
        if self._free_list:
            index, gen = self._free_list.pop()
            return EntityId(shard=self._shard, index=index, generation=gen)

        index = self._next_index
        self._next_index += 1
        self._generations[index] = 0
        return EntityId(shard=self._shard, index=index, generation=0)

    def deallocate(self, entity: EntityId) -> None:
        """Return entity ID for reuse with incremented generation.

        Adds the entity's index to the free list with an incremented generation
        number, making it available for reallocation.

        If the index is already tracked and the handle's generation is not the
        current one (the handle is stale or was already deallocated), the call
        is a no-op.

        Args:
            entity: Entity ID to deallocate.

        Raises:
            ValueError: If entity is from a different shard.
        """
        if entity.shard != self._shard:
            raise ValueError(
                f"Cannot deallocate entity from shard {entity.shard} on shard {self._shard}"
            )

        # Guard against double free: without this, the same (index, generation)
        # pair would be queued twice and allocate() would hand one ID to two
        # entities; a stale handle could also move the generation backwards.
        current_gen = self._generations.get(entity.index)
        if current_gen is not None and current_gen != entity.generation:
            return

        new_gen = entity.generation + 1
        self._generations[entity.index] = new_gen
        self._free_list.append((entity.index, new_gen))

    def is_alive(self, entity: EntityId) -> bool:
        """Check if entity ID is still valid (not recycled).

        Compares entity's generation number with the current generation for that
        index to determine if the entity is still alive or has been recycled.

        Args:
            entity: Entity ID to check.

        Returns:
            True if entity is alive, False if recycled or from different shard.
        """
        if entity.shard != self._shard:
            return False  # TODO: cross-shard liveness check
        # -1 stands in for "index never seen by this allocator".
        current_gen = self._generations.get(entity.index, -1)
        return current_gen == entity.generation

allocate()

Allocate new entity ID, reusing recycled slots when available.

Prioritizes reusing freed entity IDs from the free list before allocating new indices. Reused IDs have incremented generation numbers.

Returns:

Type Description
EntityId

Newly allocated EntityId.

Source code in src/agentecs/storage/allocator.py
def allocate(self) -> EntityId:
    """Hand out an entity ID, preferring a recycled slot over a fresh index.

    When the free list is non-empty, its last entry is popped and returned
    with the generation stored alongside it. Otherwise the next unused index
    is taken, registered at generation 0, and the index counter advances.

    Returns:
        Newly allocated EntityId.
    """
    if not self._free_list:
        fresh_index = self._next_index
        self._next_index = fresh_index + 1
        self._generations[fresh_index] = 0
        return EntityId(shard=self._shard, index=fresh_index, generation=0)

    recycled_index, recycled_gen = self._free_list.pop()
    return EntityId(shard=self._shard, index=recycled_index, generation=recycled_gen)

deallocate(entity)

Return entity ID for reuse with incremented generation.

Adds the entity's index to the free list with an incremented generation number, making it available for reallocation.

Parameters:

Name Type Description Default
entity EntityId

Entity ID to deallocate.

required

Raises:

Type Description
ValueError

If entity is from a different shard.

Source code in src/agentecs/storage/allocator.py
def deallocate(self, entity: EntityId) -> None:
    """Return entity ID for reuse with incremented generation.

    Adds the entity's index to the free list with an incremented generation
    number, making it available for reallocation.

    If the index is already tracked and the handle's generation is not the
    current one (the handle is stale or was already deallocated), the call
    is a no-op.

    Args:
        entity: Entity ID to deallocate.

    Raises:
        ValueError: If entity is from a different shard.
    """
    if entity.shard != self._shard:
        raise ValueError(
            f"Cannot deallocate entity from shard {entity.shard} on shard {self._shard}"
        )

    # Guard against double free: without this, the same (index, generation)
    # pair would be queued twice and the slot could later be handed out to
    # two entities; a stale handle could also move the generation backwards.
    current_gen = self._generations.get(entity.index)
    if current_gen is not None and current_gen != entity.generation:
        return

    new_gen = entity.generation + 1
    self._generations[entity.index] = new_gen
    self._free_list.append((entity.index, new_gen))

is_alive(entity)

Check if entity ID is still valid (not recycled).

Compares entity's generation number with the current generation for that index to determine if the entity is still alive or has been recycled.

Parameters:

Name Type Description Default
entity EntityId

Entity ID to check.

required

Returns:

Type Description
bool

True if entity is alive, False if recycled or from different shard.

Source code in src/agentecs/storage/allocator.py
def is_alive(self, entity: EntityId) -> bool:
    """Tell whether an entity ID still matches its slot's current generation.

    An ID from another shard is always reported as not alive. Otherwise the
    ID's generation is compared with the generation currently tracked for
    its index; an index that is not tracked is looked up as -1.

    Args:
        entity: Entity ID to check.

    Returns:
        True if entity is alive, False if recycled or from different shard.
    """
    foreign_shard = entity.shard != self._shard
    if foreign_shard:
        # TODO: cross-shard liveness check
        return False

    tracked_generation = self._generations.get(entity.index, -1)
    return tracked_generation == entity.generation