initttt
This commit is contained in:
72
packages/logic/domains/tasks/controller.ts
Normal file
72
packages/logic/domains/tasks/controller.ts
Normal file
@@ -0,0 +1,72 @@
|
||||
import { db } from "@pkg/db";
|
||||
import { FlowExecCtx } from "@core/flow.execution.context";
|
||||
import { CreateTask, TaskStatus, TaskType, UpdateTask } from "./data";
|
||||
import { TasksRepository } from "./repository";
|
||||
|
||||
export class TasksController {
|
||||
constructor(private tasksRepo: TasksRepository) {}
|
||||
|
||||
createTask(fctx: FlowExecCtx, taskData: CreateTask) {
|
||||
return this.tasksRepo.createTask(fctx, taskData);
|
||||
}
|
||||
|
||||
getTaskById(fctx: FlowExecCtx, taskId: string) {
|
||||
return this.tasksRepo.getTaskById(fctx, taskId);
|
||||
}
|
||||
|
||||
updateTask(fctx: FlowExecCtx, taskId: string, updates: UpdateTask) {
|
||||
return this.tasksRepo.updateTask(fctx, taskId, updates);
|
||||
}
|
||||
|
||||
deleteTask(fctx: FlowExecCtx, taskId: string) {
|
||||
return this.tasksRepo.deleteTask(fctx, taskId);
|
||||
}
|
||||
|
||||
getTasksByStatuses(fctx: FlowExecCtx, statuses: TaskStatus[]) {
|
||||
return this.tasksRepo.getTasksByStatuses(fctx, statuses);
|
||||
}
|
||||
|
||||
getTasksByTypeAndStatuses(
|
||||
fctx: FlowExecCtx,
|
||||
type: TaskType,
|
||||
statuses: TaskStatus[],
|
||||
) {
|
||||
return this.tasksRepo.getTasksByTypeAndStatuses(fctx, type, statuses);
|
||||
}
|
||||
|
||||
markTaskAsCompleted(
|
||||
fctx: FlowExecCtx,
|
||||
taskId: string,
|
||||
result?: Record<string, any>,
|
||||
) {
|
||||
return this.tasksRepo.markTaskAsCompleted(fctx, taskId, result);
|
||||
}
|
||||
|
||||
markTaskAsFailed(fctx: FlowExecCtx, taskId: string, error: any) {
|
||||
return this.tasksRepo.markTaskAsFailed(fctx, taskId, error);
|
||||
}
|
||||
|
||||
updateTaskProgress(fctx: FlowExecCtx, taskId: string, progress: number) {
|
||||
return this.tasksRepo.updateTask(fctx, taskId, {
|
||||
progress: Math.max(0, Math.min(100, progress)),
|
||||
});
|
||||
}
|
||||
|
||||
cancelTask(fctx: FlowExecCtx, taskId: string) {
|
||||
return this.tasksRepo.updateTask(fctx, taskId, {
|
||||
status: TaskStatus.CANCELLED,
|
||||
completedAt: new Date(),
|
||||
});
|
||||
}
|
||||
|
||||
startTask(fctx: FlowExecCtx, taskId: string) {
|
||||
return this.tasksRepo.updateTask(fctx, taskId, {
|
||||
status: TaskStatus.RUNNING,
|
||||
startedAt: new Date(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export function getTasksController(): TasksController {
|
||||
return new TasksController(new TasksRepository(db));
|
||||
}
|
||||
71
packages/logic/domains/tasks/data.ts
Normal file
71
packages/logic/domains/tasks/data.ts
Normal file
@@ -0,0 +1,71 @@
|
||||
import * as v from "valibot";
|
||||
|
||||
export enum TaskStatus {
|
||||
PENDING = "pending",
|
||||
RUNNING = "running",
|
||||
COMPLETED = "completed",
|
||||
FAILED = "failed",
|
||||
CANCELLED = "cancelled",
|
||||
}
|
||||
|
||||
export const taskStatusSchema = v.picklist([
|
||||
"pending",
|
||||
"running",
|
||||
"completed",
|
||||
"failed",
|
||||
"cancelled",
|
||||
]);
|
||||
export type TaskStatusType = v.InferOutput<typeof taskStatusSchema>;
|
||||
|
||||
export enum TaskType {
|
||||
APK_BUILD = "apk_build",
|
||||
}
|
||||
|
||||
export const taskTypeSchema = v.picklist(["apk_build"]);
|
||||
export type TaskTypeValue = v.InferOutput<typeof taskTypeSchema>;
|
||||
|
||||
export const taskErrorSchema = v.object({
|
||||
code: v.string(),
|
||||
message: v.string(),
|
||||
detail: v.optional(v.string()),
|
||||
timestamp: v.date(),
|
||||
});
|
||||
export type TaskError = v.InferOutput<typeof taskErrorSchema>;
|
||||
|
||||
export const taskSchema = v.object({
|
||||
id: v.string(),
|
||||
type: taskTypeSchema,
|
||||
status: taskStatusSchema,
|
||||
progress: v.pipe(v.number(), v.integer()),
|
||||
payload: v.optional(v.nullable(v.record(v.string(), v.any()))),
|
||||
result: v.optional(v.nullable(v.record(v.string(), v.any()))),
|
||||
error: v.optional(v.nullable(taskErrorSchema)),
|
||||
userId: v.string(),
|
||||
resourceId: v.string(),
|
||||
startedAt: v.optional(v.nullable(v.date())),
|
||||
completedAt: v.optional(v.nullable(v.date())),
|
||||
createdAt: v.date(),
|
||||
updatedAt: v.date(),
|
||||
});
|
||||
export type Task = v.InferOutput<typeof taskSchema>;
|
||||
|
||||
export const createTaskSchema = v.object({
|
||||
id: v.string(),
|
||||
type: taskTypeSchema,
|
||||
status: v.optional(taskStatusSchema),
|
||||
progress: v.optional(v.pipe(v.number(), v.integer())),
|
||||
payload: v.optional(v.nullable(v.record(v.string(), v.any()))),
|
||||
userId: v.string(),
|
||||
resourceId: v.string(),
|
||||
});
|
||||
export type CreateTask = v.InferOutput<typeof createTaskSchema>;
|
||||
|
||||
export const updateTaskSchema = v.object({
|
||||
status: v.optional(taskStatusSchema),
|
||||
progress: v.optional(v.pipe(v.number(), v.integer())),
|
||||
result: v.optional(v.nullable(v.record(v.string(), v.any()))),
|
||||
error: v.optional(v.nullable(taskErrorSchema)),
|
||||
startedAt: v.optional(v.nullable(v.date())),
|
||||
completedAt: v.optional(v.nullable(v.date())),
|
||||
});
|
||||
export type UpdateTask = v.InferOutput<typeof updateTaskSchema>;
|
||||
87
packages/logic/domains/tasks/errors.ts
Normal file
87
packages/logic/domains/tasks/errors.ts
Normal file
@@ -0,0 +1,87 @@
|
||||
import { FlowExecCtx } from "@/core/flow.execution.context";
|
||||
import { ERROR_CODES, type Err } from "@pkg/result";
|
||||
import { getError } from "@pkg/logger";
|
||||
|
||||
export const taskErrors = {
|
||||
dbError: (fctx: FlowExecCtx, detail: string): Err =>
|
||||
getError({
|
||||
flowId: fctx.flowId,
|
||||
code: ERROR_CODES.DATABASE_ERROR,
|
||||
message: "Database operation failed",
|
||||
description: "Please try again later",
|
||||
detail,
|
||||
}),
|
||||
|
||||
taskNotFound: (fctx: FlowExecCtx, taskId: string): Err =>
|
||||
getError({
|
||||
flowId: fctx.flowId,
|
||||
code: ERROR_CODES.NOT_FOUND,
|
||||
message: "Task not found",
|
||||
description: "The requested task does not exist",
|
||||
detail: `No task found with ID: ${taskId}`,
|
||||
}),
|
||||
|
||||
createTaskFailed: (fctx: FlowExecCtx, detail: string): Err =>
|
||||
getError({
|
||||
flowId: fctx.flowId,
|
||||
code: ERROR_CODES.DATABASE_ERROR,
|
||||
message: "An error occurred while creating task",
|
||||
description: "Try again later",
|
||||
detail,
|
||||
}),
|
||||
|
||||
getTaskFailed: (fctx: FlowExecCtx, detail: string): Err =>
|
||||
getError({
|
||||
flowId: fctx.flowId,
|
||||
code: ERROR_CODES.DATABASE_ERROR,
|
||||
message: "An error occurred while fetching task",
|
||||
description: "Try again later",
|
||||
detail,
|
||||
}),
|
||||
|
||||
updateTaskFailed: (fctx: FlowExecCtx, detail: string): Err =>
|
||||
getError({
|
||||
flowId: fctx.flowId,
|
||||
code: ERROR_CODES.DATABASE_ERROR,
|
||||
message: "An error occurred while updating task",
|
||||
description: "Try again later",
|
||||
detail,
|
||||
}),
|
||||
|
||||
deleteTaskFailed: (fctx: FlowExecCtx, detail: string): Err =>
|
||||
getError({
|
||||
flowId: fctx.flowId,
|
||||
code: ERROR_CODES.DATABASE_ERROR,
|
||||
message: "An error occurred while deleting task",
|
||||
description: "Try again later",
|
||||
detail,
|
||||
}),
|
||||
|
||||
getTasksFailed: (fctx: FlowExecCtx, detail: string): Err =>
|
||||
getError({
|
||||
flowId: fctx.flowId,
|
||||
code: ERROR_CODES.DATABASE_ERROR,
|
||||
message: "An error occurred while fetching tasks",
|
||||
description: "Try again later",
|
||||
detail,
|
||||
}),
|
||||
|
||||
getTasksByStatusFailed: (fctx: FlowExecCtx, detail: string): Err =>
|
||||
getError({
|
||||
flowId: fctx.flowId,
|
||||
code: ERROR_CODES.DATABASE_ERROR,
|
||||
message: "An error occurred while fetching tasks by status",
|
||||
description: "Try again later",
|
||||
detail,
|
||||
}),
|
||||
|
||||
checkTaskExistenceFailed: (fctx: FlowExecCtx, detail: string): Err =>
|
||||
getError({
|
||||
flowId: fctx.flowId,
|
||||
code: ERROR_CODES.DATABASE_ERROR,
|
||||
message: "An error occurred while checking task existence",
|
||||
description: "Try again later",
|
||||
detail,
|
||||
}),
|
||||
};
|
||||
|
||||
163
packages/logic/domains/tasks/repository.ts
Normal file
163
packages/logic/domains/tasks/repository.ts
Normal file
@@ -0,0 +1,163 @@
|
||||
import { CreateTask, Task, TaskStatus, TaskType, UpdateTask } from "./data";
|
||||
import { ResultAsync, errAsync, okAsync } from "neverthrow";
|
||||
import { FlowExecCtx } from "@core/flow.execution.context";
|
||||
import { Database, and, asc, eq, inArray } from "@pkg/db";
|
||||
import { task } from "@pkg/db/schema";
|
||||
import { type Err } from "@pkg/result";
|
||||
import { taskErrors } from "./errors";
|
||||
import { logger } from "@pkg/logger";
|
||||
|
||||
export class TasksRepository {
|
||||
constructor(private db: Database) {}
|
||||
|
||||
createTask(fctx: FlowExecCtx, taskData: CreateTask): ResultAsync<Task, Err> {
|
||||
logger.info("Creating new task", { ...fctx, taskId: taskData.id });
|
||||
|
||||
return ResultAsync.fromPromise(
|
||||
this.db
|
||||
.insert(task)
|
||||
.values({
|
||||
id: taskData.id,
|
||||
type: taskData.type,
|
||||
status: taskData.status || TaskStatus.PENDING,
|
||||
progress: taskData.progress || 0,
|
||||
payload: taskData.payload ?? null,
|
||||
userId: taskData.userId,
|
||||
resourceId: taskData.resourceId,
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.returning()
|
||||
.execute(),
|
||||
(error) =>
|
||||
taskErrors.createTaskFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
),
|
||||
).map((result) => result[0] as Task);
|
||||
}
|
||||
|
||||
getTaskById(fctx: FlowExecCtx, taskId: string): ResultAsync<Task, Err> {
|
||||
return ResultAsync.fromPromise(
|
||||
this.db.query.task.findFirst({
|
||||
where: eq(task.id, taskId),
|
||||
}),
|
||||
(error) =>
|
||||
taskErrors.getTaskFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
),
|
||||
).andThen((result) => {
|
||||
if (!result) {
|
||||
return errAsync(taskErrors.taskNotFound(fctx, taskId));
|
||||
}
|
||||
|
||||
return okAsync(result as Task);
|
||||
});
|
||||
}
|
||||
|
||||
updateTask(
|
||||
fctx: FlowExecCtx,
|
||||
taskId: string,
|
||||
updates: UpdateTask,
|
||||
): ResultAsync<Task, Err> {
|
||||
return this.getTaskById(fctx, taskId).andThen(() =>
|
||||
ResultAsync.fromPromise(
|
||||
this.db
|
||||
.update(task)
|
||||
.set({ ...updates, updatedAt: new Date() })
|
||||
.where(eq(task.id, taskId))
|
||||
.returning()
|
||||
.execute(),
|
||||
(error) =>
|
||||
taskErrors.updateTaskFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
),
|
||||
).andThen((updateResult) => {
|
||||
if (!updateResult[0]) {
|
||||
return errAsync(taskErrors.taskNotFound(fctx, taskId));
|
||||
}
|
||||
return okAsync(updateResult[0] as Task);
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
deleteTask(fctx: FlowExecCtx, taskId: string): ResultAsync<boolean, Err> {
|
||||
return ResultAsync.fromPromise(
|
||||
this.db.delete(task).where(eq(task.id, taskId)).execute(),
|
||||
(error) =>
|
||||
taskErrors.deleteTaskFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
),
|
||||
).map(() => true);
|
||||
}
|
||||
|
||||
getTasksByStatuses(
|
||||
fctx: FlowExecCtx,
|
||||
statuses: TaskStatus[],
|
||||
): ResultAsync<Task[], Err> {
|
||||
return ResultAsync.fromPromise(
|
||||
this.db
|
||||
.select()
|
||||
.from(task)
|
||||
.where(inArray(task.status, statuses))
|
||||
.orderBy(asc(task.createdAt)),
|
||||
(error) =>
|
||||
taskErrors.getTasksByStatusFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
),
|
||||
).map((result) => result as Task[]);
|
||||
}
|
||||
|
||||
getTasksByTypeAndStatuses(
|
||||
fctx: FlowExecCtx,
|
||||
type: TaskType,
|
||||
statuses: TaskStatus[],
|
||||
): ResultAsync<Task[], Err> {
|
||||
return ResultAsync.fromPromise(
|
||||
this.db
|
||||
.select()
|
||||
.from(task)
|
||||
.where(and(eq(task.type, type), inArray(task.status, statuses)))
|
||||
.orderBy(asc(task.createdAt)),
|
||||
(error) =>
|
||||
taskErrors.getTasksByStatusFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
),
|
||||
).map((result) => result as Task[]);
|
||||
}
|
||||
|
||||
markTaskAsCompleted(
|
||||
fctx: FlowExecCtx,
|
||||
taskId: string,
|
||||
result?: Record<string, any>,
|
||||
): ResultAsync<Task, Err> {
|
||||
return this.updateTask(fctx, taskId, {
|
||||
status: TaskStatus.COMPLETED,
|
||||
progress: 100,
|
||||
result: result ?? null,
|
||||
completedAt: new Date(),
|
||||
});
|
||||
}
|
||||
|
||||
markTaskAsFailed(
|
||||
fctx: FlowExecCtx,
|
||||
taskId: string,
|
||||
error: any,
|
||||
): ResultAsync<Task, Err> {
|
||||
return this.updateTask(fctx, taskId, {
|
||||
status: TaskStatus.FAILED,
|
||||
error: {
|
||||
code: error.code || "UNKNOWN_ERROR",
|
||||
message: error.message || "Task failed",
|
||||
detail: error.detail,
|
||||
timestamp: new Date(),
|
||||
},
|
||||
completedAt: new Date(),
|
||||
});
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user