artifacts-dashboard/backend/app/main.py
Paweł Orzech 75313b83c0
Add multi-user workflows/pipelines and error tracking
Add multi-user automation features and per-user error tracking.

- Database migrations: add workflow_configs/workflow_runs (004), app_errors (005), pipeline_configs/pipeline_runs (006), and add user_token_hash to app_errors (007).
- Backend: introduce per-request token handling (X-API-Token) via app.api.deps and update many API routes (auth, automations, bank, characters, dashboard, events, exchange, logs) to use user-scoped Artifacts client and character scoping. Auth endpoints no longer store tokens server-side (validate-only); clear is a no-op on server.
- New Errors API and services: endpoint to list, filter, resolve, and report errors scoped to the requesting user; add error models, schemas, middleware/error handler and error_service for recording/hashing tokens.
- Pipelines & Workflows: add API routers, models, schemas and engine modules (pipeline/worker/coordinator, workflow runner/conditions) and action_executor updates to support workflow/pipeline execution.
- Logs: logs endpoint now prefers fetching recent action logs from the game API (with fallback to local DB), supports paging and filtering, and scopes results to the user.
- Frontend: add pipeline/workflow builders, lists, progress components and hooks (use-errors, use-pipelines, use-workflows), sentry client config, and updates to API client/constants/types.
- Misc: add middleware error handler, various engine strategy tweaks, tests adjusted.

Overall this change enables per-user API tokens, scopes DB queries to each user, introduces pipelines/workflows runtime support, and centralizes application error tracking.
2026-03-01 23:02:34 +01:00

293 lines
9.3 KiB
Python

import asyncio
import json
import logging
import sys
from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator
from datetime import datetime, timezone
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import settings
# ---- Sentry (conditional) ----
if settings.sentry_dsn:
try:
import sentry_sdk
sentry_sdk.init(
dsn=settings.sentry_dsn,
environment=settings.environment,
traces_sample_rate=0.2,
send_default_pii=False,
)
except Exception:
pass # Sentry is optional; don't block startup
from app.database import async_session_factory, engine, Base
from app.services.artifacts_client import ArtifactsClient
from app.services.character_service import CharacterService
from app.services.game_data_cache import GameDataCacheService
# Import models so they are registered on Base.metadata
from app.models import game_cache as _game_cache_model # noqa: F401
from app.models import character_snapshot as _snapshot_model # noqa: F401
from app.models import automation as _automation_model # noqa: F401
from app.models import workflow as _workflow_model # noqa: F401
from app.models import price_history as _price_history_model # noqa: F401
from app.models import event_log as _event_log_model # noqa: F401
from app.models import app_error as _app_error_model # noqa: F401
from app.models import pipeline as _pipeline_model # noqa: F401
# Import routers
from app.api.characters import router as characters_router
from app.api.game_data import router as game_data_router
from app.api.dashboard import router as dashboard_router
from app.api.bank import router as bank_router
from app.api.automations import router as automations_router
from app.api.ws import router as ws_router
from app.api.exchange import router as exchange_router
from app.api.events import router as events_router
from app.api.logs import router as logs_router
from app.api.auth import router as auth_router
from app.api.workflows import router as workflows_router
from app.api.errors import router as errors_router
from app.api.pipelines import router as pipelines_router
# Automation engine
from app.engine.pathfinder import Pathfinder
from app.engine.manager import AutomationManager
# Error-handling middleware
from app.middleware.error_handler import ErrorHandlerMiddleware
# Exchange service
from app.services.exchange_service import ExchangeService
# WebSocket system
from app.websocket.event_bus import EventBus
from app.websocket.client import GameWebSocketClient
from app.websocket.handlers import GameEventHandler
logger = logging.getLogger(__name__)
class _JSONFormatter(logging.Formatter):
"""Structured JSON log formatter for production."""
def format(self, record: logging.LogRecord) -> str:
log_entry = {
"ts": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"msg": record.getMessage(),
}
if record.exc_info and record.exc_info[1]:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry, default=str)
_handler = logging.StreamHandler(sys.stdout)
if settings.environment != "development":
_handler.setFormatter(_JSONFormatter())
else:
_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
logging.basicConfig(level=logging.INFO, handlers=[_handler])
async def _snapshot_loop(
db_factory: async_session_factory.__class__,
client: ArtifactsClient,
character_service: CharacterService,
interval: float = 60.0,
) -> None:
"""Periodically save character snapshots."""
while True:
try:
async with db_factory() as db:
await character_service.take_snapshot(db, client)
except asyncio.CancelledError:
logger.info("Character snapshot loop cancelled")
return
except Exception:
logger.exception("Error taking character snapshot")
await asyncio.sleep(interval)
async def _load_pathfinder_maps(
pathfinder: Pathfinder,
cache_service: GameDataCacheService,
) -> None:
"""Load map data from the game data cache into the pathfinder.
Retries with a short delay if the cache has not been populated yet
(e.g. the background refresh has not completed its first pass).
"""
max_attempts = 10
for attempt in range(1, max_attempts + 1):
try:
async with async_session_factory() as db:
maps = await cache_service.get_maps(db)
if maps:
pathfinder.load_maps(maps)
logger.info("Pathfinder loaded %d map tiles", len(maps))
return
logger.info(
"Map cache empty, retrying (%d/%d)",
attempt,
max_attempts,
)
except Exception:
logger.exception(
"Error loading maps into pathfinder (attempt %d/%d)",
attempt,
max_attempts,
)
await asyncio.sleep(5)
logger.warning(
"Pathfinder could not load maps after %d attempts; "
"automations that depend on pathfinding will not work until maps are cached",
max_attempts,
)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
# --- Startup ---
# Create tables if they do not exist (useful for dev; in prod rely on Alembic)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# Instantiate shared services
client = ArtifactsClient()
cache_service = GameDataCacheService()
character_service = CharacterService()
# Event bus for internal pub/sub
event_bus = EventBus()
exchange_service = ExchangeService()
app.state.artifacts_client = client
app.state.cache_service = cache_service
app.state.character_service = character_service
app.state.event_bus = event_bus
app.state.exchange_service = exchange_service
# Start background cache refresh (runs immediately, then every 30 min)
cache_task = cache_service.start_background_refresh(
db_factory=async_session_factory,
client=client,
)
# Start periodic character snapshot (every 60 seconds)
snapshot_task = asyncio.create_task(
_snapshot_loop(async_session_factory, client, character_service)
)
# --- Automation engine ---
# Initialize pathfinder and load maps (runs in a background task so it
# does not block startup if the cache has not been populated yet)
pathfinder = Pathfinder()
pathfinder_task = asyncio.create_task(
_load_pathfinder_maps(pathfinder, cache_service)
)
# Create the automation manager and expose it on app.state
automation_manager = AutomationManager(
client=client,
db_factory=async_session_factory,
pathfinder=pathfinder,
event_bus=event_bus,
)
app.state.automation_manager = automation_manager
# --- Price capture background task ---
price_capture_task = exchange_service.start_price_capture(
db_factory=async_session_factory,
client=client,
)
# --- WebSocket system ---
# Game WebSocket client (connects to the Artifacts game server)
game_ws_client = GameWebSocketClient(
token=settings.artifacts_token,
event_bus=event_bus,
)
game_ws_task = await game_ws_client.start()
app.state.game_ws_client = game_ws_client
# Event handler (processes game events from the bus)
game_event_handler = GameEventHandler(event_bus=event_bus)
event_handler_task = await game_event_handler.start()
logger.info("Artifacts Dashboard API started")
yield
# --- Shutdown ---
logger.info("Shutting down background tasks")
# Stop all running automations gracefully
await automation_manager.stop_all()
# Stop WebSocket system
await game_event_handler.stop()
await game_ws_client.stop()
cache_service.stop_background_refresh()
exchange_service.stop_price_capture()
snapshot_task.cancel()
pathfinder_task.cancel()
# Wait for tasks to finish cleanly
for task in (cache_task, snapshot_task, pathfinder_task, price_capture_task, game_ws_task, event_handler_task):
try:
await task
except asyncio.CancelledError:
pass
await client.close()
await engine.dispose()
logger.info("Artifacts Dashboard API stopped")
app = FastAPI(
title="Artifacts Dashboard API",
version="0.1.0",
lifespan=lifespan,
)
app.add_middleware(ErrorHandlerMiddleware)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Register routers
app.include_router(characters_router)
app.include_router(game_data_router)
app.include_router(dashboard_router)
app.include_router(bank_router)
app.include_router(automations_router)
app.include_router(ws_router)
app.include_router(exchange_router)
app.include_router(events_router)
app.include_router(logs_router)
app.include_router(auth_router)
app.include_router(workflows_router)
app.include_router(errors_router)
app.include_router(pipelines_router)
@app.get("/health")
async def health() -> dict[str, str]:
return {"status": "ok"}