import {
  Injectable,
  NotFoundException,
  Logger,
  OnModuleInit,
  OnModuleDestroy,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { IsNull, LessThanOrEqual, Repository } from 'typeorm';
import { ScheduledTask } from '../../entities/scheduled-task.entity';
import { CreateTaskDto } from './dto/create-task.dto';
import { UpdateTaskDto } from './dto/update-task.dto';
import { NatsService } from '../../services/nats.service';

/** How often the executor polls the database for due tasks. */
const POLL_INTERVAL_MS = 60_000;

/** How far ahead `nextCronDate` searches before giving up (366 days). */
const CRON_SEARCH_HORIZON_MS = 366 * 24 * 60 * 60 * 1000;

// One comma-separated term of a cron field:
//   "*"  |  "N"  |  "A-B"  — each optionally followed by "/STEP".
// Capture groups: 1 = "*", 2 = start, 3 = end, 4 = step.
const CRON_TERM = /^(?:(\*)|(\d+)(?:-(\d+))?)(?:\/(\d+))?$/;

/**
 * Expand one cron field into the set of values it allows.
 *
 * Supports wildcards, single values, ranges, step values and comma-separated
 * lists of those. Names (e.g. month or weekday abbreviations) are not
 * supported.
 *
 * @param field Raw field text taken from the cron expression.
 * @param min Smallest legal value for this field.
 * @param max Largest legal value for this field.
 * @returns The allowed values, or `null` when the field is malformed or out
 *   of range.
 */
function parseCronField(
  field: string,
  min: number,
  max: number,
): Set<number> | null {
  const allowed = new Set<number>();

  for (const term of field.split(',')) {
    const match = CRON_TERM.exec(term);
    if (match === null) return null;

    const [, star, startText, endText, stepText] = match;
    const step = stepText === undefined ? 1 : Number(stepText);
    if (step < 1) return null;

    let start: number;
    let end: number;
    if (star !== undefined) {
      start = min;
      end = max;
    } else {
      start = Number(startText);
      if (endText !== undefined) {
        end = Number(endText);
      } else {
        // "N/STEP" means "from N up to the field maximum"; a bare "N" is just N.
        end = stepText !== undefined ? max : start;
      }
    }

    if (start < min || end > max || start > end) return null;

    for (let value = start; value <= end; value += step) {
      allowed.add(value);
    }
  }

  return allowed;
}

/**
 * Parse a 5-field cron expression (minute, hour, day-of-month, month,
 * day-of-week) and return the next matching instant strictly after `after`.
 *
 * All fields are evaluated in UTC. Day-of-week uses 0 = Sunday and also
 * accepts 7 for Sunday. Day-of-month and day-of-week must BOTH match when
 * both are restricted.
 * NOTE(review): classic cron ORs those two fields when both are restricted;
 * the AND behaviour is kept here for backward compatibility — confirm which
 * is intended.
 *
 * @param expr Cron expression with exactly five whitespace-separated fields.
 * @param after Reference instant; the result is at least one minute later,
 *   truncated to a whole minute.
 * @returns The next run time, or `null` if the expression is invalid or has
 *   no match within 366 days.
 */
function nextCronDate(expr: string, after: Date): Date | null {
  const parts = expr.trim().split(/\s+/);
  if (parts.length !== 5) return null;

  const [minuteExpr, hourExpr, domExpr, monthExpr, dowExpr] = parts;
  const minutes = parseCronField(minuteExpr, 0, 59);
  const hours = parseCronField(hourExpr, 0, 23);
  const daysOfMonth = parseCronField(domExpr, 1, 31);
  const months = parseCronField(monthExpr, 1, 12);
  const daysOfWeek = parseCronField(dowExpr, 0, 7);
  if (
    minutes === null ||
    hours === null ||
    daysOfMonth === null ||
    months === null ||
    daysOfWeek === null
  ) {
    return null;
  }

  // Date#getUTCDay() reports Sunday as 0, so fold the alternative "7" into it.
  if (daysOfWeek.delete(7)) daysOfWeek.add(0);

  // Start at the first whole minute strictly after `after`.
  const candidate = new Date(after.getTime() + 60_000);
  candidate.setUTCSeconds(0, 0);
  const limit = after.getTime() + CRON_SEARCH_HORIZON_MS;

  // Skip ahead by the coarsest unit that fails to match. UTC has no DST
  // transitions, so jumping to the start of the next month/day/hour can never
  // step over a matching minute.
  while (candidate.getTime() < limit) {
    if (!months.has(candidate.getUTCMonth() + 1)) {
      candidate.setUTCMonth(candidate.getUTCMonth() + 1, 1);
      candidate.setUTCHours(0, 0, 0, 0);
      continue;
    }
    if (
      !daysOfMonth.has(candidate.getUTCDate()) ||
      !daysOfWeek.has(candidate.getUTCDay())
    ) {
      candidate.setUTCDate(candidate.getUTCDate() + 1);
      candidate.setUTCHours(0, 0, 0, 0);
      continue;
    }
    if (!hours.has(candidate.getUTCHours())) {
      candidate.setUTCHours(candidate.getUTCHours() + 1, 0, 0, 0);
      continue;
    }
    if (!minutes.has(candidate.getUTCMinutes())) {
      candidate.setUTCMinutes(candidate.getUTCMinutes() + 1, 0, 0);
      continue;
    }
    return candidate;
  }

  return null;
}

/** Best-effort stack/message extraction for values caught as `unknown`. */
function describeError(err: unknown): string {
  if (err instanceof Error) return err.stack ?? err.message;
  return String(err);
}

/**
 * CRUD operations for scheduled tasks plus an in-process executor that polls
 * the database once a minute and dispatches due tasks over NATS.
 *
 * NOTE(review): each task stores a `timezone`, but `nextCronDate` evaluates
 * cron expressions in UTC and never reads it — confirm whether that is
 * intended.
 */
@Injectable()
export class SchedulesService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(SchedulesService.name);
  private executorInterval: ReturnType<typeof setInterval> | null = null;

  constructor(
    @InjectRepository(ScheduledTask)
    private readonly taskRepository: Repository<ScheduledTask>,
    private readonly natsService: NatsService,
  ) {}

  // ---------------------------------------------------------------------------
  // Lifecycle hooks
  // ---------------------------------------------------------------------------

  /** Stamp missing `next_run` values and start the polling executor. */
  onModuleInit(): void {
    // Fire-and-forget: a bootstrap failure is logged, not propagated.
    void this.bootstrapNextRuns().catch((err: unknown) =>
      this.logger.error(
        'Failed to bootstrap next_run values',
        describeError(err),
      ),
    );

    // Poll every 60 seconds for due tasks.
    this.executorInterval = setInterval(() => {
      void this.executeDueTasks().catch((err: unknown) =>
        this.logger.error('Schedule executor error', describeError(err)),
      );
    }, POLL_INTERVAL_MS);

    this.logger.log('Schedule executor started (60s polling interval)');
  }

  /** Stop the polling executor. */
  onModuleDestroy(): void {
    if (this.executorInterval) {
      clearInterval(this.executorInterval);
      this.executorInterval = null;
    }
  }

  // ---------------------------------------------------------------------------
  // Execution engine
  // ---------------------------------------------------------------------------

  /** On startup, stamp `next_run` on active tasks that don't have one yet. */
  private async bootstrapNextRuns(): Promise<void> {
    // IsNull() is required here: a literal `null` in TypeORM find options is
    // skipped by default, which would select (and re-stamp) every active task
    // and silently discard runs that are already overdue.
    const tasks = await this.taskRepository.find({
      where: { is_active: true, next_run: IsNull() },
    });

    let stamped = 0;
    for (const task of tasks) {
      const next = nextCronDate(task.cron_expression, new Date());
      if (next) {
        task.next_run = next;
        await this.taskRepository.save(task);
        stamped += 1;
      }
    }

    if (stamped > 0) {
      this.logger.log(`Bootstrapped next_run for ${stamped} task(s)`);
    }
  }

  /** Find all active tasks whose `next_run` <= now and fire them. */
  private async executeDueTasks(): Promise<void> {
    const now = new Date();
    const dueTasks = await this.taskRepository.find({
      where: {
        is_active: true,
        next_run: LessThanOrEqual(now),
      },
    });

    if (dueTasks.length === 0) return;

    this.logger.log(`Executing ${dueTasks.length} due task(s)`);

    for (const task of dueTasks) {
      try {
        await this.executeTask(task);
      } catch (err: unknown) {
        this.logger.error(
          `Failed to execute task ${task.id} (${task.task_name})`,
          describeError(err),
        );
      }

      // Advance next_run whether or not dispatch succeeded so a broken task is
      // not retried on every poll. A failed save is logged separately and must
      // not stop the remaining due tasks from running.
      try {
        task.next_run = nextCronDate(task.cron_expression, now);
        await this.taskRepository.save(task);
      } catch (err: unknown) {
        this.logger.error(
          `Failed to reschedule task ${task.id} (${task.task_name})`,
          describeError(err),
        );
      }
    }
  }

  /**
   * Dispatch a single task via NATS based on its `task_type`.
   *
   * Unknown task types, and `command` tasks with no command configured, are
   * logged and skipped without publishing anything.
   */
  private async executeTask(task: ScheduledTask): Promise<void> {
    const { license_id, task_type, task_name, task_config } = task;

    this.logger.log(
      `Firing task: [${task_type}] "${task_name}" for license ${license_id}`,
    );

    switch (task_type) {
      case 'restart':
        await this.natsService.sendServerCommand(license_id, 'restart', {
          source: 'scheduler',
          task_id: task.id,
        });
        break;

      case 'announcement': {
        const message =
          (task_config?.message as string) ?? 'Scheduled announcement';
        await this.natsService.publish(`corrosion.${license_id}.cmd.server`, {
          action: 'command',
          command: `say ${message}`,
          source: 'scheduler',
          task_id: task.id,
          timestamp: new Date().toISOString(),
        });
        break;
      }

      case 'command': {
        const command = (task_config?.command as string) ?? '';
        if (!command) {
          this.logger.warn(
            `Task ${task.id} has no command configured — skipping`,
          );
          return;
        }
        await this.natsService.publish(`corrosion.${license_id}.cmd.server`, {
          action: 'command',
          command,
          source: 'scheduler',
          task_id: task.id,
          timestamp: new Date().toISOString(),
        });
        break;
      }

      case 'plugin_reload': {
        const plugin_name = (task_config?.plugin_name as string) ?? '';
        await this.natsService.publish(`corrosion.${license_id}.cmd.plugin`, {
          action: 'reload',
          plugin_name,
          source: 'scheduler',
          task_id: task.id,
          timestamp: new Date().toISOString(),
        });
        break;
      }

      default:
        this.logger.warn(
          `Unknown task_type "${task_type}" for task ${task.id}`,
        );
    }
  }

  // ---------------------------------------------------------------------------
  // CRUD
  // ---------------------------------------------------------------------------

  /**
   * Load a task scoped to its license.
   *
   * @throws NotFoundException when no task with that id exists for the license.
   */
  private async findTaskOrThrow(
    licenseId: string,
    taskId: string,
  ): Promise<ScheduledTask> {
    const task = await this.taskRepository.findOne({
      where: {
        id: taskId,
        license_id: licenseId,
      },
    });

    if (!task) {
      throw new NotFoundException(`Scheduled task ${taskId} not found`);
    }

    return task;
  }

  /** List every task belonging to a license, newest first. */
  async getTasks(licenseId: string): Promise<ScheduledTask[]> {
    return await this.taskRepository.find({
      where: { license_id: licenseId },
      order: { created_at: 'DESC' },
    });
  }

  /**
   * Create an active task for a license and compute its first `next_run`.
   *
   * `next_run` is stored as `null` when the cron expression cannot be parsed
   * or has no match within the search horizon.
   */
  async createTask(
    licenseId: string,
    dto: CreateTaskDto,
  ): Promise<ScheduledTask> {
    // `||` (not `??`) so an empty-string timezone also falls back to UTC.
    const timezone = dto.timezone || 'UTC';
    const now = new Date();

    const task = this.taskRepository.create({
      license_id: licenseId,
      task_type: dto.task_type,
      task_name: dto.task_name,
      cron_expression: dto.cron_expression,
      timezone,
      task_config: dto.task_config || {},
      is_active: true,
      next_run: nextCronDate(dto.cron_expression, now),
      created_at: now,
    });

    return await this.taskRepository.save(task);
  }

  /**
   * Apply a partial update to a task.
   *
   * @throws NotFoundException when the task does not exist for the license.
   */
  async updateTask(
    licenseId: string,
    taskId: string,
    dto: UpdateTaskDto,
  ): Promise<ScheduledTask> {
    const task = await this.findTaskOrThrow(licenseId, taskId);

    Object.assign(task, dto);

    // Recalculate next_run if the cron expression changed.
    if (dto.cron_expression) {
      task.next_run = nextCronDate(dto.cron_expression, new Date());
    }

    return await this.taskRepository.save(task);
  }

  /**
   * Delete a task.
   *
   * @throws NotFoundException when the task does not exist for the license.
   */
  async deleteTask(
    licenseId: string,
    taskId: string,
  ): Promise<{ deleted: boolean }> {
    await this.findTaskOrThrow(licenseId, taskId);
    await this.taskRepository.delete(taskId);
    return { deleted: true };
  }

  /**
   * Enable or disable a task.
   *
   * When enabling, `next_run` is (re)calculated if it is missing, or if the
   * task was previously disabled and its stored `next_run` is already in the
   * past — otherwise a long-disabled task would fire on the very next poll.
   *
   * @throws NotFoundException when the task does not exist for the license.
   */
  async toggleTask(
    licenseId: string,
    taskId: string,
    enabled: boolean,
  ): Promise<ScheduledTask> {
    const task = await this.findTaskOrThrow(licenseId, taskId);

    const wasActive = task.is_active;
    task.is_active = enabled;

    if (enabled) {
      const now = new Date();
      // Only a genuine inactive -> active transition discards a past next_run;
      // an already-active task that is merely awaiting the next poll keeps it.
      const isStale =
        !wasActive && task.next_run != null && task.next_run <= now;
      if (!task.next_run || isStale) {
        task.next_run = nextCronDate(task.cron_expression, now);
      }
    }

    return await this.taskRepository.save(task);
  }
}