164 lines
5.4 KiB
TypeScript
164 lines
5.4 KiB
TypeScript
import { CreateTask, Task, TaskStatus, TaskType, UpdateTask } from "./data";
|
|
import { ResultAsync, errAsync, okAsync } from "neverthrow";
|
|
import { FlowExecCtx } from "@core/flow.execution.context";
|
|
import { Database, and, asc, eq, inArray } from "@pkg/db";
|
|
import { task } from "@pkg/db/schema";
|
|
import { type Err } from "@pkg/result";
|
|
import { taskErrors } from "./errors";
|
|
import { logger } from "@pkg/logger";
|
|
|
|
/**
 * Data-access layer for rows of the `task` table.
 *
 * Every method returns a neverthrow `ResultAsync`. A rejected database promise
 * is mapped to an `Err` through the matching `taskErrors.*` factory rather than
 * being thrown to the caller.
 */
export class TasksRepository {
  constructor(private readonly db: Database) {}

  /** Converts an arbitrary rejected/thrown value into a message string. */
  private static toMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Inserts a new task row.
   *
   * @param fctx - Flow execution context; spread into the log entry and passed to error factories.
   * @param taskData - Fields of the task to insert.
   * @returns The inserted row, or `createTaskFailed` when the insert rejects or returns no row.
   */
  createTask(fctx: FlowExecCtx, taskData: CreateTask): ResultAsync<Task, Err> {
    logger.info("Creating new task", { ...fctx, taskId: taskData.id });

    // One instant for both columns so a new row never has createdAt !== updatedAt.
    const now = new Date();

    return ResultAsync.fromPromise(
      this.db
        .insert(task)
        .values({
          id: taskData.id,
          type: taskData.type,
          // Any falsy status/progress falls back to the default.
          status: taskData.status || TaskStatus.PENDING,
          progress: taskData.progress || 0,
          payload: taskData.payload ?? null,
          userId: taskData.userId,
          resourceId: taskData.resourceId,
          createdAt: now,
          updatedAt: now,
        })
        .returning()
        .execute(),
      (error) =>
        taskErrors.createTaskFailed(fctx, TasksRepository.toMessage(error)),
    ).andThen((rows) => {
      const created = rows[0];
      // Guard against an empty RETURNING set instead of handing back
      // `undefined` typed as a Task.
      if (!created) {
        return errAsync(
          taskErrors.createTaskFailed(fctx, "Insert returned no rows"),
        );
      }
      return okAsync(created as Task);
    });
  }

  /**
   * Looks up a single task by its id.
   *
   * @param fctx - Flow execution context passed to error factories.
   * @param taskId - Id of the task to load.
   * @returns The task, `taskNotFound` when no row matches, or `getTaskFailed` when the query rejects.
   */
  getTaskById(fctx: FlowExecCtx, taskId: string): ResultAsync<Task, Err> {
    return ResultAsync.fromPromise(
      this.db.query.task.findFirst({
        where: eq(task.id, taskId),
      }),
      (error) =>
        taskErrors.getTaskFailed(fctx, TasksRepository.toMessage(error)),
    ).andThen((found) => {
      if (!found) {
        return errAsync(taskErrors.taskNotFound(fctx, taskId));
      }
      return okAsync(found as Task);
    });
  }

  /**
   * Applies a partial update to a task and refreshes `updatedAt`.
   *
   * The task is read first via `getTaskById`, so a missing task or a failing
   * read short-circuits before the update statement is issued.
   *
   * @param fctx - Flow execution context passed to error factories.
   * @param taskId - Id of the task to update.
   * @param updates - Column values to overwrite.
   * @returns The updated row, `taskNotFound` when the update touched no row, or
   *          `getTaskFailed` / `updateTaskFailed` when a query rejects.
   */
  updateTask(
    fctx: FlowExecCtx,
    taskId: string,
    updates: UpdateTask,
  ): ResultAsync<Task, Err> {
    return this.getTaskById(fctx, taskId).andThen(() =>
      ResultAsync.fromPromise(
        this.db
          .update(task)
          .set({ ...updates, updatedAt: new Date() })
          .where(eq(task.id, taskId))
          .returning()
          .execute(),
        (error) =>
          taskErrors.updateTaskFailed(fctx, TasksRepository.toMessage(error)),
      ).andThen((rows) => {
        const updated = rows[0];
        // The row may have disappeared between the read and the update.
        if (!updated) {
          return errAsync(taskErrors.taskNotFound(fctx, taskId));
        }
        return okAsync(updated as Task);
      }),
    );
  }

  /**
   * Deletes the task with the given id.
   *
   * @param fctx - Flow execution context passed to error factories.
   * @param taskId - Id of the task to delete.
   * @returns `true` whenever the delete statement resolves (the number of
   *          affected rows is not inspected), or `deleteTaskFailed` when it rejects.
   */
  deleteTask(fctx: FlowExecCtx, taskId: string): ResultAsync<boolean, Err> {
    return ResultAsync.fromPromise(
      this.db.delete(task).where(eq(task.id, taskId)).execute(),
      (error) =>
        taskErrors.deleteTaskFailed(fctx, TasksRepository.toMessage(error)),
    ).map(() => true);
  }

  /**
   * Lists tasks whose status is one of `statuses`, ordered by `createdAt` ascending.
   *
   * @param fctx - Flow execution context passed to error factories.
   * @param statuses - Statuses to match; an empty list yields an empty result without querying.
   * @returns Matching tasks, or `getTasksByStatusFailed` when the query rejects.
   */
  getTasksByStatuses(
    fctx: FlowExecCtx,
    statuses: TaskStatus[],
  ): ResultAsync<Task[], Err> {
    // An empty IN-list can match nothing; answer directly rather than building
    // `inArray(col, [])` and issuing a query.
    if (statuses.length === 0) {
      return okAsync<Task[], Err>([]);
    }

    return ResultAsync.fromPromise(
      this.db
        .select()
        .from(task)
        .where(inArray(task.status, statuses))
        .orderBy(asc(task.createdAt)),
      (error) =>
        taskErrors.getTasksByStatusFailed(
          fctx,
          TasksRepository.toMessage(error),
        ),
    ).map((rows) => rows as Task[]);
  }

  /**
   * Lists tasks of one type whose status is one of `statuses`, ordered by
   * `createdAt` ascending.
   *
   * @param fctx - Flow execution context passed to error factories.
   * @param type - Task type to match.
   * @param statuses - Statuses to match; an empty list yields an empty result without querying.
   * @returns Matching tasks, or `getTasksByStatusFailed` (the same factory the
   *          status-only lookup uses) when the query rejects.
   */
  getTasksByTypeAndStatuses(
    fctx: FlowExecCtx,
    type: TaskType,
    statuses: TaskStatus[],
  ): ResultAsync<Task[], Err> {
    // Same empty-list short-circuit as getTasksByStatuses.
    if (statuses.length === 0) {
      return okAsync<Task[], Err>([]);
    }

    return ResultAsync.fromPromise(
      this.db
        .select()
        .from(task)
        .where(and(eq(task.type, type), inArray(task.status, statuses)))
        .orderBy(asc(task.createdAt)),
      (error) =>
        taskErrors.getTasksByStatusFailed(
          fctx,
          TasksRepository.toMessage(error),
        ),
    ).map((rows) => rows as Task[]);
  }

  /**
   * Marks a task as completed: status `COMPLETED`, progress 100, and
   * `completedAt` set to the current time.
   *
   * @param fctx - Flow execution context passed to error factories.
   * @param taskId - Id of the task to complete.
   * @param result - Optional result payload; stored as `null` when omitted.
   * @returns Whatever `updateTask` returns for this change set.
   */
  markTaskAsCompleted(
    fctx: FlowExecCtx,
    taskId: string,
    result?: Record<string, any>,
  ): ResultAsync<Task, Err> {
    return this.updateTask(fctx, taskId, {
      status: TaskStatus.COMPLETED,
      progress: 100,
      result: result ?? null,
      completedAt: new Date(),
    });
  }

  /**
   * Marks a task as failed and stores a normalized error record on it.
   *
   * @param fctx - Flow execution context passed to error factories.
   * @param taskId - Id of the task to fail.
   * @param error - Failure cause. Its `code`, `message` and `detail` properties
   *                are copied when present; a `null`/`undefined` value is
   *                accepted and produces the default code and message.
   * @returns Whatever `updateTask` returns for this change set.
   */
  markTaskAsFailed(
    fctx: FlowExecCtx,
    taskId: string,
    error: any,
  ): ResultAsync<Task, Err> {
    // One instant for both the error record and the completion column.
    const now = new Date();

    return this.updateTask(fctx, taskId, {
      status: TaskStatus.FAILED,
      error: {
        // Optional chaining keeps a nullish `error` from throwing synchronously
        // before a ResultAsync can be returned.
        code: error?.code || "UNKNOWN_ERROR",
        message: error?.message || "Task failed",
        detail: error?.detail,
        timestamp: now,
      },
      completedAt: now,
    });
  }
}
|