
Scheduling API

Parallel execution engine with conflict detection.

Overview

The scheduler automatically parallelizes system execution based on declared access patterns:

- Conflict detection: Identifies write-write and read-write conflicts
- Parallel execution: Groups non-conflicting systems
- Query optimization: Leverages query disjointness for more parallelism
- Async-first: Uses asyncio.gather for efficient concurrent execution
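The async-first pattern can be sketched in isolation. This toy runner (not the real scheduler) shows how a group of non-conflicting systems is awaited concurrently with asyncio.gather, with results returned in submission order:

```python
import asyncio

async def run_group(system_coros):
    # Await all non-conflicting systems concurrently; results come back
    # in the same order the coroutines were passed in.
    return await asyncio.gather(*system_coros)

async def demo():
    async def fake_system(n):
        await asyncio.sleep(0)  # yield control, simulating async work
        return n
    return await run_group([fake_system(1), fake_system(2), fake_system(3)])

print(asyncio.run(demo()))  # → [1, 2, 3]
```

Order-preserving gather is what lets the scheduler concatenate results deterministically in registration order.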

Execution Modes:

- Parallel: Default mode, maximizes throughput
- Sequential: For debugging and deterministic execution


SimpleScheduler

Parallel execution with conflict detection and query disjointness optimization.

SimpleScheduler

Scheduler with parallel execution.

All systems execute in parallel (snapshot isolation), with results concatenated in registration order and applied at group boundaries.

Execution grouping is delegated to an ExecutionGroupBuilder, enabling future extensions for dependency-based, frequency-based, or custom grouping strategies.
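Since grouping is pluggable, a custom builder only has to turn the registered descriptors into ordered execution groups. A minimal sketch of a hypothetical phase-based builder, using stand-in types rather than the real agentecs classes (PhaseDescriptor and its phase attribute are illustrative assumptions, not part of the API):

```python
from dataclasses import dataclass

@dataclass
class PhaseDescriptor:
    # Stand-in for SystemDescriptor; 'phase' is a hypothetical attribute.
    name: str
    phase: int

class PhaseGroupBuilder:
    """Group systems by phase; groups run in ascending phase order."""

    def build(self, systems: list[PhaseDescriptor]) -> list[list[PhaseDescriptor]]:
        by_phase: dict[int, list[PhaseDescriptor]] = {}
        for s in systems:
            by_phase.setdefault(s.phase, []).append(s)
        # One execution group per phase, lowest phase first;
        # within a phase, registration order is preserved.
        return [by_phase[p] for p in sorted(by_phase)]

builder = PhaseGroupBuilder()
plan = builder.build([
    PhaseDescriptor("physics", 0),
    PhaseDescriptor("render", 1),
    PhaseDescriptor("move", 0),
])
print([[s.name for s in group] for group in plan])  # → [['physics', 'move'], ['render']]
```

Systems in the same group run in parallel; groups run one after another, matching the group-boundary apply semantics described above.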

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `SchedulerConfig \| None` | Scheduler configuration (concurrency, retry). | `None` |
| `group_builder` | `ExecutionGroupBuilder \| None` | Strategy for building execution groups from systems. Defaults to `SingleGroupBuilder` (all parallel, dev systems isolated). | `None` |
Source code in src/agentecs/scheduling/scheduler.py
class SimpleScheduler:
    """Scheduler with parallel execution.

    All systems execute in parallel (snapshot isolation), with results
    concatenated in registration order and applied at group boundaries.

    Execution grouping is delegated to an ExecutionGroupBuilder, enabling
    future extensions for dependency-based, frequency-based, or custom
    grouping strategies.

    Args:
        config: Scheduler configuration (concurrency, retry).
        group_builder: Strategy for building execution groups from systems.
            Defaults to SingleGroupBuilder (all parallel, dev systems isolated).
    """

    def __init__(
        self,
        config: SchedulerConfig | None = None,
        group_builder: ExecutionGroupBuilder | None = None,
    ) -> None:
        self._config = config or SchedulerConfig()
        self._group_builder = group_builder or SingleGroupBuilder()
        self._systems: list[SystemDescriptor] = []
        self._execution_plan: ExecutionPlan | None = None

    def register_system(self, descriptor: SystemDescriptor) -> None:
        """Register system for execution. Invalidates cached execution plan."""
        self._systems.append(descriptor)
        self._execution_plan = None

    def build_execution_plan(self) -> ExecutionPlan:
        """Build execution plan using the configured group builder."""
        return self._group_builder.build(self._systems)

    async def tick_async(self, world: World) -> None:
        """Execute all systems once, parallelizing where possible."""
        if self._execution_plan is None:
            self._execution_plan = self.build_execution_plan()

        for group in self._execution_plan:
            await self._execute_group_async(world, group)

    async def _execute_group_async(self, world: World, group: ExecutionGroup) -> None:
        """Execute a group of systems in parallel, merge and apply results."""
        if not group.systems:
            return

        # Execute systems with concurrency limiting
        results = await self._execute_systems_async(world, group.systems)

        # Merge results in registration order
        merged = self._merge_results(results)

        # Apply merged results
        await world.apply_result_async(merged)

    async def _execute_systems_async(
        self, world: World, systems: list[SystemDescriptor]
    ) -> list[SystemResult]:
        """Execute systems with optional concurrency limiting and retry."""
        max_concurrent = self._config.max_concurrent

        if max_concurrent is None:
            # Unlimited concurrency
            tasks = [self._execute_with_retry(world, system) for system in systems]
            return list(await asyncio.gather(*tasks))
        else:
            # Semaphore-limited concurrency
            semaphore = asyncio.Semaphore(max_concurrent)

            async def limited_execute(system: SystemDescriptor) -> SystemResult:
                async with semaphore:
                    return await self._execute_with_retry(world, system)

            tasks = [limited_execute(system) for system in systems]
            return list(await asyncio.gather(*tasks))

    async def _execute_with_retry(self, world: World, system: SystemDescriptor) -> SystemResult:
        """Execute system with retry policy.

        Uses tenacity for retry logic when max_attempts > 1.
        Requires tenacity to be installed: pip install agentecs[retry]
        """
        policy = self._config.retry_policy

        if policy.max_attempts <= 1:
            return await world.execute_system_async(system)

        if not TENACITY_AVAILABLE:
            msg = "Retry policy requires tenacity. Install with: pip install agentecs[retry]"
            raise ImportError(msg)

        retryer = self._build_retryer(policy)

        try:
            async for attempt in retryer:
                with attempt:
                    return await world.execute_system_async(system)
        except tenacity.RetryError as e:
            if policy.on_exhausted == "skip":
                return SystemResult()  # Empty result, skip this system
            msg = f"{system.name} failed after {policy.max_attempts} attempts"
            raise RuntimeError(msg) from e.last_attempt.exception()

        return SystemResult()  # pragma: no cover

    def _build_retryer(self, policy: RetryPolicy) -> tenacity.AsyncRetrying:
        """Build a tenacity retryer from RetryPolicy configuration."""
        stop = tenacity.stop_after_attempt(policy.max_attempts)

        wait: tenacity.wait.wait_base
        if policy.backoff == "exponential":
            wait = tenacity.wait_exponential(multiplier=policy.base_delay, min=policy.base_delay)
        elif policy.backoff == "linear":
            wait = tenacity.wait_incrementing(start=policy.base_delay, increment=policy.base_delay)
        else:
            wait = tenacity.wait_none()

        return tenacity.AsyncRetrying(
            stop=stop,
            wait=wait,
            reraise=False,
        )

    def _merge_results(self, results: list[SystemResult]) -> SystemResult:
        """Merge results in order of operation."""
        merged = SystemResult()
        for result in results:
            merged.merge(result)
        return merged

    def tick(self, world: World) -> None:
        """Synchronous wrapper for tick_async."""
        asyncio.run(self.tick_async(world))

    def get_execution_plan_info(self) -> list[list[str]]:
        """Get human-readable execution plan (for debugging)."""
        if self._execution_plan is None:
            self._execution_plan = self.build_execution_plan()

        return [[s.name for s in group.systems] for group in self._execution_plan]
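The branches in _build_retryer map each backoff mode to a tenacity wait strategy. As a rough, self-contained approximation of the resulting delay schedules (the exact tenacity timing formulas may differ slightly; this is an illustration, not tenacity itself):

```python
def backoff_delays(backoff: str, base_delay: float, max_attempts: int) -> list[float]:
    # Approximate wait before each retry (attempts 2..max_attempts).
    retries = range(1, max_attempts)
    if backoff == "exponential":
        # ~ wait_exponential: multiplier * 2**n, clamped below by base_delay
        return [max(base_delay, base_delay * 2 ** n) for n in retries]
    if backoff == "linear":
        # ~ wait_incrementing: grows by base_delay per retry
        return [base_delay * n for n in retries]
    # ~ wait_none: retry immediately
    return [0.0 for _ in retries]

print(backoff_delays("linear", 0.5, 4))  # → [0.5, 1.0, 1.5]
```

With `on_exhausted="skip"`, a system that still fails after the last attempt contributes an empty SystemResult instead of aborting the tick.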

__init__(config=None, group_builder=None)

Source code in src/agentecs/scheduling/scheduler.py
def __init__(
    self,
    config: SchedulerConfig | None = None,
    group_builder: ExecutionGroupBuilder | None = None,
) -> None:
    self._config = config or SchedulerConfig()
    self._group_builder = group_builder or SingleGroupBuilder()
    self._systems: list[SystemDescriptor] = []
    self._execution_plan: ExecutionPlan | None = None

tick(world)

Synchronous wrapper for tick_async.

Source code in src/agentecs/scheduling/scheduler.py
def tick(self, world: World) -> None:
    """Synchronous wrapper for tick_async."""
    asyncio.run(self.tick_async(world))

tick_async(world) async

Execute all systems once, parallelizing where possible.

Source code in src/agentecs/scheduling/scheduler.py
async def tick_async(self, world: World) -> None:
    """Execute all systems once, parallelizing where possible."""
    if self._execution_plan is None:
        self._execution_plan = self.build_execution_plan()

    for group in self._execution_plan:
        await self._execute_group_async(world, group)

SequentialScheduler

Simple sequential execution for debugging.

SequentialScheduler()

Create a scheduler that executes systems one at a time.

Equivalent to SimpleScheduler with max_concurrent=1. Useful for debugging or when parallelism isn't needed.

Source code in src/agentecs/scheduling/scheduler.py
def SequentialScheduler() -> SimpleScheduler:  # noqa: N802
    """Create a scheduler that executes systems one at a time.

    Equivalent to SimpleScheduler with max_concurrent=1.
    Useful for debugging or when parallelism isn't needed.
    """
    return SimpleScheduler(config=SchedulerConfig(max_concurrent=1))

Conflict Detection

How the scheduler determines whether systems can run in parallel:

Write-Write Conflicts

Two systems conflict if both write to the same component type.

@system(writes=(Position,))
def move_system(world): ...

@system(writes=(Position,))  # Conflicts with move_system
def teleport_system(world): ...

Read-Write Conflicts

A system reading a component conflicts with one writing it.

@system(reads=(Position,), writes=(Velocity,))
def physics_system(world): ...

@system(writes=(Position,))  # Conflicts with physics_system
def move_system(world): ...
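Both conflict rules reduce to set intersections over the declared component types. A self-contained sketch of the check (not the scheduler's actual implementation; component types are modeled as strings for illustration):

```python
def systems_conflict(a_reads: set, a_writes: set, b_reads: set, b_writes: set) -> bool:
    # Write-write conflict: both systems write the same component type.
    if a_writes & b_writes:
        return True
    # Read-write conflict: one system reads what the other writes.
    return bool(a_reads & b_writes) or bool(b_reads & a_writes)

print(systems_conflict(set(), {"Position"}, set(), {"Position"}))        # → True  (write-write)
print(systems_conflict({"Position"}, {"Velocity"}, set(), {"Position"})) # → True  (read-write)
print(systems_conflict({"Position"}, set(), {"Position"}, set()))        # → False (read-read is safe)
```

Two systems that only read the same components never conflict, which is why read-heavy systems parallelize well.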

Query Disjointness

Systems with provably disjoint queries can run in parallel even when they touch the same component types.

@system(reads=Query().having(Agent, Active))
def active_agents(world): ...

@system(reads=Query().having(Agent, Inactive))  # Disjoint!
def inactive_agents(world): ...

Usage Example

from agentecs import World
from agentecs.scheduling import SimpleScheduler, SchedulerConfig

# Create world with scheduler
world = World(execution=SimpleScheduler())

# Or with custom config
world = World(
    execution=SimpleScheduler(
        config=SchedulerConfig(
            max_concurrent=10
        )
    )
)

# Register systems
world.register_system(movement_system)
world.register_system(physics_system)
world.register_system(rendering_system)

# Execute one tick (parallel)
await world.tick_async()

# Or synchronous wrapper
world.tick()

Future Enhancements

Frequency-Based Execution (Planned):

- Systems declare execution frequency (every N ticks)
- Reduces unnecessary computation
- Phase-based grouping
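The planned every-N-ticks behavior could be gated with something like this hypothetical helper (purely illustrative; not part of the current API):

```python
def is_due(tick: int, every_n: int, offset: int = 0) -> bool:
    # Run a system only on ticks congruent to offset modulo every_n.
    return tick % every_n == offset

print([t for t in range(10) if is_due(t, 3)])  # → [0, 3, 6, 9]
```

Offsets would let systems with the same frequency be staggered across ticks to smooth out per-tick load.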

Context-Aware Scheduling (Research):

- Optimize for LLM cache hits
- Group systems with overlapping context
- Learn optimal schedules from execution patterns