import asyncio import json from pydantic import BaseModel, TypeAdapter from starlette.applications import Starlette from starlette.middleware import Middleware from starlette.middleware.cors import CORSMiddleware from starlette.middleware.trustedhost import TrustedHostMiddleware from starlette.requests import Request from starlette.responses import JSONResponse, Response from starlette.routing import Route from synctoy.data_model import ObjectId, NonNullRecord, Record from synctoy.snapshot import SAll from synctoy.store import Store, Update store = Store() _TIMEOUT = 5.0 async def snapshot(request: Request): snapshot = store.snapshot(SAll()) populated = snapshot.populated_records() return JSONResponse(TypeAdapter(dict[ObjectId, NonNullRecord]).dump_python(populated, mode="json")) class UpdateResult(BaseModel): index: int async def update(request: Request): value = Update.model_validate_json(await request.body()) version = store.update([value]) return JSONResponse(UpdateResult(index=version).model_dump(mode="json")) async def events(request: Request): start = request.query_params.get("start") if start is None: start = 0 else: start = int(start) result = store.observe(start_from=start) if len(result) == 0: await store.wait(_TIMEOUT) result = store.observe(start_from=start) await asyncio.sleep(1.0) # simulated delay! wow return JSONResponse(TypeAdapter(list[Record]).dump_python(result)) app = Starlette(debug=True, routes=[ Route("/snapshot", snapshot, methods=["GET"]), Route("/update", update, methods=["POST"]), Route("/events", events, methods=["GET"]) ], ) app = CORSMiddleware(app=app, allow_origins=["*"])