Storage¶
Storage backends provide the persistence layer for entities and components in AgentECS. The pluggable storage architecture enables swapping implementations from simple in-memory storage to distributed, persistent, or specialized backends.
Overview¶
The storage layer abstracts how entities and components are stored, queried, and updated. This separation from World and Scheduler enables:
- Swappable Backends: Change storage without touching game logic
- Testing: Mock storage for unit tests
- Optimization: Specialized storage for different workloads
- Distribution: Shard entities across nodes for scale
graph TD
A[World] -->|delegates to| B[Storage Protocol]
B -.implements.-> C[LocalStorage]
B -.implements.-> D[DistributedStorage Future]
B -.implements.-> E[PersistentStorage Future]
C -->|uses| F[EntityAllocator]
C -->|stores in| G[dict entity, components]
D -->|shards across| H[Multiple Nodes]
E -->|persists to| I[Database]
style B fill:#ffb74d
style C fill:#81c784
style D fill:#e0e0e0
style E fill:#e0e0e0
Storage Responsibilities:
- Entity Lifecycle: Create, destroy, check existence
- Component Access: Get, set, remove, check presence
- Queries: Find entities by component types
- Batch Operations: Apply multiple updates atomically
- Serialization: Snapshot and restore world state
Storage Interface¶
The Storage protocol defines the contract all storage backends must implement:
Entity Lifecycle Methods¶
create_entity() → EntityId
Allocate a new unique entity ID:
destroy_entity(entity: EntityId) → None
Remove an entity and all its components:
entity_exists(entity: EntityId) → bool
Check if an entity is alive:
Generational Indices
EntityIds include a generation counter. Even if an ID is recycled, the generation ensures old references don't accidentally access new entities.
all_entities() → Iterator[EntityId]
Iterate over all living entities:
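As a rough sketch of how the four lifecycle methods above fit together, the following uses a hypothetical dict-backed stand-in (`ToyStorage` is an illustration, not the library's `LocalStorage`) and a local copy of the `EntityId` structure. It does not recycle IDs, so the generation is always 0 here.

```python
from dataclasses import dataclass
from typing import Iterator


@dataclass(frozen=True)
class EntityId:
    shard: int
    index: int
    generation: int


class ToyStorage:
    """Hypothetical stand-in exposing only the lifecycle methods."""

    def __init__(self) -> None:
        self._components: dict[EntityId, dict[type, object]] = {}
        self._next_index = 0

    def create_entity(self) -> EntityId:
        entity = EntityId(0, self._next_index, 0)  # no ID recycling in this toy
        self._next_index += 1
        self._components[entity] = {}
        return entity

    def destroy_entity(self, entity: EntityId) -> None:
        # Dropping the entry removes the entity and all of its components.
        self._components.pop(entity, None)

    def entity_exists(self, entity: EntityId) -> bool:
        return entity in self._components

    def all_entities(self) -> Iterator[EntityId]:
        # Iterate over a copy so callers may destroy entities while looping.
        return iter(list(self._components))


storage = ToyStorage()
a = storage.create_entity()
b = storage.create_entity()
storage.destroy_entity(a)
alive = list(storage.all_entities())
print(storage.entity_exists(a), storage.entity_exists(b), alive)
```

Destroying an entity that is already gone is treated as a no-op in this sketch; a real backend may choose to raise instead.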
Component Access Methods¶
get_component(entity: EntityId, component_type: type[T]) → T | None
Retrieve a component from an entity:
set_component(entity: EntityId, component: Any) → None
Add or update a component on an entity:
Shared Wrapper Support
Shared component storage is explicit: pass Shared(component) when inserting/updating components that should share a backing storage instance.
remove_component(entity: EntityId, component_type: type) → bool
Delete a component from an entity:
removed = storage.remove_component(entity, Velocity)
if removed:
    print("Velocity component removed")
has_component(entity: EntityId, component_type: type) → bool
Check if entity has a specific component:
get_component_types(entity: EntityId) → frozenset[type]
Get all component types on an entity:
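The component access methods in this section can be sketched together as follows. `ToyComponents`, `Position`, and `Velocity` are hypothetical stand-ins defined only for this example, and a plain integer stands in for an `EntityId`; the actual backend may behave differently for entities that do not exist.

```python
from dataclasses import dataclass
from typing import Any, Optional, TypeVar

T = TypeVar("T")


@dataclass
class Position:
    x: float
    y: float


@dataclass
class Velocity:
    dx: float
    dy: float


class ToyComponents:
    """Hypothetical stand-in: components keyed by entity, then by type."""

    def __init__(self) -> None:
        self._data: dict[Any, dict[type, Any]] = {}

    def set_component(self, entity: Any, component: Any) -> None:
        # The component's own type is the key, so setting twice overwrites.
        self._data.setdefault(entity, {})[type(component)] = component

    def get_component(self, entity: Any, component_type: type[T]) -> Optional[T]:
        return self._data.get(entity, {}).get(component_type)

    def has_component(self, entity: Any, component_type: type) -> bool:
        return component_type in self._data.get(entity, {})

    def remove_component(self, entity: Any, component_type: type) -> bool:
        # Returns True only if a component was actually removed.
        components = self._data.get(entity, {})
        if component_type in components:
            del components[component_type]
            return True
        return False

    def get_component_types(self, entity: Any) -> frozenset[type]:
        return frozenset(self._data.get(entity, {}))


store = ToyComponents()
entity = 1  # any hashable value stands in for an EntityId here
store.set_component(entity, Position(0, 0))
store.set_component(entity, Position(10, 20))  # same type: replaces the first
store.set_component(entity, Velocity(1, 0))
types_before = store.get_component_types(entity)
removed = store.remove_component(entity, Velocity)
removed_again = store.remove_component(entity, Velocity)
```

Keying by `type(component)` is what makes `set_component` serve as both "add" and "update".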
Query Methods¶
query(*component_types: type) → Iterator[tuple[EntityId, tuple[Any, ...]]]
Find entities with all specified components:
# Find entities with Position AND Velocity
for entity, (pos, vel) in storage.query(Position, Velocity):
    print(f"Entity {entity} at ({pos.x}, {pos.y})")
Query Returns Tuples
query() returns (entity, (comp1, comp2, ...)). Components are in the same order as the query types.
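To make the ordering rule concrete, here is a minimal sketch of a linear-scan query over a dict of entities. The free function `query`, the string entity names, and the two component classes are assumptions made for illustration; they are not the library's implementation.

```python
from dataclasses import dataclass
from typing import Any, Iterator


@dataclass
class Position:
    x: float
    y: float


@dataclass
class Velocity:
    dx: float
    dy: float


def query(
    data: dict[Any, dict[type, Any]], *component_types: type
) -> Iterator[tuple[Any, tuple[Any, ...]]]:
    """Linear scan: yield entities that hold every requested type."""
    for entity, components in data.items():
        if all(t in components for t in component_types):
            # Build the tuple in the order the types were requested.
            yield entity, tuple(components[t] for t in component_types)


data = {
    "e1": {Position: Position(1, 2), Velocity: Velocity(3, 4)},
    "e2": {Position: Position(5, 6)},  # no Velocity, so it is skipped
}
# Requesting Velocity first puts the Velocity component first in the tuple.
results = list(query(data, Velocity, Position))
```

Because every entity is inspected and each requested type is checked, the cost of this approach grows with both the number of entities and the number of types.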
query_single(component_type: type[T]) → Iterator[tuple[EntityId, T]]
Optimized query for a single component type:
# Find all entities with Health
for entity, health in storage.query_single(Health):
    if health.hp < 10:
        print(f"Entity {entity} is low health")
Batch Update Methods¶
apply_updates(...) → list[EntityId]
Apply multiple changes atomically:
new_entities = storage.apply_updates(
    updates={entity1: {Position: Position(5, 5)}},
    inserts={entity2: [Velocity(1, 0)]},
    removes={entity3: [Health]},
    destroys=[entity4],
)
Parameters:
- updates: Modify existing components, {entity: {Type: component}}
- inserts: Add new components, {entity: [component, ...]}
- removes: Delete components, {entity: [Type, ...]}
- destroys: Delete entities, [entity, ...]
Returns: List of newly created EntityIds (for spawns)
Atomic Application
Changes in a single apply_updates call are meant to be applied as one unit: either all succeed or none do. How strictly this is guaranteed depends on the implementation.
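One possible way to get all-or-nothing behavior in an in-memory backend is to stage the changes on a copy and only hand back the copy if every step succeeds. The sketch below assumes that approach, along with an apply order of updates, inserts, removes, then destroys; neither is confirmed as what any AgentECS backend does. The function signature is hypothetical and does not model the returned list of new entity IDs.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Position:
    x: int
    y: int


@dataclass
class Health:
    hp: int


def apply_updates(
    data: dict[Any, dict[type, Any]],
    updates: dict[Any, dict[type, Any]],
    inserts: dict[Any, list[Any]],
    removes: dict[Any, list[type]],
    destroys: list[Any],
) -> dict[Any, dict[type, Any]]:
    """Return the new state, or raise KeyError and leave `data` untouched."""
    # Work on a copy so a failure part-way through cannot corrupt `data`.
    staged = {entity: dict(components) for entity, components in data.items()}
    for entity, changes in updates.items():
        staged[entity].update(changes)  # KeyError if the entity is unknown
    for entity, components in inserts.items():
        for component in components:
            staged[entity][type(component)] = component
    for entity, types in removes.items():
        for component_type in types:
            del staged[entity][component_type]  # KeyError if absent
    for entity in destroys:
        del staged[entity]
    return staged


state = {"e1": {Position: Position(0, 0)}, "e2": {Health: Health(10)}}
state = apply_updates(
    state,
    updates={"e1": {Position: Position(5, 5)}},
    inserts={"e1": [Health(3)]},
    removes={},
    destroys=["e2"],
)

# This batch fails on the remove step, so the Position update is discarded too.
try:
    apply_updates(
        state,
        updates={"e1": {Position: Position(9, 9)}},
        inserts={},
        removes={"e1": [str]},
        destroys=[],
    )
    failed = False
except KeyError:
    failed = True
```

Copying the whole state is simple but costs O(entities) per batch; a backend with many entities would more likely log and roll back individual changes.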
Serialization Methods¶
snapshot() → bytes
Serialize entire storage state:
restore(data: bytes) → None
Restore from snapshot:
Snapshot Format is Implementation-Specific
Different storage backends may use different serialization formats. Snapshots are not portable across implementations.
Async Variants¶
For distributed or remote storage, async variants are provided:
# Async query
async for entity, (pos, vel) in storage.query_async(Position, Velocity):
    process(entity, pos, vel)
# Async get
component = await storage.get_component_async(entity, Position)
# Async apply
new_entities = await storage.apply_updates_async(updates, inserts, removes, destroys)
Local vs Remote Storage
LocalStorage provides both sync and async methods (async wraps sync). Remote storage implementations may only support async methods for network I/O.
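The "async wraps sync" idea can be illustrated with a small stand-in class whose coroutine and async-generator methods simply delegate to their synchronous counterparts. `ToyLocalStorage` and `Position` are hypothetical names for this sketch; how the real `LocalStorage` implements its async variants is not shown in this document.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Iterator, Optional


@dataclass
class Position:
    x: int
    y: int


class ToyLocalStorage:
    """Hypothetical stand-in; async methods simply delegate to the sync ones."""

    def __init__(self) -> None:
        self._data: dict[Any, dict[type, Any]] = {}

    def set_component(self, entity: Any, component: Any) -> None:
        self._data.setdefault(entity, {})[type(component)] = component

    def get_component(self, entity: Any, component_type: type) -> Optional[Any]:
        return self._data.get(entity, {}).get(component_type)

    def query(self, *component_types: type) -> Iterator[tuple[Any, tuple[Any, ...]]]:
        for entity, components in self._data.items():
            if all(t in components for t in component_types):
                yield entity, tuple(components[t] for t in component_types)

    async def get_component_async(self, entity: Any, component_type: type) -> Optional[Any]:
        # No real I/O happens here; the coroutine just wraps the sync call.
        return self.get_component(entity, component_type)

    async def query_async(
        self, *component_types: type
    ) -> AsyncIterator[tuple[Any, tuple[Any, ...]]]:
        # An async generator that re-yields the rows of the sync query.
        for row in self.query(*component_types):
            yield row


async def main() -> tuple[Any, list[Any]]:
    storage = ToyLocalStorage()
    storage.set_component("e1", Position(1, 2))
    component = await storage.get_component_async("e1", Position)
    rows = [row async for row in storage.query_async(Position)]
    return component, rows


component, rows = asyncio.run(main())
```

Code written against the async methods can then run unchanged against a backend that performs real network I/O in those methods.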
Entity Allocator¶
The EntityAllocator manages entity ID generation and recycling:
Generational Indices¶
EntityIds use a three-part structure:
from dataclasses import dataclass
@dataclass(frozen=True)
class EntityId:
    shard: int       # Which shard owns this entity (0 for local)
    index: int       # Unique index within shard
    generation: int  # Incremented on reuse
sequenceDiagram
participant User
participant Allocator
participant FreeList
User->>Allocator: allocate()
Allocator->>Allocator: Check free list
alt Free list empty
Allocator->>Allocator: index = next_index++
Allocator-->>User: EntityId(0, index, 0)
else Free list has IDs
Allocator->>FreeList: Pop ID
FreeList-->>Allocator: (index, gen)
Allocator->>Allocator: Increment generation
Allocator-->>User: EntityId(0, index, gen+1)
end
User->>Allocator: deallocate(entity)
Allocator->>FreeList: Push (index, gen)
Prevents Stale References
If you hold an EntityId and the entity is destroyed then recreated, the generation counter ensures your old ID won't accidentally access the new entity:
Allocator Methods¶
allocate() → EntityId
Allocate a new entity ID:
deallocate(entity: EntityId) → None
Return entity ID to free list:
is_alive(entity: EntityId) → bool
Check if entity ID is currently active:
Liveness Check
is_alive() checks if the entity's generation matches the allocator's current generation for that index. Destroyed entities have mismatched generations.
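The liveness rule and the stale-reference protection described above can be sketched with a toy allocator. `ToyAllocator` is hypothetical and the real `EntityAllocator` may differ: in particular, this sketch assumes the generation is bumped at deallocation time so that a destroyed ID stops matching immediately, and it starts indices at 1 only to mirror the IDs shown in this page's examples.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EntityId:
    shard: int
    index: int
    generation: int


class ToyAllocator:
    """Hypothetical allocator; the real EntityAllocator may differ."""

    def __init__(self, shard: int = 0) -> None:
        self._shard = shard
        self._generations: dict[int, int] = {}  # current generation per index
        self._free: list[int] = []              # indices available for reuse
        self._next_index = 1                    # assumption: first index is 1

    def allocate(self) -> EntityId:
        if self._free:
            index = self._free.pop()  # reuse a freed index
        else:
            index = self._next_index
            self._next_index += 1
            self._generations[index] = 0
        return EntityId(self._shard, index, self._generations[index])

    def deallocate(self, entity: EntityId) -> None:
        if self.is_alive(entity):  # ignore stale or repeated frees
            # Assumption: bump on release so the old ID stops matching at once.
            self._generations[entity.index] += 1
            self._free.append(entity.index)

    def is_alive(self, entity: EntityId) -> bool:
        return (
            entity.shard == self._shard
            and self._generations.get(entity.index) == entity.generation
        )


allocator = ToyAllocator()
old = allocator.allocate()
allocator.deallocate(old)
new = allocator.allocate()  # same index as `old`, next generation
```

Here `old` and `new` share an index but differ in generation, so a lookup with `old` fails the liveness check instead of silently reaching the new entity.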
Sharding Support¶
The allocator supports sharding for distributed scenarios:
# Shard 0 allocator
allocator0 = EntityAllocator(shard=0)
e1 = allocator0.allocate() # EntityId(0, 1, 0)
# Shard 1 allocator
allocator1 = EntityAllocator(shard=1)
e2 = allocator1.allocate() # EntityId(1, 1, 0) ← Different shard
Future: Distributed Allocation
Shards enable partitioning entities across nodes. Each node has its own allocator with a unique shard ID, preventing ID collisions.
Built-in Storage Implementations¶
Local Storage¶
LocalStorage is the default in-memory storage backend:
Implementation:
class LocalStorage:
    def __init__(self, shard: int = 0):
        self._shard = shard
        self._allocator = EntityAllocator(shard=shard)
        self._components: dict[EntityId, dict[type, Any]] = {}
Data Structure:
_components = {
    EntityId(0, 1, 0): {
        Position: Position(10, 20),
        Velocity: Velocity(1, 0),
        Health: Health(100, 100),
    },
    EntityId(0, 2, 0): {
        Position: Position(5, 5),
        AgentTag: AgentTag("Alice"),
    },
}
Characteristics:
- Simple: Dict-based, easy to understand and debug
- Fast for Small Scale: Good performance for 100s-1000s of entities
- High Memory: Stores all entities in memory, no persistence
- O(n) Queries: Must check every entity (no archetypal optimization)
Usage:
from agentecs import World
from agentecs.storage import LocalStorage
# Default (shard 0)
world = World()
# Explicit local storage
world = World(storage=LocalStorage(shard=0))
Serialization:
Uses pickle for snapshot/restore:
Pickle Security
LocalStorage uses pickle which is not secure for untrusted data. Only restore snapshots from trusted sources.
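A minimal sketch of a pickle-based snapshot/restore round trip is shown below. `ToySnapshotStorage` is a hypothetical stand-in, and plain strings and tuples stand in for entity IDs and component objects; the actual byte layout `LocalStorage` produces is not documented here.

```python
import pickle
from typing import Any


class ToySnapshotStorage:
    """Hypothetical stand-in that serializes its whole state with pickle."""

    def __init__(self) -> None:
        self.components: dict[str, dict[str, Any]] = {}

    def snapshot(self) -> bytes:
        return pickle.dumps(self.components)

    def restore(self, data: bytes) -> None:
        # pickle.loads can execute arbitrary code: only pass trusted bytes.
        self.components = pickle.loads(data)


storage = ToySnapshotStorage()
storage.components = {"e1": {"Position": (10, 20)}}
checkpoint = storage.snapshot()

storage.components["e1"]["Position"] = (99, 99)  # state drifts after the checkpoint
storage.restore(checkpoint)                      # rewind to the saved state
```

Because `restore` replaces the state wholesale, the same bytes can also seed a second storage instance to fork a simulation from a checkpoint.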
Performance Characteristics:
| Operation | Complexity | Notes |
|---|---|---|
| create_entity() | O(1) | Allocator amortized |
| get_component() | O(1) | Dict lookup |
| set_component() | O(1) | Dict insert |
| query(*types) | O(n×m) | n=entities, m=types |
| apply_updates() | O(k) | k=total changes |
Optimization Opportunity
Future: Implement archetypal storage where entities with the same component types are stored contiguously. This reduces queries from O(n×m) to O(matched entities).
Remote Storage Options¶
Future Feature
Remote storage backends for distributed or persistent scenarios:
PostgreSQL Backend:
# Future API
from agentecs.storage.postgres import PostgreSQLStorage
storage = PostgreSQLStorage(
    connection_string="postgresql://localhost/agentecs"
)
world = World(storage=storage)
- Persistent storage across runs
- SQL queries for analytics
- ACID transactions
- Slower than in-memory
Redis Backend:
# Future API
from agentecs.storage.redis import RedisStorage
storage = RedisStorage(host="localhost", port=6379)
world = World(storage=storage)
- In-memory with optional persistence
- Fast read/write
- Pub/sub for distributed scenarios
- TTL for automatic cleanup
S3/Cloud Storage:
# Future API
from agentecs.storage.s3 import S3Storage
storage = S3Storage(bucket="my-simulation-state")
world = World(storage=storage)
- Cheap persistent storage
- Good for checkpointing
- High latency (not for real-time)
- Versioning support
Distributed Storage¶
Future Feature
Shard entities across multiple nodes:
graph TD
A[Coordinator] -->|manages| B[Storage Shard 0]
A -->|manages| C[Storage Shard 1]
A -->|manages| D[Storage Shard 2]
B -->|stores| E[Entities 0-999]
C -->|stores| F[Entities 1000-1999]
D -->|stores| G[Entities 2000-2999]
H[System] -.queries.-> B
H -.queries.-> C
H -.queries.-> D
style A fill:#ffb74d
style B fill:#81c784
style C fill:#81c784
style D fill:#81c784
Sharding Strategies:
- Hash-based: shard = hash(entity) % num_shards
- Range-based: shard = entity.index // shard_size
- Spatial: Entities near each other on same shard (for locality)
- Component-based: Shard by primary component type
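The hash-based and range-based formulas above can be written as two small functions. `hash_shard` and `range_shard` are hypothetical helper names, and the `EntityId` here is a local copy of the structure shown earlier; since distributed storage is a future feature, this is only an illustration of the arithmetic.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EntityId:
    shard: int
    index: int
    generation: int


def hash_shard(entity: EntityId, num_shards: int) -> int:
    # A frozen dataclass is hashable; Python's % keeps the result in
    # [0, num_shards) even when the hash is negative.
    return hash(entity) % num_shards


def range_shard(entity: EntityId, shard_size: int) -> int:
    # Contiguous index ranges map to the same shard.
    return entity.index // shard_size


entity = EntityId(0, 1500, 0)
by_range = range_shard(entity, shard_size=1000)  # index 1500 falls in 1000-1999
by_hash = hash_shard(entity, num_shards=3)
```

Note that hashing the whole ID includes the generation, so a recycled index may land on a different shard than its predecessor; hashing only the index would avoid that, at the cost of departing from the formula as written.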
Challenges:
- Cross-shard queries: Entity on shard A references component from shard B
- Load balancing: Some shards may have more entities
- Consistency: Distributed transactions or eventual consistency
- Network overhead: Remote queries are slower than local
Example API:
Archetypal Storage Optimization¶
Future Feature
The current LocalStorage is entity-first (a dict keyed by entity). Archetypal storage is archetype-first, grouping entities by their exact set of component types:
Entity-First (Current):
{
    Entity1: {Position, Velocity, Health},
    Entity2: {Position, Velocity},
    Entity3: {Position, Health},
}
Archetype-First (Future):
{
    Archetype(Position, Velocity, Health): [Entity1],
    Archetype(Position, Velocity): [Entity2],
    Archetype(Position, Health): [Entity3],
}
Benefits:
- O(matched) Queries: Only iterate entities with matching archetype
- Cache Locality: Components stored contiguously in memory
- Batch Operations: Process all entities with same archetype together
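The query benefit can be sketched with a toy index that groups entities by their exact set of component types. `ToyArchetypeIndex` and the three component classes are hypothetical; the sketch models only the grouping and the skip-whole-groups query, not contiguous memory layout or moving entities between archetypes.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Iterator


@dataclass
class Position:
    x: int
    y: int


@dataclass
class Velocity:
    dx: int
    dy: int


@dataclass
class Health:
    hp: int


class ToyArchetypeIndex:
    """Hypothetical index: entities grouped by their exact set of component types."""

    def __init__(self) -> None:
        self._archetypes: dict[frozenset[type], dict[Any, dict[type, Any]]] = (
            defaultdict(dict)
        )

    def add(self, entity: Any, *components: Any) -> None:
        by_type = {type(c): c for c in components}
        self._archetypes[frozenset(by_type)][entity] = by_type

    def query(self, *component_types: type) -> Iterator[tuple[Any, tuple[Any, ...]]]:
        wanted = frozenset(component_types)
        for archetype, entities in self._archetypes.items():
            if wanted <= archetype:  # otherwise skip the whole group at once
                for entity, by_type in entities.items():
                    yield entity, tuple(by_type[t] for t in component_types)


index = ToyArchetypeIndex()
index.add("Entity1", Position(0, 0), Velocity(1, 0), Health(100))
index.add("Entity2", Position(5, 5), Velocity(0, 1))
index.add("Entity3", Position(9, 9), Health(50))
matched = sorted(entity for entity, _ in index.query(Position, Velocity))
```

The query still loops over every archetype, but the number of archetypes is usually far smaller than the number of entities, and non-matching entities are never touched individually.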
Trade-offs:
- More complex implementation
- Component add/remove requires archetype change (slower)
- Memory overhead for archetype tracking
When to Use:
- 10,000+ entities
- Many entities share same component combinations
- Query-heavy workloads
Storage Best Practices¶
Start with LocalStorage
Begin with LocalStorage. Only move to distributed or persistent storage when profiling shows it's necessary.
Profile Query Patterns
Understand your query patterns before choosing storage:
- Many queries, few updates → Archetypal storage
- Frequent component changes → Entity-first storage
- Need persistence → Database backend
- Massive scale → Distributed storage
Beware of Cross-Shard Queries
In distributed scenarios, queries that span shards are expensive. Design component relationships to minimize cross-shard access.
Use Snapshots for Checkpointing
Periodic snapshots enable:
- Crash recovery
- Debugging (rewind to previous state)
- A/B testing (fork simulation from snapshot)
Consider Consistency Requirements
Different storage backends offer different consistency guarantees:
- LocalStorage: Strong consistency (single node)
- Distributed: Eventual consistency (requires conflict resolution)
- Persistent: ACID transactions (slow but safe)
Storage Protocol Example Implementation¶
Here's a minimal custom storage implementation:
from typing import Any, TypeVar
from agentecs.storage.protocol import Storage
from agentecs.core.identity import EntityId
T = TypeVar("T")
class InMemoryStorage(Storage):
    """Simple in-memory storage (minimal implementation)."""
    def __init__(self):
        # Components per entity, keyed by component type
        self._data: dict[EntityId, dict[type, Any]] = {}
        self._next_id = 0
    def create_entity(self) -> EntityId:
        # No ID recycling here, so shard and generation stay 0
        entity = EntityId(0, self._next_id, 0)
        self._next_id += 1
        self._data[entity] = {}
        return entity
    def destroy_entity(self, entity: EntityId) -> None:
        if entity in self._data:
            del self._data[entity]
    def entity_exists(self, entity: EntityId) -> bool:
        return entity in self._data
    def get_component(self, entity: EntityId, component_type: type[T]) -> T | None:
        return self._data.get(entity, {}).get(component_type)
    def set_component(self, entity: EntityId, component: Any) -> None:
        # Silently ignores entities that do not exist
        if entity in self._data:
            self._data[entity][type(component)] = component
    # ... implement other protocol methods ...
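Once the remaining protocol methods are implemented, use the custom storage by passing an instance to World, as with LocalStorage above:
world = World(storage=InMemoryStorage())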
See Also¶
- World Management: How World interacts with storage
- Scheduling: How storage affects scheduling decisions
- Queries: How queries are executed against storage
- Components: What storage stores