Add link session preparation flow
- wire front link resolve/prepare routes - add orchestrator session command handling - update admin dashboards and device/link logic
This commit is contained in:
136
apps/orchestrator/src/core/utils.ts
Normal file
136
apps/orchestrator/src/core/utils.ts
Normal file
@@ -0,0 +1,136 @@
|
||||
import type { FlowExecCtx } from "@pkg/logic/core/flow.execution.context";
|
||||
import { formatErrorDetail } from "@pkg/logger";
|
||||
import { ERROR_CODES, errorStatusMap, type Err } from "@pkg/result";
|
||||
import { settings } from "@pkg/settings";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import type { MiddlewareHandler } from "hono";
|
||||
import type { ContentfulStatusCode } from "hono/utils/http-status";
|
||||
import * as v from "valibot";
|
||||
|
||||
export type AppBindings = {
|
||||
Variables: {
|
||||
fctx: FlowExecCtx;
|
||||
};
|
||||
};
|
||||
|
||||
export function buildFlowExecCtx(flowId?: string): FlowExecCtx {
|
||||
return { flowId: flowId || randomUUID() };
|
||||
}
|
||||
|
||||
export function createAppError(
|
||||
fctx: FlowExecCtx,
|
||||
code: string,
|
||||
message: string,
|
||||
description: string,
|
||||
detail: string,
|
||||
actionable?: boolean,
|
||||
): Err {
|
||||
return {
|
||||
flowId: fctx.flowId,
|
||||
code,
|
||||
message,
|
||||
description,
|
||||
detail,
|
||||
actionable,
|
||||
};
|
||||
}
|
||||
|
||||
export function jsonError(error: Err, status?: number) {
|
||||
return {
|
||||
body: {
|
||||
data: null,
|
||||
error: {
|
||||
...error,
|
||||
},
|
||||
},
|
||||
status:
|
||||
status ||
|
||||
errorStatusMap[error.code] ||
|
||||
errorStatusMap[ERROR_CODES.INTERNAL_SERVER_ERROR] ||
|
||||
500,
|
||||
};
|
||||
}
|
||||
|
||||
export function toStatusCode(status: number): ContentfulStatusCode {
|
||||
return status as ContentfulStatusCode;
|
||||
}
|
||||
|
||||
export function isErrPayload(value: unknown): value is Err {
|
||||
return !!value && typeof value === "object" && "code" in value;
|
||||
}
|
||||
|
||||
export function parseDeviceId(rawId: string, fctx: FlowExecCtx): number | Err {
|
||||
const parsed = Number(rawId);
|
||||
|
||||
if (!Number.isInteger(parsed) || parsed <= 0) {
|
||||
return createAppError(
|
||||
fctx,
|
||||
ERROR_CODES.VALIDATION_ERROR,
|
||||
"Invalid device id",
|
||||
"The provided device id is not valid",
|
||||
`deviceId=${rawId}`,
|
||||
true,
|
||||
);
|
||||
}
|
||||
|
||||
return parsed;
|
||||
}
|
||||
|
||||
export async function parseJsonBody<TInput, TSchema extends v.GenericSchema<TInput>>(
|
||||
request: Request,
|
||||
schema: TSchema,
|
||||
fctx: FlowExecCtx,
|
||||
): Promise<TInput | Err> {
|
||||
let payload: unknown;
|
||||
|
||||
try {
|
||||
payload = await request.json();
|
||||
} catch (error) {
|
||||
return createAppError(
|
||||
fctx,
|
||||
ERROR_CODES.VALIDATION_ERROR,
|
||||
"Invalid JSON body",
|
||||
"The request body must be valid JSON",
|
||||
formatErrorDetail(error),
|
||||
true,
|
||||
);
|
||||
}
|
||||
|
||||
const parsed = v.safeParse(schema, payload);
|
||||
if (!parsed.success) {
|
||||
return createAppError(
|
||||
fctx,
|
||||
ERROR_CODES.VALIDATION_ERROR,
|
||||
"Invalid request body",
|
||||
"The request payload did not match the expected shape",
|
||||
parsed.issues.map((issue) => issue.message).join("; "),
|
||||
true,
|
||||
);
|
||||
}
|
||||
|
||||
return parsed.output;
|
||||
}
|
||||
|
||||
export const requireInternalApiKey: MiddlewareHandler<AppBindings> = async (
|
||||
c,
|
||||
next,
|
||||
) => {
|
||||
const fctx = buildFlowExecCtx(c.req.header("x-flow-id"));
|
||||
c.set("fctx", fctx);
|
||||
|
||||
const apiKey = c.req.header("x-internal-api-key");
|
||||
if (apiKey !== settings.internalApiKey) {
|
||||
const error = createAppError(
|
||||
fctx,
|
||||
ERROR_CODES.AUTH_ERROR,
|
||||
"Unauthorized",
|
||||
"The internal API key is invalid",
|
||||
"Missing or invalid x-internal-api-key header",
|
||||
true,
|
||||
);
|
||||
const response = jsonError(error, 403);
|
||||
return c.json(response.body, toStatusCode(response.status));
|
||||
}
|
||||
|
||||
await next();
|
||||
};
|
||||
57
apps/orchestrator/src/domains/command/executor.ts
Normal file
57
apps/orchestrator/src/domains/command/executor.ts
Normal file
@@ -0,0 +1,57 @@
|
||||
import type { Device } from "@pkg/logic/domains/device/data";
|
||||
import { formatErrorDetail } from "@pkg/logger";
|
||||
import { execFile } from "node:child_process";
|
||||
import { promisify } from "node:util";
|
||||
|
||||
export type InternalAction = "start" | "stop" | "restart";
|
||||
|
||||
const execFileAsync = promisify(execFile);
|
||||
|
||||
export class AndroidCommandExecutor {
|
||||
constructor(
|
||||
private readonly binary: string,
|
||||
private readonly timeoutMs: number,
|
||||
) {}
|
||||
|
||||
async prepareAssignedApp(device: Device, packageName: string) {
|
||||
const serial = `${device.host}:${device.wsPort}`;
|
||||
|
||||
await this.run(["connect", serial]);
|
||||
await this.run(["-s", serial, "wait-for-device"]);
|
||||
await this.run(
|
||||
["-s", serial, "shell", "am", "force-stop", packageName],
|
||||
{ allowFailure: true },
|
||||
);
|
||||
await this.run([
|
||||
"-s",
|
||||
serial,
|
||||
"shell",
|
||||
"monkey",
|
||||
"-p",
|
||||
packageName,
|
||||
"-c",
|
||||
"android.intent.category.LAUNCHER",
|
||||
"1",
|
||||
]);
|
||||
|
||||
return { serial };
|
||||
}
|
||||
|
||||
async disconnect(device: Device) {
|
||||
const serial = `${device.host}:${device.wsPort}`;
|
||||
await this.run(["disconnect", serial], { allowFailure: true });
|
||||
}
|
||||
|
||||
private async run(args: string[], options?: { allowFailure?: boolean }) {
|
||||
try {
|
||||
return await execFileAsync(this.binary, args, {
|
||||
timeout: this.timeoutMs,
|
||||
});
|
||||
} catch (error) {
|
||||
if (options?.allowFailure) {
|
||||
return { stdout: "", stderr: formatErrorDetail(error) };
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
74
apps/orchestrator/src/domains/command/router.ts
Normal file
74
apps/orchestrator/src/domains/command/router.ts
Normal file
@@ -0,0 +1,74 @@
|
||||
import { Hono } from "hono";
|
||||
import type { AppBindings } from "../../core/utils";
|
||||
import { jsonError, parseDeviceId, toStatusCode } from "../../core/utils";
|
||||
import {
|
||||
getDeviceById,
|
||||
handleContainerAction,
|
||||
listDevices,
|
||||
} from "./service";
|
||||
|
||||
export const commandRouter = new Hono<AppBindings>()
|
||||
.get("/devices", async (c) => {
|
||||
const fctx = c.get("fctx");
|
||||
const result = await listDevices(fctx);
|
||||
|
||||
if (result.isErr()) {
|
||||
const response = jsonError(result.error);
|
||||
return c.json(response.body, toStatusCode(response.status));
|
||||
}
|
||||
|
||||
return c.json({ data: result.value, error: null });
|
||||
})
|
||||
.get("/devices/:id", async (c) => {
|
||||
const fctx = c.get("fctx");
|
||||
const parsedId = parseDeviceId(c.req.param("id"), fctx);
|
||||
|
||||
if (typeof parsedId !== "number") {
|
||||
const response = jsonError(parsedId);
|
||||
return c.json(response.body, toStatusCode(response.status));
|
||||
}
|
||||
|
||||
const result = await getDeviceById(fctx, parsedId);
|
||||
if (result.isErr()) {
|
||||
const response = jsonError(result.error);
|
||||
return c.json(response.body, toStatusCode(response.status));
|
||||
}
|
||||
|
||||
return c.json({ data: result.value, error: null });
|
||||
})
|
||||
.post("/devices/:id/start", async (c) => {
|
||||
const fctx = c.get("fctx");
|
||||
const parsedId = parseDeviceId(c.req.param("id"), fctx);
|
||||
|
||||
if (typeof parsedId !== "number") {
|
||||
const response = jsonError(parsedId);
|
||||
return c.json(response.body, toStatusCode(response.status));
|
||||
}
|
||||
|
||||
const response = await handleContainerAction("start", parsedId, fctx);
|
||||
return c.json(response.body, toStatusCode(response.status));
|
||||
})
|
||||
.post("/devices/:id/stop", async (c) => {
|
||||
const fctx = c.get("fctx");
|
||||
const parsedId = parseDeviceId(c.req.param("id"), fctx);
|
||||
|
||||
if (typeof parsedId !== "number") {
|
||||
const response = jsonError(parsedId);
|
||||
return c.json(response.body, toStatusCode(response.status));
|
||||
}
|
||||
|
||||
const response = await handleContainerAction("stop", parsedId, fctx);
|
||||
return c.json(response.body, toStatusCode(response.status));
|
||||
})
|
||||
.post("/devices/:id/restart", async (c) => {
|
||||
const fctx = c.get("fctx");
|
||||
const parsedId = parseDeviceId(c.req.param("id"), fctx);
|
||||
|
||||
if (typeof parsedId !== "number") {
|
||||
const response = jsonError(parsedId);
|
||||
return c.json(response.body, toStatusCode(response.status));
|
||||
}
|
||||
|
||||
const response = await handleContainerAction("restart", parsedId, fctx);
|
||||
return c.json(response.body, toStatusCode(response.status));
|
||||
});
|
||||
64
apps/orchestrator/src/domains/command/service.ts
Normal file
64
apps/orchestrator/src/domains/command/service.ts
Normal file
@@ -0,0 +1,64 @@
|
||||
import { getDeviceController } from "@pkg/logic/domains/device/controller";
|
||||
import type { FlowExecCtx } from "@pkg/logic/core/flow.execution.context";
|
||||
import type { Device } from "@pkg/logic/domains/device/data";
|
||||
import { createAppError, jsonError } from "../../core/utils";
|
||||
import { AndroidCommandExecutor } from "./executor";
|
||||
import { logDomainEvent } from "@pkg/logger";
|
||||
import { ERROR_CODES } from "@pkg/result";
|
||||
|
||||
export type InternalAction = "start" | "stop" | "restart";
|
||||
|
||||
const deviceController = getDeviceController();
|
||||
const adbBinary = process.env.ADB_BIN || "adb";
|
||||
const adbTimeoutMs = Number(process.env.ADB_TIMEOUT_MS || "15000");
|
||||
|
||||
const androidExecutor = new AndroidCommandExecutor(adbBinary, adbTimeoutMs);
|
||||
|
||||
export async function handleContainerAction(
|
||||
action: InternalAction,
|
||||
deviceId: number,
|
||||
fctx: FlowExecCtx,
|
||||
) {
|
||||
const deviceResult = await deviceController.getById(fctx, deviceId);
|
||||
if (deviceResult.isErr()) {
|
||||
return jsonError(deviceResult.error);
|
||||
}
|
||||
|
||||
logDomainEvent({
|
||||
level: "warn",
|
||||
event: "orchestrator.device_container.stubbed",
|
||||
fctx,
|
||||
meta: {
|
||||
action,
|
||||
deviceId,
|
||||
containerId: deviceResult.value.containerId,
|
||||
},
|
||||
});
|
||||
|
||||
return jsonError(
|
||||
createAppError(
|
||||
fctx,
|
||||
ERROR_CODES.NOT_IMPLEMENTED,
|
||||
"Container action not implemented",
|
||||
"This container control endpoint is currently a stub",
|
||||
`action=${action} deviceId=${deviceId}`,
|
||||
),
|
||||
501,
|
||||
);
|
||||
}
|
||||
|
||||
export async function listDevices(fctx: FlowExecCtx) {
|
||||
return deviceController.list(fctx);
|
||||
}
|
||||
|
||||
export async function getDeviceById(fctx: FlowExecCtx, deviceId: number) {
|
||||
return deviceController.getById(fctx, deviceId);
|
||||
}
|
||||
|
||||
export async function prepareAssignedApp(device: Device, packageName: string) {
|
||||
return androidExecutor.prepareAssignedApp(device, packageName);
|
||||
}
|
||||
|
||||
export async function disconnectDevice(device: Device) {
|
||||
return androidExecutor.disconnect(device);
|
||||
}
|
||||
9
apps/orchestrator/src/domains/orchestrate/data.ts
Normal file
9
apps/orchestrator/src/domains/orchestrate/data.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import * as v from "valibot";
|
||||
|
||||
export const sessionPrepareSchema = v.object({
|
||||
deviceId: v.pipe(v.number(), v.integer()),
|
||||
packageName: v.pipe(v.string(), v.minLength(1)),
|
||||
linkToken: v.optional(v.pipe(v.string(), v.minLength(1))),
|
||||
});
|
||||
|
||||
export type SessionPrepareInput = v.InferOutput<typeof sessionPrepareSchema>;
|
||||
30
apps/orchestrator/src/domains/orchestrate/router.ts
Normal file
30
apps/orchestrator/src/domains/orchestrate/router.ts
Normal file
@@ -0,0 +1,30 @@
|
||||
import { Hono } from "hono";
|
||||
import type { AppBindings } from "../../core/utils";
|
||||
import {
|
||||
isErrPayload,
|
||||
jsonError,
|
||||
parseJsonBody,
|
||||
toStatusCode,
|
||||
} from "../../core/utils";
|
||||
import { type SessionPrepareInput, sessionPrepareSchema } from "./data";
|
||||
import { prepareSession } from "./service";
|
||||
|
||||
export const orchestrateRouter = new Hono<AppBindings>().post(
|
||||
"/sessions/prepare",
|
||||
async (c) => {
|
||||
const fctx = c.get("fctx");
|
||||
const body = await parseJsonBody<SessionPrepareInput, typeof sessionPrepareSchema>(
|
||||
c.req.raw,
|
||||
sessionPrepareSchema,
|
||||
fctx,
|
||||
);
|
||||
|
||||
if (isErrPayload(body)) {
|
||||
const response = jsonError(body);
|
||||
return c.json(response.body, toStatusCode(response.status));
|
||||
}
|
||||
|
||||
const response = await prepareSession(fctx, body);
|
||||
return c.json(response.body, toStatusCode(response.status));
|
||||
},
|
||||
);
|
||||
134
apps/orchestrator/src/domains/orchestrate/service.ts
Normal file
134
apps/orchestrator/src/domains/orchestrate/service.ts
Normal file
@@ -0,0 +1,134 @@
|
||||
import type { FlowExecCtx } from "@pkg/logic/core/flow.execution.context";
|
||||
import { formatErrorDetail, logDomainEvent } from "@pkg/logger";
|
||||
import { createAppError, jsonError } from "../../core/utils";
|
||||
import type { SessionPrepareInput } from "./data";
|
||||
import { ERROR_CODES } from "@pkg/result";
|
||||
import { getDeviceController } from "@pkg/logic/domains/device/controller";
|
||||
import { disconnectDevice, prepareAssignedApp } from "../command/service";
|
||||
|
||||
const deviceController = getDeviceController();
|
||||
|
||||
export async function prepareSession(
|
||||
fctx: FlowExecCtx,
|
||||
input: SessionPrepareInput,
|
||||
) {
|
||||
logDomainEvent({
|
||||
event: "orchestrator.session_prepare.started",
|
||||
fctx,
|
||||
meta: {
|
||||
deviceId: input.deviceId,
|
||||
packageName: input.packageName,
|
||||
linkToken: input.linkToken,
|
||||
},
|
||||
});
|
||||
|
||||
const deviceResult = await deviceController.getById(fctx, input.deviceId);
|
||||
if (deviceResult.isErr()) {
|
||||
logDomainEvent({
|
||||
level: "warn",
|
||||
event: "orchestrator.session_prepare.device_lookup_failed",
|
||||
fctx,
|
||||
error: deviceResult.error,
|
||||
meta: {
|
||||
deviceId: input.deviceId,
|
||||
},
|
||||
});
|
||||
|
||||
return jsonError(deviceResult.error);
|
||||
}
|
||||
|
||||
const allocatedDevice = await deviceController.allocate(
|
||||
fctx,
|
||||
input.deviceId,
|
||||
);
|
||||
if (allocatedDevice.isErr()) {
|
||||
logDomainEvent({
|
||||
level: "warn",
|
||||
event: "orchestrator.session_prepare.device_unavailable",
|
||||
fctx,
|
||||
error: allocatedDevice.error,
|
||||
meta: {
|
||||
deviceId: input.deviceId,
|
||||
},
|
||||
});
|
||||
|
||||
return jsonError(allocatedDevice.error);
|
||||
}
|
||||
|
||||
let releaseRequired = true;
|
||||
|
||||
try {
|
||||
const prepResult = await prepareAssignedApp(
|
||||
allocatedDevice.value,
|
||||
input.packageName,
|
||||
);
|
||||
|
||||
logDomainEvent({
|
||||
event: "orchestrator.session_prepare.succeeded",
|
||||
fctx,
|
||||
meta: {
|
||||
deviceId: input.deviceId,
|
||||
packageName: input.packageName,
|
||||
serial: prepResult.serial,
|
||||
},
|
||||
});
|
||||
|
||||
releaseRequired = false;
|
||||
|
||||
return {
|
||||
body: {
|
||||
data: {
|
||||
deviceId: input.deviceId,
|
||||
containerId: allocatedDevice.value.containerId,
|
||||
packageName: input.packageName,
|
||||
serial: prepResult.serial,
|
||||
status: "prepared",
|
||||
},
|
||||
error: null,
|
||||
},
|
||||
status: 200,
|
||||
};
|
||||
} catch (error) {
|
||||
logDomainEvent({
|
||||
level: "error",
|
||||
event: "orchestrator.session_prepare.failed",
|
||||
fctx,
|
||||
error,
|
||||
meta: {
|
||||
deviceId: input.deviceId,
|
||||
packageName: input.packageName,
|
||||
},
|
||||
});
|
||||
|
||||
return jsonError(
|
||||
createAppError(
|
||||
fctx,
|
||||
ERROR_CODES.EXTERNAL_SERVICE_ERROR,
|
||||
"Failed to prepare Android session",
|
||||
"The orchestrator could not connect to the Android device or launch the app",
|
||||
formatErrorDetail(error),
|
||||
),
|
||||
502,
|
||||
);
|
||||
} finally {
|
||||
await disconnectDevice(allocatedDevice.value);
|
||||
|
||||
if (releaseRequired) {
|
||||
const releaseResult = await deviceController.release(
|
||||
fctx,
|
||||
input.deviceId,
|
||||
);
|
||||
if (releaseResult.isErr()) {
|
||||
logDomainEvent({
|
||||
level: "error",
|
||||
event: "orchestrator.session_prepare.release_failed",
|
||||
fctx,
|
||||
error: releaseResult.error,
|
||||
meta: {
|
||||
deviceId: input.deviceId,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,8 +3,15 @@ import "./instrumentation.js";
|
||||
import { createHttpTelemetryMiddleware } from "@pkg/logic/core/http.telemetry";
|
||||
import { serve } from "@hono/node-server";
|
||||
import { Hono } from "hono";
|
||||
import type { AppBindings } from "./core/utils";
|
||||
import { requireInternalApiKey } from "./core/utils";
|
||||
import { commandRouter } from "./domains/command/router";
|
||||
import { orchestrateRouter } from "./domains/orchestrate/router";
|
||||
|
||||
const app = new Hono().use("*", createHttpTelemetryMiddleware("orchestrator"));
|
||||
const app = new Hono<AppBindings>().use(
|
||||
"*",
|
||||
createHttpTelemetryMiddleware("orchestrator"),
|
||||
);
|
||||
|
||||
const host = process.env.HOST || "0.0.0.0";
|
||||
const port = Number(process.env.PORT || "3000");
|
||||
@@ -17,6 +24,10 @@ app.get("/ping", (c) => {
|
||||
return c.text("pong");
|
||||
});
|
||||
|
||||
app.use("/internal/*", requireInternalApiKey);
|
||||
app.route("/internal", commandRouter);
|
||||
app.route("/internal", orchestrateRouter);
|
||||
|
||||
serve(
|
||||
{
|
||||
fetch: app.fetch,
|
||||
|
||||
Reference in New Issue
Block a user