import { CreateTask, Task, TaskStatus, TaskType, UpdateTask } from "./data"; import { ResultAsync, errAsync, okAsync } from "neverthrow"; import { FlowExecCtx } from "@core/flow.execution.context"; import { Database, and, asc, eq, inArray } from "@pkg/db"; import { task } from "@pkg/db/schema"; import { type Err } from "@pkg/result"; import { taskErrors } from "./errors"; import { logger } from "@pkg/logger"; export class TasksRepository { constructor(private db: Database) {} createTask(fctx: FlowExecCtx, taskData: CreateTask): ResultAsync { logger.info("Creating new task", { ...fctx, taskId: taskData.id }); return ResultAsync.fromPromise( this.db .insert(task) .values({ id: taskData.id, type: taskData.type, status: taskData.status || TaskStatus.PENDING, progress: taskData.progress || 0, payload: taskData.payload ?? null, userId: taskData.userId, resourceId: taskData.resourceId, createdAt: new Date(), updatedAt: new Date(), }) .returning() .execute(), (error) => taskErrors.createTaskFailed( fctx, error instanceof Error ? error.message : String(error), ), ).map((result) => result[0] as Task); } getTaskById(fctx: FlowExecCtx, taskId: string): ResultAsync { return ResultAsync.fromPromise( this.db.query.task.findFirst({ where: eq(task.id, taskId), }), (error) => taskErrors.getTaskFailed( fctx, error instanceof Error ? error.message : String(error), ), ).andThen((result) => { if (!result) { return errAsync(taskErrors.taskNotFound(fctx, taskId)); } return okAsync(result as Task); }); } updateTask( fctx: FlowExecCtx, taskId: string, updates: UpdateTask, ): ResultAsync { return this.getTaskById(fctx, taskId).andThen(() => ResultAsync.fromPromise( this.db .update(task) .set({ ...updates, updatedAt: new Date() }) .where(eq(task.id, taskId)) .returning() .execute(), (error) => taskErrors.updateTaskFailed( fctx, error instanceof Error ? error.message : String(error), ), ).andThen((updateResult) => { if (!updateResult[0]) { return errAsync(taskErrors.taskNotFound(fctx, taskId)); } return okAsync(updateResult[0] as Task); }), ); } deleteTask(fctx: FlowExecCtx, taskId: string): ResultAsync { return ResultAsync.fromPromise( this.db.delete(task).where(eq(task.id, taskId)).execute(), (error) => taskErrors.deleteTaskFailed( fctx, error instanceof Error ? error.message : String(error), ), ).map(() => true); } getTasksByStatuses( fctx: FlowExecCtx, statuses: TaskStatus[], ): ResultAsync { return ResultAsync.fromPromise( this.db .select() .from(task) .where(inArray(task.status, statuses)) .orderBy(asc(task.createdAt)), (error) => taskErrors.getTasksByStatusFailed( fctx, error instanceof Error ? error.message : String(error), ), ).map((result) => result as Task[]); } getTasksByTypeAndStatuses( fctx: FlowExecCtx, type: TaskType, statuses: TaskStatus[], ): ResultAsync { return ResultAsync.fromPromise( this.db .select() .from(task) .where(and(eq(task.type, type), inArray(task.status, statuses))) .orderBy(asc(task.createdAt)), (error) => taskErrors.getTasksByStatusFailed( fctx, error instanceof Error ? error.message : String(error), ), ).map((result) => result as Task[]); } markTaskAsCompleted( fctx: FlowExecCtx, taskId: string, result?: Record, ): ResultAsync { return this.updateTask(fctx, taskId, { status: TaskStatus.COMPLETED, progress: 100, result: result ?? null, completedAt: new Date(), }); } markTaskAsFailed( fctx: FlowExecCtx, taskId: string, error: any, ): ResultAsync { return this.updateTask(fctx, taskId, { status: TaskStatus.FAILED, error: { code: error.code || "UNKNOWN_ERROR", message: error.message || "Task failed", detail: error.detail, timestamp: new Date(), }, completedAt: new Date(), }); } }