import asyncio from typing import NewType, Protocol from pydantic import BaseModel, JsonValue from synctoy.data_model import Condition, NonNullRecord, ObjectId, RNull, Record from synctoy.snapshot import SList, Selector, Snapshot class Update(BaseModel): condition: Condition new: Record class PreconditionException(Exception): pass class Store(object): def __init__(self): self._event = asyncio.Event() self._versions: list[Record] = [] async def wait(self, max_timeout: float): try: await asyncio.wait_for(self._event.wait(), max_timeout) except TimeoutError: return def observe(self, start_from: int): return self._versions[start_from:] def update(self, updates: list[Update]) -> int: object_ids: set[ObjectId] = set(u.new.id for u in updates) snapshot: Snapshot = self.snapshot(SList(object_ids=list(object_ids))) for update in updates: id = update.new.id old_object = snapshot[id] new_object = update.new if not old_object.can_transition_to(new_object): raise PreconditionException(f"can't transition from {old_object} to {new_object}") if not new_object.can_transition_from(old_object): raise PreconditionException(f"can't transition from {old_object} to {new_object}") if not update.condition.is_met(old_object, new_object): raise PreconditionException(f"failed condition: {update.condition}") snapshot[id] = new_object for new_update in updates: self._versions.append(new_update.new) self._event.set() self._event = asyncio.Event() return len(self._versions) def snapshot(self, selector: Selector) -> Snapshot: pre_snapshot: dict[ObjectId, Record] = {} for row in self._versions: if not selector.includes_object_id(row.id): continue pre_snapshot[row.id] = row filtered: dict[ObjectId, NonNullRecord] = { id: row for id, row in pre_snapshot.items() if not isinstance(row, RNull) } return Snapshot(selector, filtered)