Files

55 lines
1.7 KiB
Python

import asyncio
import json
from pydantic import BaseModel, TypeAdapter
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.trustedhost import TrustedHostMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import Route
from synctoy.data_model import ObjectId, NonNullRecord, Record
from synctoy.snapshot import SAll
from synctoy.store import Store, Update
store = Store()
_TIMEOUT = 5.0
async def snapshot(request: Request):
snapshot = store.snapshot(SAll())
populated = snapshot.populated_records()
return JSONResponse(TypeAdapter(dict[ObjectId, NonNullRecord]).dump_python(populated, mode="json"))
class UpdateResult(BaseModel):
index: int
async def update(request: Request):
value = Update.model_validate_json(await request.body())
version = store.update([value])
return JSONResponse(UpdateResult(index=version).model_dump(mode="json"))
async def events(request: Request):
start = request.query_params.get("start")
if start is None:
start = 0
else:
start = int(start)
result = store.observe(start_from=start)
if len(result) == 0:
await store.wait(_TIMEOUT)
result = store.observe(start_from=start)
await asyncio.sleep(1.0) # simulated delay! wow
return JSONResponse(TypeAdapter(list[Record]).dump_python(result))
app = Starlette(debug=True, routes=[
Route("/snapshot", snapshot, methods=["GET"]),
Route("/update", update, methods=["POST"]),
Route("/events", events, methods=["GET"])
],
)
app = CORSMiddleware(app=app, allow_origins=["*"])