65 lines
2.2 KiB
Python
65 lines
2.2 KiB
Python
import asyncio
|
|
from typing import NewType, Protocol
|
|
from pydantic import BaseModel, JsonValue
|
|
|
|
from synctoy.data_model import Condition, NonNullRecord, ObjectId, RNull, Record
|
|
from synctoy.snapshot import SList, Selector, Snapshot
|
|
|
|
class Update(BaseModel):
|
|
condition: Condition
|
|
new: Record
|
|
|
|
class PreconditionException(Exception):
|
|
pass
|
|
|
|
class Store(object):
|
|
def __init__(self):
|
|
self._event = asyncio.Event()
|
|
self._versions: list[Record] = []
|
|
|
|
async def wait(self, max_timeout: float):
|
|
try:
|
|
await asyncio.wait_for(self._event.wait(), max_timeout)
|
|
except TimeoutError:
|
|
return
|
|
|
|
def observe(self, start_from: int):
|
|
return self._versions[start_from:]
|
|
|
|
def update(self, updates: list[Update]) -> int:
|
|
object_ids: set[ObjectId] = set(u.new.id for u in updates)
|
|
snapshot: Snapshot = self.snapshot(SList(object_ids=list(object_ids)))
|
|
|
|
for update in updates:
|
|
id = update.new.id
|
|
old_object = snapshot[id]
|
|
new_object = update.new
|
|
|
|
if not old_object.can_transition_to(new_object):
|
|
raise PreconditionException(f"can't transition from {old_object} to {new_object}")
|
|
if not new_object.can_transition_from(old_object):
|
|
raise PreconditionException(f"can't transition from {old_object} to {new_object}")
|
|
|
|
if not update.condition.is_met(old_object, new_object):
|
|
raise PreconditionException(f"failed condition: {update.condition}")
|
|
|
|
snapshot[id] = new_object
|
|
|
|
for new_update in updates:
|
|
self._versions.append(new_update.new)
|
|
self._event.set()
|
|
self._event = asyncio.Event()
|
|
return len(self._versions)
|
|
|
|
def snapshot(self, selector: Selector) -> Snapshot:
|
|
pre_snapshot: dict[ObjectId, Record] = {}
|
|
for row in self._versions:
|
|
if not selector.includes_object_id(row.id):
|
|
continue
|
|
|
|
pre_snapshot[row.id] = row
|
|
filtered: dict[ObjectId, NonNullRecord] = {
|
|
id: row for id, row in pre_snapshot.items() if not isinstance(row, RNull)
|
|
}
|
|
|
|
return Snapshot(selector, filtered) |