diff --git a/AGENTS.md b/AGENTS.md index 8727886..aec3094 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -17,182 +17,57 @@ More rules are only to be added by the human, in case such a suggestion becomes --- -## 1. Project Overview +## 1. Stack -- **Monorepo**: Turbo repo -- **Package Manager**: pnpm -- **Language**: TypeScript everywhere -- **Node**: >= 24 - -### Applications - -| App | Purpose | -| ---------------- | ----------------------------------------------------------------------------------------------------------------- | -| `apps/main` | SvelteKit UI — the primary user-facing application | -| `apps/processor` | Hono web server — intended for asynchronous processing (jobs, workers). Currently minimal; structure is evolving. | - -### Packages - -Packages live under `packages/` and are **standalone, modular** pieces consumed by apps: - -| Package | Purpose | -| --------------- | --------------------------------------------- | -| `@pkg/logic` | Domain logic (DDD, controllers, repositories) | -| `@pkg/db` | Drizzle schema, database access | -| `@pkg/logger` | Logging, `getError` for error construction | -| `@pkg/result` | Result type, `ERROR_CODES`, `tryCatch` | -| `@pkg/keystore` | Redis instance (sessions, 2FA, etc.) | -| `@pkg/settings` | App settings / env config | - -### Data Stores - -- **PostgreSQL (Drizzle)** — Primary relational data (auth, users, accounts, etc.) -- **Redis (Valkey)** — Sessions, 2FA verification state (via `@pkg/keystore`) - -Additional stores (NoSQL DBs, R2, etc.) may be introduced later. Follow existing domain patterns when adding new data access. +- **Monorepo**: Turborepo + pnpm +- **Language**: TypeScript everywhere, Node >= 24 +- **Apps**: `@apps/main` (SvelteKit), `@apps/front` (Hono), `@apps/orchestrator` (Hono) +- **Packages**: `@pkg/logic`, `@pkg/db`, `@pkg/logger`, `@pkg/result`, `@pkg/keystore`, `@pkg/settings` +- **DB**: PostgreSQL via Drizzle ORM; Redis (Valkey) via `@pkg/keystore` --- -## 2. The Logic Package: DDD + Layered Architecture +## 2. Logic Package Conventions -The `@pkg/logic` package contains **all domain logic**. It follows: +All domain logic lives in `@pkg/logic` under `packages/logic/domains//` with four files: `data.ts`, `repository.ts`, `controller.ts`, `errors.ts`. Mirror this exactly when adding a domain. -1. **Domain-Driven Design (DDD)** — Bounded contexts as domains -2. **Layered architecture** — Clear separation of concerns -3. **Result-style error handling** — Errors as values; avoid try-catch in domain code - -### Domain Structure - -Each domain is a folder under `packages/logic/domains/`: - -``` -domains/ - / - data.ts # Types, schemas (Valibot) - repository.ts # Data access - controller.ts # Use cases / application logic - errors.ts # Domain-specific error constructors (using getError) -``` - -The logic package is **pure domain logic** — no HTTP routes or routers. API exposure is handled by the main app via SvelteKit remote functions. Auth uses `config.base.ts` with better-auth. Add new domains as needed; mirror existing patterns. - -### Path Aliases (logic package) - -- `@/*` → `./*` -- `@domains/*` → `./domains/*` -- `@core/*` → `./core/*` - -### Flow Execution Context - -Domain operations receive a `FlowExecCtx` (`fctx`) for tracing and audit: +**Path aliases** (logic package only): +- `@/*` → `./*` · `@domains/*` → `./domains/*` · `@core/*` → `./core/*` +**FlowExecCtx** (`fctx`) — passed into every domain operation for tracing: ```ts -type FlowExecCtx = { - flowId: string; - userId?: string; - sessionId?: string; -}; +type FlowExecCtx = { flowId: string; userId?: string; sessionId?: string; }; ``` --- -## 3. Error Handling: Result Pattern (neverthrow) +## 3. Error Handling -Errors are **values**, not exceptions. The codebase uses Result-style handling. - -### Current Conventions - -1. **Logic package** — Uses `neverthrow` (`ResultAsync`, `okAsync`, `errAsync`) for async operations that may fail. -2. **`@pkg/result`** — Provides `Result`, `ERROR_CODES`, and `tryCatch()`. The `Result` type is legacy; So don't reach for it primarily. -3. Use `ERROR_CODES` for consistent error codes. -4. **`getError()`** — From `@pkg/logger`. Use at boundaries when converting a thrown error to an `Err` object: - `return getError({ code: ERROR_CODES.XXX, message: "...", description: "...", detail: "..." }, e)`. -5. **Domain errors** — Each domain has an `errors.ts` that exports error constructors built with `getError`. Use these instead of ad-hoc error objects. -6. **Check before use** — Always check `result.isOk()` / `result.isErr()` before using `result.value`; never assume success. - -### Err Shape - -```ts -type Err = { - flowId?: string; - code: string; - message: string; - description: string; - detail: string; - actionable?: boolean; - error?: any; - // Flexible, but more "defined base fields" in the future -}; -``` +- Use `neverthrow` (`ResultAsync`, `okAsync`, `errAsync`) for all fallible async logic. +- `@pkg/result`'s `Result` type is **legacy** — do not reach for it primarily. +- Use `ERROR_CODES` from `@pkg/result` for all error codes. +- Use `getError()` from `@pkg/logger` when converting thrown errors into `Err` objects at boundaries. +- Use domain-specific error constructors from `errors.ts` — never ad-hoc error objects. +- Always check `result.isOk()` / `result.isErr()` before accessing `result.value`. --- -## 4. Frontend (Main App) +## 4. Main App Conventions (`apps/main`) -The main app is a **SvelteKit** application with a domain-driven UI structure. - -### Structure - -- **Routes**: File-based routing under `src/routes/`. Layout groups (e.g. `(main)`, `auth`) wrap related pages. -- **Domain-driven UI**: Feature code lives under `src/lib/domains//` — each domain has its own folder with view models, components, and remote functions. -- **View Model (VM) pattern**: Domain logic and state for a screen live in `*.vm.svelte.ts` classes. VMs hold reactive state (`$state`), orchestrate remote function calls, and expose methods. Pages import and use a VM instance. - -### SvelteKit Remote Functions - -The main app uses **SvelteKit remote functions** as the primary API layer — replacing Hono routers in the logic package. Each domain has a `*.remote.ts` file that exposes `query` (reads) and `command` (writes) functions, called directly from VMs. Auth context and `FlowExecCtx` are built inside each remote function from `event.locals` via helpers in `$lib/core/server.utils`. - -Naming convention: `*SQ` for queries, `*SC` for commands. - -### Global Stores - -Shared state (`user`, `session`, `breadcrumbs`) lives in `$lib/global.stores.ts`. - -### Conventions - -- Pages are thin: they mount a VM, render components, and wire up lifecycle. -- VMs own async flows, polling, and error handling for their domain. -- VMs call remote functions directly; remote functions invoke logic controllers. -- UI components under `$lib/components/` are shared; domain-specific components live in `$lib/domains//`. +- Feature code lives in `src/lib/domains//` — view model, components, remote functions. +- **VM pattern**: `*.vm.svelte.ts` classes own all reactive state and async flows for a screen. Pages are thin — they just mount the VM. +- **Remote functions**: `*.remote.ts` per domain. Naming: `*SQ` for queries, `*SC` for commands. +- Auth context and `fctx` are built inside remote functions from `event.locals` via `$lib/core/server.utils`. --- -## 5. Processor App +## 5. Observability Convention -The processor is a **Hono** server intended for **background work** and async jobs. Its structure is still evolving and it is to be updated soon. - -When logic is added, processing logic should live under `src/domains//` and call into `@pkg/logic` controllers and repositories. +When adding any new operation in a repository or controller, wrap it with `traceResultAsync` from `@pkg/logic/core/observability.ts`. Keep span names descriptive and consistent (e.g. `"user.getUserInfo"`). Do not add ad-hoc spans. --- -## 6. Observability +## 6. Validation -The stack uses **OpenTelemetry** end-to-end for traces, logs, and metrics, shipped to a **SigNoz** instance (via OTel Collector). - -### How it fits together - -- **`apps/main`** bootstraps the OTel SDK in `instrumentation.server.ts` (auto-instrumentation via `@opentelemetry/sdk-node`). SvelteKit's `tracing` and `instrumentation` experimental flags wire this into the request lifecycle. -- **`@pkg/logger`** ships Winston logs to OTel via `OpenTelemetryTransportV3` — logs are correlated with active traces automatically. -- **`@pkg/logic/core/observability.ts`** provides two tracing helpers for domain code: - - `traceResultAsync` — wraps a `ResultAsync` operation in an OTel span. Use this in repositories and controllers. - - `withFlowSpan` — lower-level span wrapper for non-Result async code. -- Both helpers accept `fctx` and stamp spans with `flow.id`, `flow.user_id`, and `flow.session_id` for end-to-end trace correlation. - -### Convention - -When adding new domain operations in repositories or controllers, wrap them with `traceResultAsync`. Keep span names consistent and descriptive (e.g. `"user.getUserInfo"`). Do not add ad-hoc spans outside these helpers. - ---- - -## 7. Validation & Schemas - -- **Valibot** is used for schema validation in the logic package and in remote function input. -- Domain data types are defined in `data.ts` per domain. -- Use `v.InferOutput` for TypeScript types. -- Remote functions pass Valibot schemas to `query()` and `command()` for input validation. - ---- - -## 8. Package Naming - -- Apps: `@apps/*` (e.g. `@apps/main`, `@apps/processor`) -- Packages: `@pkg/*` (e.g. `@pkg/logic`, `@pkg/db`, `@pkg/logger`) +- **Valibot** for all schemas. Types via `v.InferOutput`. +- Domain data types defined in `data.ts`. Remote function inputs validated via Valibot schemas passed to `query()` / `command()`. diff --git a/README.md b/README.md index dba777a..2036b74 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,72 @@ # Illusory IOTAM -This is the source code for a SaaS project with purposes to basically offer the users the ability to use specific Android applications, but instead of the apps running on their own phone, they run on our hosted Docker-Android instances. +A SaaS platform that lets users run specific Android applications on hosted Docker-Android instances — instead of on their own device. Users get a unique link, install a PWA, and are streamed a live Android session from the cloud. -Right now the project is in alpha testing phase, so it is subject to change (greenfield project) +Currently in alpha. Greenfield. Subject to change. --- + +## How It Works + +1. Admin generates a unique link and assigns it to a user (or a slot). +2. User opens that link in their browser — served by `apps/front`. +3. User waits while a Docker-Android device is allocated to their session. +4. User is prompted to install the PWA. +5. User opens the PWA — they are routed into a live stream of their assigned Android instance. +6. Admin manages the entire fleet from `apps/main` (the dashboard), which communicates with `apps/orchestrator` running on each VPS to control Docker-Android containers. + +--- + +## Implementation Checklist + +### Foundation + +- [x] Monorepo setup (Turborepo + pnpm) +- [x] Shared packages: `@pkg/logic`, `@pkg/db`, `@pkg/logger`, `@pkg/result`, `@pkg/keystore`, `@pkg/settings` +- [x] PostgreSQL with Drizzle ORM +- [x] Redis (Valkey) via `@pkg/keystore` +- [x] OpenTelemetry end-to-end (logs, traces, metrics → SigNoz) +- [x] Auth system (Better Auth — email/password, magic link, 2FA/TOTP, sessions) +- [x] User management (roles, bans, account operations) +- [x] Notifications system (priority, archiving, bulk ops) +- [x] Admin dashboard shell (`apps/main` — SvelteKit) +- [x] Background task tracking schema (task table) + +### Device Management (Orchestrator + Admin) + +- [ ] Device schema — DB model for a device (host VPS, container ID, status, assigned session, etc.) +- [ ] Device domain in `@pkg/logic` — controller + repository + errors +- [ ] Orchestrator command interface — secured Hono routes the admin dashboard calls: + - [ ] `POST /devices/:id/start` — start a Docker-Android container + - [ ] `POST /devices/:id/stop` — stop a container + - [ ] `POST /devices/:id/restart` — restart a container + - [ ] `GET /devices` — list all devices and their current status + - [ ] `GET /devices/:id` — page to view the device in more detail (info, live stream feed with ws-scrcpy) +- [ ] Device allocation logic — mark a device as in-use for a user session +- [ ] Device release logic — free up a device when a session ends +- [ ] Admin dashboard: Devices page — list fleet, show status, trigger start/stop/restart +- [ ] Internal API key auth between `apps/main` and `apps/orchestrator` + +### Link Management (Admin + Front App) + +- [ ] Link schema — DB model (unique token, expiry, status, linked device ID) +- [ ] Link domain in `@pkg/logic` — controller + repository + errors +- [ ] Admin dashboard: Links page — generate links, view detail, configure linked device, revoke, delete +- [ ] `apps/front`: validate incoming link token on request +- [ ] `apps/front`: return appropriate error page for invalid/expired/revoked links + +### PWA & User Session Flow (`apps/front`) + +- [ ] `apps/front`: serve static PWA shell (HTML + manifest + service worker) +- [ ] `apps/front`: wait/loading page — poll for device allocation status +- [ ] `apps/front`: PWA install prompt flow (beforeinstallprompt handling) +- [ ] `apps/front`: session binding — tie the PWA launch to the user's allocated device +- [ ] `apps/front`: route/proxy authenticated PWA requests to the Android instance stream + +### Android Streaming (scrcpy + ws-scrcpy) + +- [ ] Docker-Android image setup and validation on VPS +- [ ] ws-scrcpy WebSocket server running per container, exposed via orchestrator +- [ ] `apps/front`: scrcpy client embedded in PWA — renders the Android stream in browser +- [ ] Input forwarding (touch/keyboard events → scrcpy → Android container) +- [ ] Session timeout + stream teardown on inactivity diff --git a/apps/main/src/lib/domains/device/device.remote.ts b/apps/main/src/lib/domains/device/device.remote.ts new file mode 100644 index 0000000..27ce66a --- /dev/null +++ b/apps/main/src/lib/domains/device/device.remote.ts @@ -0,0 +1,88 @@ +import { getDeviceController } from "@pkg/logic/domains/device/controller"; +import { createDeviceSchema, updateDeviceSchema } from "@pkg/logic/domains/device/data"; +import { getFlowExecCtxForRemoteFuncs, unauthorized } from "$lib/core/server.utils"; +import { command, getRequestEvent, query } from "$app/server"; +import * as v from "valibot"; + +const dc = getDeviceController(); + +export const listDevicesSQ = query(async () => { + const event = getRequestEvent(); + const fctx = await getFlowExecCtxForRemoteFuncs(event.locals); + if (!fctx.userId) return unauthorized(fctx); + + const res = await dc.list(fctx); + return res.isOk() + ? { data: res.value, error: null } + : { data: null, error: res.error }; +}); + +export const getDeviceByIdSQ = query( + v.object({ id: v.number() }), + async (input) => { + const event = getRequestEvent(); + const fctx = await getFlowExecCtxForRemoteFuncs(event.locals); + if (!fctx.userId) return unauthorized(fctx); + + const res = await dc.getById(fctx, input.id); + return res.isOk() + ? { data: res.value, error: null } + : { data: null, error: res.error }; + }, +); + +export const createDeviceSC = command(createDeviceSchema, async (payload) => { + const event = getRequestEvent(); + const fctx = await getFlowExecCtxForRemoteFuncs(event.locals); + if (!fctx.userId) return unauthorized(fctx); + + const res = await dc.create(fctx, payload); + return res.isOk() + ? { data: res.value, error: null } + : { data: null, error: res.error }; +}); + +export const updateDeviceSC = command( + v.object({ id: v.number(), data: updateDeviceSchema }), + async (payload) => { + const event = getRequestEvent(); + const fctx = await getFlowExecCtxForRemoteFuncs(event.locals); + if (!fctx.userId) return unauthorized(fctx); + + const res = await dc.update(fctx, payload.id, payload.data); + return res.isOk() + ? { data: res.value, error: null } + : { data: null, error: res.error }; + }, +); + +export const deleteDeviceSC = command( + v.object({ id: v.number() }), + async (payload) => { + const event = getRequestEvent(); + const fctx = await getFlowExecCtxForRemoteFuncs(event.locals); + if (!fctx.userId) return unauthorized(fctx); + + const res = await dc.delete(fctx, payload.id); + return res.isOk() + ? { data: res.value, error: null } + : { data: null, error: res.error }; + }, +); + +export const setDeviceStatusSC = command( + v.object({ + id: v.number(), + status: v.picklist(["online", "offline", "busy", "error"]), + }), + async (payload) => { + const event = getRequestEvent(); + const fctx = await getFlowExecCtxForRemoteFuncs(event.locals); + if (!fctx.userId) return unauthorized(fctx); + + const res = await dc.setStatus(fctx, payload.id, payload.status as any); + return res.isOk() + ? { data: res.value, error: null } + : { data: null, error: res.error }; + }, +); diff --git a/apps/main/src/lib/domains/device/device.vm.svelte.ts b/apps/main/src/lib/domains/device/device.vm.svelte.ts new file mode 100644 index 0000000..77ac221 --- /dev/null +++ b/apps/main/src/lib/domains/device/device.vm.svelte.ts @@ -0,0 +1,152 @@ +import { + listDevicesSQ, + createDeviceSC, + deleteDeviceSC, + setDeviceStatusSC, +} from "./device.remote"; +import { toast } from "svelte-sonner"; + +type Device = { + id: number; + title: string; + version: string; + status: string; + isActive: boolean; + containerId: string | null; + host: string; + wsPort: string | null; + createdAt: Date; + updatedAt: Date; +}; + +type CreateDeviceInput = { + title: string; + version: string; + host: string; + containerId?: string; + wsPort?: string; + isActive?: boolean; +}; + +class DeviceViewModel { + devices = $state([]); + loading = $state(false); + creating = $state(false); + deletingId = $state(null); + showCreateDialog = $state(false); + + async fetchDevices() { + this.loading = true; + try { + const result = await listDevicesSQ(); + if (result?.error || !result?.data) { + toast.error( + result?.error?.message || "Failed to fetch devices", + { + description: + result?.error?.description || "Please try again", + }, + ); + return; + } + this.devices = result.data as Device[]; + } catch (error) { + toast.error("Failed to fetch devices", { + description: + error instanceof Error + ? error.message + : "Please try again", + }); + } finally { + this.loading = false; + } + } + + async createDevice(data: CreateDeviceInput): Promise { + this.creating = true; + try { + const result = await createDeviceSC(data); + if (result?.error) { + toast.error( + result.error.message || "Failed to create device", + { + description: + result.error.description || "Please try again", + }, + ); + return false; + } + toast.success("Device created"); + this.showCreateDialog = false; + await this.fetchDevices(); + return true; + } catch (error) { + toast.error("Failed to create device", { + description: + error instanceof Error + ? error.message + : "Please try again", + }); + return false; + } finally { + this.creating = false; + } + } + + async deleteDevice(id: number) { + this.deletingId = id; + try { + const result = await deleteDeviceSC({ id }); + if (result?.error) { + toast.error( + result.error.message || "Failed to delete device", + { + description: + result.error.description || "Please try again", + }, + ); + return; + } + toast.success("Device deleted"); + await this.fetchDevices(); + } catch (error) { + toast.error("Failed to delete device", { + description: + error instanceof Error + ? error.message + : "Please try again", + }); + } finally { + this.deletingId = null; + } + } + + async setStatus(id: number, status: string) { + try { + const result = await setDeviceStatusSC({ + id, + status: status as any, + }); + if (result?.error) { + toast.error( + result.error.message || "Failed to update status", + { + description: + result.error.description || "Please try again", + }, + ); + return; + } + await this.fetchDevices(); + } catch (error) { + toast.error("Failed to update status", { + description: + error instanceof Error + ? error.message + : "Please try again", + }); + } + } +} + +export const deviceVM = new DeviceViewModel(); diff --git a/apps/main/src/lib/domains/link/link.remote.ts b/apps/main/src/lib/domains/link/link.remote.ts new file mode 100644 index 0000000..df65480 --- /dev/null +++ b/apps/main/src/lib/domains/link/link.remote.ts @@ -0,0 +1,108 @@ +import { getLinkController } from "@pkg/logic/domains/link/controller"; +import { updateLinkSchema } from "@pkg/logic/domains/link/data"; +import { getFlowExecCtxForRemoteFuncs, unauthorized } from "$lib/core/server.utils"; +import { command, getRequestEvent, query } from "$app/server"; +import * as v from "valibot"; + +const lc = getLinkController(); + +export const listLinksSQ = query(async () => { + const event = getRequestEvent(); + const fctx = await getFlowExecCtxForRemoteFuncs(event.locals); + if (!fctx.userId) return unauthorized(fctx); + + const res = await lc.list(fctx); + return res.isOk() + ? { data: res.value, error: null } + : { data: null, error: res.error }; +}); + +export const getLinkByIdSQ = query( + v.object({ id: v.number() }), + async (input) => { + const event = getRequestEvent(); + const fctx = await getFlowExecCtxForRemoteFuncs(event.locals); + if (!fctx.userId) return unauthorized(fctx); + + const res = await lc.getById(fctx, input.id); + return res.isOk() + ? { data: res.value, error: null } + : { data: null, error: res.error }; + }, +); + +export const createLinkSC = command( + v.object({ + linkedDeviceId: v.optional(v.nullable(v.number())), + expiresAt: v.optional(v.nullable(v.date())), + }), + async (payload) => { + const event = getRequestEvent(); + const fctx = await getFlowExecCtxForRemoteFuncs(event.locals); + if (!fctx.userId) return unauthorized(fctx); + + const res = await lc.create(fctx, payload); + return res.isOk() + ? { data: res.value, error: null } + : { data: null, error: res.error }; + }, +); + +export const updateLinkSC = command( + v.object({ id: v.number(), data: updateLinkSchema }), + async (payload) => { + const event = getRequestEvent(); + const fctx = await getFlowExecCtxForRemoteFuncs(event.locals); + if (!fctx.userId) return unauthorized(fctx); + + const res = await lc.update(fctx, payload.id, payload.data); + return res.isOk() + ? { data: res.value, error: null } + : { data: null, error: res.error }; + }, +); + +export const revokeLinkSC = command( + v.object({ id: v.number() }), + async (payload) => { + const event = getRequestEvent(); + const fctx = await getFlowExecCtxForRemoteFuncs(event.locals); + if (!fctx.userId) return unauthorized(fctx); + + const res = await lc.revoke(fctx, payload.id); + return res.isOk() + ? { data: res.value, error: null } + : { data: null, error: res.error }; + }, +); + +export const deleteLinkSC = command( + v.object({ id: v.number() }), + async (payload) => { + const event = getRequestEvent(); + const fctx = await getFlowExecCtxForRemoteFuncs(event.locals); + if (!fctx.userId) return unauthorized(fctx); + + const res = await lc.delete(fctx, payload.id); + return res.isOk() + ? { data: res.value, error: null } + : { data: null, error: res.error }; + }, +); + +export const assignDeviceSC = command( + v.object({ + id: v.number(), + deviceId: v.nullable(v.number()), + }), + async (payload) => { + const event = getRequestEvent(); + const fctx = await getFlowExecCtxForRemoteFuncs(event.locals); + if (!fctx.userId) return unauthorized(fctx); + + const res = await lc.assignDevice(fctx, payload.id, payload.deviceId); + return res.isOk() + ? { data: res.value, error: null } + : { data: null, error: res.error }; + }, +); diff --git a/dev/README.md b/dev/README.md deleted file mode 100644 index 23b0c33..0000000 --- a/dev/README.md +++ /dev/null @@ -1,42 +0,0 @@ -# Dev - -Self-contained local development stack. Spin up all shared infrastructure on a per-project basis. - -## Services - -| Service | Description | Port(s) | -| ------------------ | ---------------------------------------- | -------------- | -| **PostgreSQL** | Primary relational database | `5432` | -| **Valkey** | Redis-compatible cache / message broker | `6379` | -| **SigNoz** | Observability UI (traces, metrics, logs) | `8080` | -| **OTel Collector** | OpenTelemetry ingest (gRPC / HTTP) | `4317`, `4318` | -| **ClickHouse** | Telemetry storage backend for SigNoz | — | - -## Run - -```sh -cd dev -docker compose -f docker-compose.dev.yaml up -d -``` - -## Stop - -```sh -docker compose -f docker-compose.dev.yaml down -``` - -To also remove all persisted data volumes: - -```sh -docker compose -f docker-compose.dev.yaml down -v -``` - -## Connection strings - -| Resource | Default value | -| ---------- | --------------------------------------------------------- | -| PostgreSQL | `postgresql://postgres:postgres@localhost:5432/primarydb` | -| Valkey | `redis://localhost:6379` | -| SigNoz UI | `http://localhost:8080` | -| OTLP gRPC | `localhost:4317` | -| OTLP HTTP | `localhost:4318` | diff --git a/packages/db/schema/device.schema.ts b/packages/db/schema/device.schema.ts new file mode 100644 index 0000000..4424e2e --- /dev/null +++ b/packages/db/schema/device.schema.ts @@ -0,0 +1,25 @@ +import { + boolean, + pgTable, + serial, + text, + timestamp, + varchar, +} from "drizzle-orm/pg-core"; + +export const device = pgTable("device", { + id: serial("id").primaryKey(), + + title: text("title").notNull(), + version: text("version").notNull(), // Docker image / Android version tag + + status: varchar("status", { length: 16 }).notNull().default("offline"), // "online" | "offline" | "busy" | "error" + isActive: boolean("is_active").notNull().default(false), + + containerId: text("container_id"), // Docker container ID on the VPS + host: text("host").notNull(), // VPS hostname or IP + wsPort: text("ws_port"), // ws-scrcpy WebSocket port + + createdAt: timestamp("created_at").notNull(), + updatedAt: timestamp("updated_at").notNull(), +}); diff --git a/packages/db/schema/index.ts b/packages/db/schema/index.ts index 241f3ee..2ddcd31 100644 --- a/packages/db/schema/index.ts +++ b/packages/db/schema/index.ts @@ -1,4 +1,6 @@ export * from "./auth.schema"; export * from "./better.auth.schema"; +export * from "./device.schema"; export * from "./general.schema"; +export * from "./link.schema"; export * from "./task.schema"; diff --git a/packages/db/schema/link.schema.ts b/packages/db/schema/link.schema.ts new file mode 100644 index 0000000..1086640 --- /dev/null +++ b/packages/db/schema/link.schema.ts @@ -0,0 +1,26 @@ +import { integer, pgTable, serial, text, timestamp, varchar } from "drizzle-orm/pg-core"; +import { device } from "./device.schema"; +import { relations } from "drizzle-orm"; + +export const link = pgTable("link", { + id: serial("id").primaryKey(), + + token: text("token").notNull().unique(), + status: varchar("status", { length: 16 }).notNull().default("active"), // "active" | "inactive" | "expired" | "revoked" + + linkedDeviceId: integer("linked_device_id").references(() => device.id, { + onDelete: "set null", + }), + + expiresAt: timestamp("expires_at"), + lastAccessedAt: timestamp("last_accessed_at"), + createdAt: timestamp("created_at").notNull(), + updatedAt: timestamp("updated_at").notNull(), +}); + +export const linkRelations = relations(link, ({ one }) => ({ + device: one(device, { + fields: [link.linkedDeviceId], + references: [device.id], + }), +})); diff --git a/packages/logic/domains/device/controller.ts b/packages/logic/domains/device/controller.ts new file mode 100644 index 0000000..118a895 --- /dev/null +++ b/packages/logic/domains/device/controller.ts @@ -0,0 +1,67 @@ +import { errAsync, ResultAsync } from "neverthrow"; +import { db } from "@pkg/db"; +import { type Err } from "@pkg/result"; +import { FlowExecCtx } from "@core/flow.execution.context"; +import { CreateDevice, Device, DeviceStatus, UpdateDevice } from "./data"; +import { DeviceRepository } from "./repository"; +import { deviceErrors } from "./errors"; + +export class DeviceController { + constructor(private repo: DeviceRepository) {} + + list(fctx: FlowExecCtx): ResultAsync { + return this.repo.list(fctx); + } + + getById(fctx: FlowExecCtx, id: number): ResultAsync { + return this.repo.getById(fctx, id); + } + + create(fctx: FlowExecCtx, data: CreateDevice): ResultAsync { + return this.repo.create(fctx, data); + } + + update( + fctx: FlowExecCtx, + id: number, + data: UpdateDevice, + ): ResultAsync { + return this.repo.update(fctx, id, data); + } + + delete(fctx: FlowExecCtx, id: number): ResultAsync { + return this.repo.delete(fctx, id); + } + + setStatus( + fctx: FlowExecCtx, + id: number, + status: DeviceStatus, + ): ResultAsync { + return this.repo.setStatus(fctx, id, status); + } + + /** + * Mark a device as busy for an incoming session. + * Only succeeds if the device is currently online. + */ + allocate(fctx: FlowExecCtx, id: number): ResultAsync { + return this.repo.getById(fctx, id).andThen((dev) => { + if (dev.status !== DeviceStatus.ONLINE) { + return errAsync(deviceErrors.deviceNotAvailable(fctx, id)); + } + return this.repo.setStatus(fctx, id, DeviceStatus.BUSY); + }); + } + + /** + * Release a device back to online after a session ends. + */ + release(fctx: FlowExecCtx, id: number): ResultAsync { + return this.repo.setStatus(fctx, id, DeviceStatus.ONLINE); + } +} + +export function getDeviceController(): DeviceController { + return new DeviceController(new DeviceRepository(db)); +} diff --git a/packages/logic/domains/device/data.ts b/packages/logic/domains/device/data.ts new file mode 100644 index 0000000..d63cf66 --- /dev/null +++ b/packages/logic/domains/device/data.ts @@ -0,0 +1,48 @@ +import * as v from "valibot"; + +export enum DeviceStatus { + ONLINE = "online", + OFFLINE = "offline", + BUSY = "busy", + ERROR = "error", +} + +export const deviceStatusSchema = v.picklist(["online", "offline", "busy", "error"]); +export type DeviceStatusValue = v.InferOutput; + +export const deviceSchema = v.object({ + id: v.number(), + title: v.string(), + version: v.string(), + status: deviceStatusSchema, + isActive: v.boolean(), + containerId: v.nullable(v.string()), + host: v.string(), + wsPort: v.nullable(v.string()), + createdAt: v.date(), + updatedAt: v.date(), +}); +export type Device = v.InferOutput; + +export const createDeviceSchema = v.object({ + title: v.pipe(v.string(), v.minLength(1)), + version: v.pipe(v.string(), v.minLength(1)), + host: v.pipe(v.string(), v.minLength(1)), + containerId: v.optional(v.string()), + wsPort: v.optional(v.string()), + isActive: v.optional(v.boolean()), +}); +export type CreateDevice = v.InferOutput; + +export const updateDeviceSchema = v.partial( + v.object({ + title: v.string(), + version: v.string(), + host: v.string(), + containerId: v.nullable(v.string()), + wsPort: v.nullable(v.string()), + isActive: v.boolean(), + status: deviceStatusSchema, + }), +); +export type UpdateDevice = v.InferOutput; diff --git a/packages/logic/domains/device/errors.ts b/packages/logic/domains/device/errors.ts new file mode 100644 index 0000000..32c3527 --- /dev/null +++ b/packages/logic/domains/device/errors.ts @@ -0,0 +1,69 @@ +import { FlowExecCtx } from "@/core/flow.execution.context"; +import { ERROR_CODES, type Err } from "@pkg/result"; +import { getError } from "@pkg/logger"; + +export const deviceErrors = { + dbError: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Database operation failed", + description: "Please try again later", + detail, + }), + + deviceNotFound: (fctx: FlowExecCtx, id: number): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.NOT_FOUND, + message: "Device not found", + description: "The requested device does not exist", + detail: `No device found with ID: ${id}`, + }), + + listFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to list devices", + description: "Try again later", + detail, + }), + + createFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to create device", + description: "Try again later", + detail, + }), + + updateFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to update device", + description: "Try again later", + detail, + }), + + deleteFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to delete device", + description: "Try again later", + detail, + }), + + deviceNotAvailable: (fctx: FlowExecCtx, id: number): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.NOT_ALLOWED, + message: "Device is not available", + description: "The device is currently busy or offline", + detail: `Device ${id} cannot be allocated in its current state`, + actionable: true, + }), +}; diff --git a/packages/logic/domains/device/repository.ts b/packages/logic/domains/device/repository.ts new file mode 100644 index 0000000..0e2aef7 --- /dev/null +++ b/packages/logic/domains/device/repository.ts @@ -0,0 +1,137 @@ +import { ResultAsync, errAsync, okAsync } from "neverthrow"; +import { FlowExecCtx } from "@core/flow.execution.context"; +import { Database, asc, eq } from "@pkg/db"; +import { device } from "@pkg/db/schema"; +import { type Err } from "@pkg/result"; +import { logger } from "@pkg/logger"; +import { traceResultAsync } from "@core/observability"; +import { CreateDevice, Device, DeviceStatus, UpdateDevice } from "./data"; +import { deviceErrors } from "./errors"; + +export class DeviceRepository { + constructor(private db: Database) {} + + list(fctx: FlowExecCtx): ResultAsync { + return traceResultAsync({ + name: "device.list", + fctx, + fn: () => + ResultAsync.fromPromise( + this.db.select().from(device).orderBy(asc(device.createdAt)), + (e) => + deviceErrors.listFailed( + fctx, + e instanceof Error ? e.message : String(e), + ), + ).map((rows) => rows as Device[]), + }); + } + + getById(fctx: FlowExecCtx, id: number): ResultAsync { + return traceResultAsync({ + name: "device.getById", + fctx, + fn: () => + ResultAsync.fromPromise( + this.db.query.device.findFirst({ where: eq(device.id, id) }), + (e) => + deviceErrors.dbError( + fctx, + e instanceof Error ? e.message : String(e), + ), + ).andThen((row) => { + if (!row) return errAsync(deviceErrors.deviceNotFound(fctx, id)); + return okAsync(row as Device); + }), + }); + } + + create(fctx: FlowExecCtx, data: CreateDevice): ResultAsync { + logger.info("Creating device", { ...fctx, host: data.host }); + + return traceResultAsync({ + name: "device.create", + fctx, + fn: () => + ResultAsync.fromPromise( + this.db + .insert(device) + .values({ + title: data.title, + version: data.version, + host: data.host, + containerId: data.containerId ?? null, + wsPort: data.wsPort ?? null, + status: DeviceStatus.OFFLINE, + isActive: data.isActive ?? false, + createdAt: new Date(), + updatedAt: new Date(), + }) + .returning() + .execute(), + (e) => + deviceErrors.createFailed( + fctx, + e instanceof Error ? e.message : String(e), + ), + ).map((rows) => rows[0] as Device), + }); + } + + update( + fctx: FlowExecCtx, + id: number, + updates: UpdateDevice, + ): ResultAsync { + return traceResultAsync({ + name: "device.update", + fctx, + fn: () => + this.getById(fctx, id).andThen(() => + ResultAsync.fromPromise( + this.db + .update(device) + .set({ ...updates, updatedAt: new Date() }) + .where(eq(device.id, id)) + .returning() + .execute(), + (e) => + deviceErrors.updateFailed( + fctx, + e instanceof Error ? e.message : String(e), + ), + ).andThen((rows) => { + if (!rows[0]) + return errAsync(deviceErrors.deviceNotFound(fctx, id)); + return okAsync(rows[0] as Device); + }), + ), + }); + } + + delete(fctx: FlowExecCtx, id: number): ResultAsync { + return traceResultAsync({ + name: "device.delete", + fctx, + fn: () => + this.getById(fctx, id).andThen(() => + ResultAsync.fromPromise( + this.db.delete(device).where(eq(device.id, id)).execute(), + (e) => + deviceErrors.deleteFailed( + fctx, + e instanceof Error ? e.message : String(e), + ), + ).map(() => true), + ), + }); + } + + setStatus( + fctx: FlowExecCtx, + id: number, + status: DeviceStatus, + ): ResultAsync { + return this.update(fctx, id, { status }); + } +} diff --git a/packages/logic/domains/link/controller.ts b/packages/logic/domains/link/controller.ts new file mode 100644 index 0000000..be91702 --- /dev/null +++ b/packages/logic/domains/link/controller.ts @@ -0,0 +1,85 @@ +import { errAsync, ResultAsync } from "neverthrow"; +import { nanoid } from "nanoid"; +import { db } from "@pkg/db"; +import { type Err } from "@pkg/result"; +import { FlowExecCtx } from "@core/flow.execution.context"; +import { CreateLink, Link, LinkStatus, LinkWithDevice, UpdateLink } from "./data"; +import { LinkRepository } from "./repository"; +import { linkErrors } from "./errors"; + +export class LinkController { + constructor(private repo: LinkRepository) {} + + list(fctx: FlowExecCtx): ResultAsync { + return this.repo.list(fctx); + } + + getById(fctx: FlowExecCtx, id: number): ResultAsync { + return this.repo.getById(fctx, id); + } + + /** + * Fetch a link by its URL token, including the joined device. + * Used by apps/front to validate and resolve an incoming link. + */ + getByToken(fctx: FlowExecCtx, token: string): ResultAsync { + return this.repo.getByToken(fctx, token); + } + + /** + * Validate a token: must exist, be active, and not be expired. + * Returns the resolved link+device on success. + */ + validate(fctx: FlowExecCtx, token: string): ResultAsync { + return this.repo.getByToken(fctx, token).andThen((l) => { + if (l.status !== LinkStatus.ACTIVE) { + return errAsync(linkErrors.linkNotActive(fctx, token)); + } + if (l.expiresAt && l.expiresAt < new Date()) { + return errAsync(linkErrors.linkExpired(fctx, token)); + } + return this.repo.touch(fctx, token).map(() => l); + }); + } + + /** + * Generate a new link. Token is auto-generated as a URL-safe nanoid. + */ + create( + fctx: FlowExecCtx, + data: Omit, + ): ResultAsync { + return this.repo.create(fctx, { + ...data, + token: nanoid(12), + }); + } + + update( + fctx: FlowExecCtx, + id: number, + data: UpdateLink, + ): ResultAsync { + return this.repo.update(fctx, id, data); + } + + assignDevice( + fctx: FlowExecCtx, + id: number, + deviceId: number | null, + ): ResultAsync { + return this.repo.update(fctx, id, { linkedDeviceId: deviceId }); + } + + revoke(fctx: FlowExecCtx, id: number): ResultAsync { + return this.repo.update(fctx, id, { status: LinkStatus.REVOKED }); + } + + delete(fctx: FlowExecCtx, id: number): ResultAsync { + return this.repo.delete(fctx, id); + } +} + +export function getLinkController(): LinkController { + return new LinkController(new LinkRepository(db)); +} diff --git a/packages/logic/domains/link/data.ts b/packages/logic/domains/link/data.ts new file mode 100644 index 0000000..7da9786 --- /dev/null +++ b/packages/logic/domains/link/data.ts @@ -0,0 +1,47 @@ +import * as v from "valibot"; +import { deviceSchema } from "@domains/device/data"; + +export enum LinkStatus { + ACTIVE = "active", + INACTIVE = "inactive", + EXPIRED = "expired", + REVOKED = "revoked", +} + +export const linkStatusSchema = v.picklist(["active", "inactive", "expired", "revoked"]); +export type LinkStatusValue = v.InferOutput; + +export const linkSchema = v.object({ + id: v.number(), + token: v.string(), + status: linkStatusSchema, + linkedDeviceId: v.nullable(v.number()), + expiresAt: v.nullable(v.date()), + lastAccessedAt: v.nullable(v.date()), + createdAt: v.date(), + updatedAt: v.date(), +}); +export type Link = v.InferOutput; + +export const linkWithDeviceSchema = v.object({ + ...linkSchema.entries, + device: v.nullable(deviceSchema), +}); +export type LinkWithDevice = v.InferOutput; + +export const createLinkSchema = v.object({ + token: v.pipe(v.string(), v.minLength(1)), + linkedDeviceId: v.optional(v.nullable(v.number())), + expiresAt: v.optional(v.nullable(v.date())), +}); +export type CreateLink = v.InferOutput; + +export const updateLinkSchema = v.partial( + v.object({ + status: linkStatusSchema, + linkedDeviceId: v.nullable(v.number()), + expiresAt: v.nullable(v.date()), + lastAccessedAt: v.nullable(v.date()), + }), +); +export type UpdateLink = v.InferOutput; diff --git a/packages/logic/domains/link/errors.ts b/packages/logic/domains/link/errors.ts new file mode 100644 index 0000000..04fc9d2 --- /dev/null +++ b/packages/logic/domains/link/errors.ts @@ -0,0 +1,79 @@ +import { FlowExecCtx } from "@/core/flow.execution.context"; +import { ERROR_CODES, type Err } from "@pkg/result"; +import { getError } from "@pkg/logger"; + +export const linkErrors = { + dbError: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Database operation failed", + description: "Please try again later", + detail, + }), + + linkNotFound: (fctx: FlowExecCtx, identifier: string | number): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.NOT_FOUND, + message: "Link not found", + description: "The requested link does not exist", + detail: `No link found for: ${identifier}`, + }), + + listFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to list links", + description: "Try again later", + detail, + }), + + createFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to create link", + description: "Try again later", + detail, + }), + + updateFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to update link", + description: "Try again later", + detail, + }), + + deleteFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to delete link", + description: "Try again later", + detail, + }), + + linkNotActive: (fctx: FlowExecCtx, token: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.NOT_ALLOWED, + message: "Link is not active", + description: "This link has been revoked, expired, or deactivated", + detail: `Link with token ${token} is not in an active state`, + actionable: true, + }), + + linkExpired: (fctx: FlowExecCtx, token: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.NOT_ALLOWED, + message: "Link has expired", + description: "This link is no longer valid", + detail: `Link with token ${token} has passed its expiry date`, + actionable: true, + }), +}; diff --git a/packages/logic/domains/link/repository.ts b/packages/logic/domains/link/repository.ts new file mode 100644 index 0000000..726752a --- /dev/null +++ b/packages/logic/domains/link/repository.ts @@ -0,0 +1,173 @@ +import { ResultAsync, errAsync, okAsync } from "neverthrow"; +import { FlowExecCtx } from "@core/flow.execution.context"; +import { Database, asc, eq } from "@pkg/db"; +import { link } from "@pkg/db/schema"; +import { type Err } from "@pkg/result"; +import { logger } from "@pkg/logger"; +import { traceResultAsync } from "@core/observability"; +import { CreateLink, Link, LinkWithDevice, UpdateLink } from "./data"; +import { linkErrors } from "./errors"; + +export class LinkRepository { + constructor(private db: Database) {} + + list(fctx: FlowExecCtx): ResultAsync { + return traceResultAsync({ + name: "link.list", + fctx, + fn: () => + ResultAsync.fromPromise( + this.db.select().from(link).orderBy(asc(link.createdAt)), + (e) => + linkErrors.listFailed( + fctx, + e instanceof Error ? e.message : String(e), + ), + ).map((rows) => rows as Link[]), + }); + } + + getById(fctx: FlowExecCtx, id: number): ResultAsync { + return traceResultAsync({ + name: "link.getById", + fctx, + fn: () => + ResultAsync.fromPromise( + this.db.query.link.findFirst({ where: eq(link.id, id) }), + (e) => + linkErrors.dbError( + fctx, + e instanceof Error ? e.message : String(e), + ), + ).andThen((row) => { + if (!row) return errAsync(linkErrors.linkNotFound(fctx, id)); + return okAsync(row as Link); + }), + }); + } + + getByToken(fctx: FlowExecCtx, token: string): ResultAsync { + return traceResultAsync({ + name: "link.getByToken", + fctx, + fn: () => + ResultAsync.fromPromise( + this.db.query.link.findFirst({ + where: eq(link.token, token), + with: { device: true }, + }), + (e) => + linkErrors.dbError( + fctx, + e instanceof Error ? e.message : String(e), + ), + ).andThen((row) => { + if (!row) return errAsync(linkErrors.linkNotFound(fctx, token)); + return okAsync(row as LinkWithDevice); + }), + }); + } + + create(fctx: FlowExecCtx, data: CreateLink): ResultAsync { + logger.info("Creating link", { ...fctx, token: data.token }); + + return traceResultAsync({ + name: "link.create", + fctx, + fn: () => + ResultAsync.fromPromise( + this.db + .insert(link) + .values({ + token: data.token, + status: "active", + linkedDeviceId: data.linkedDeviceId ?? null, + expiresAt: data.expiresAt ?? null, + createdAt: new Date(), + updatedAt: new Date(), + }) + .returning() + .execute(), + (e) => + linkErrors.createFailed( + fctx, + e instanceof Error ? e.message : String(e), + ), + ).map((rows) => rows[0] as Link), + }); + } + + update( + fctx: FlowExecCtx, + id: number, + updates: UpdateLink, + ): ResultAsync { + return traceResultAsync({ + name: "link.update", + fctx, + fn: () => + this.getById(fctx, id).andThen(() => + ResultAsync.fromPromise( + this.db + .update(link) + .set({ ...updates, updatedAt: new Date() }) + .where(eq(link.id, id)) + .returning() + .execute(), + (e) => + linkErrors.updateFailed( + fctx, + e instanceof Error ? e.message : String(e), + ), + ).andThen((rows) => { + if (!rows[0]) + return errAsync(linkErrors.linkNotFound(fctx, id)); + return okAsync(rows[0] as Link); + }), + ), + }); + } + + delete(fctx: FlowExecCtx, id: number): ResultAsync { + return traceResultAsync({ + name: "link.delete", + fctx, + fn: () => + this.getById(fctx, id).andThen(() => + ResultAsync.fromPromise( + this.db.delete(link).where(eq(link.id, id)).execute(), + (e) => + linkErrors.deleteFailed( + fctx, + e instanceof Error ? e.message : String(e), + ), + ).map(() => true), + ), + }); + } + + touch(fctx: FlowExecCtx, token: string): ResultAsync { + return traceResultAsync({ + name: "link.touch", + fctx, + fn: () => + ResultAsync.fromPromise( + this.db + .update(link) + .set({ lastAccessedAt: new Date(), updatedAt: new Date() }) + .where(eq(link.token, token)) + .returning() + .execute(), + (e) => + linkErrors.updateFailed( + fctx, + e instanceof Error ? e.message : String(e), + ), + ).andThen((rows) => { + if (!rows[0]) + return errAsync(linkErrors.linkNotFound(fctx, token)); + return okAsync(rows[0] as Link); + }), + }); + } +}