use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex; use tokio_cron_scheduler::{Job, JobScheduler}; use uuid::Uuid; use super::nats_bridge::NatsBridge; /// Cron-based wipe schedule management. /// /// Manages wipe schedules backed by `tokio-cron-scheduler`. Each schedule /// maps a cron expression + timezone to a wipe profile. When a schedule /// fires, it creates a wipe_history record and dispatches execution to /// the WipeEngine. pub struct SchedulerService { db: sqlx::PgPool, nats: Arc, scheduler: JobScheduler, /// Maps schedule_id to job_id for removal job_handles: Arc>>, } impl SchedulerService { pub async fn new(db: sqlx::PgPool, nats: Arc) -> Result { let scheduler = JobScheduler::new() .await .context("Failed to create JobScheduler")?; Ok(Self { db, nats, scheduler, job_handles: Arc::new(Mutex::new(HashMap::new())), }) } /// Start the scheduler, loading all active schedules from the database. pub async fn start(&self) -> Result<()> { // Query all active wipe_schedules from DB let schedules: Vec<(Uuid,)> = sqlx::query_as( "SELECT id FROM wipe_schedules WHERE enabled = true", ) .fetch_all(&self.db) .await .context("Failed to query active wipe schedules")?; tracing::info!("Loading {} active wipe schedules", schedules.len()); // Register each schedule as a cron job for (schedule_id,) in schedules { if let Err(e) = self.register_wipe_schedule(schedule_id).await { tracing::error!("Failed to register schedule {}: {}", schedule_id, e); // Continue loading other schedules } } // Start the scheduler event loop self.scheduler .start() .await .context("Failed to start scheduler")?; tracing::info!("Scheduler started successfully"); Ok(()) } /// Register a wipe schedule as a cron job. pub async fn register_wipe_schedule(&self, schedule_id: Uuid) -> Result<()> { // Load WipeSchedule from DB let schedule: Option<(String, String, Uuid, Uuid)> = sqlx::query_as( "SELECT cron_expression, timezone, wipe_profile_id, license_id FROM wipe_schedules WHERE id = $1 AND enabled = true", ) .bind(schedule_id) .fetch_optional(&self.db) .await .context("Failed to query wipe schedule")?; if schedule.is_none() { anyhow::bail!("Schedule not found or disabled: {}", schedule_id); } let (cron_expression, timezone, wipe_profile_id, license_id) = schedule.unwrap(); // Clone references for closure let db = self.db.clone(); let nats = self.nats.clone(); let schedule_id_clone = schedule_id; // Create tokio-cron-scheduler job let job = Job::new_async(cron_expression.as_str(), move |_uuid, _l| { let db = db.clone(); let nats = nats.clone(); let wipe_profile_id = wipe_profile_id; let license_id = license_id; let schedule_id = schedule_id_clone; Box::pin(async move { tracing::info!( "Scheduled wipe triggered: schedule {} (profile {})", schedule_id, wipe_profile_id ); // Create wipe_history record let wipe_history_id = Uuid::new_v4(); let result = sqlx::query( "INSERT INTO wipe_history (id, license_id, wipe_profile_id, wipe_schedule_id, status, created_at) VALUES ($1, $2, $3, $4, 'pending', NOW())", ) .bind(wipe_history_id) .bind(license_id) .bind(wipe_profile_id) .bind(schedule_id) .execute(&db) .await; if let Err(e) = result { tracing::error!("Failed to create wipe_history record: {}", e); return; } // Publish wipe execution event to NATS let payload = serde_json::json!({ "wipe_history_id": wipe_history_id, "license_id": license_id, "wipe_profile_id": wipe_profile_id, "schedule_id": schedule_id, "triggered_by": "scheduler", "timestamp": Utc::now().to_rfc3339(), }); if let Err(e) = nats .publish_jetstream( &format!("corrosion.wipes.{}.execute", license_id), payload.to_string().as_bytes(), ) .await { tracing::error!( "Failed to publish wipe execution event for {}: {}", wipe_history_id, e ); } tracing::info!( "Wipe execution dispatched: {} (schedule: {})", wipe_history_id, schedule_id ); }) }) .context("Failed to create cron job")?; // Add job to scheduler let job_id = self .scheduler .add(job) .await .context("Failed to add job to scheduler")?; // Store job handle for later removal self.job_handles.lock().await.insert(schedule_id, job_id); tracing::info!( "Registered wipe schedule: {} (cron: {}, tz: {})", schedule_id, cron_expression, timezone ); Ok(()) } /// Remove a schedule from the running scheduler. pub async fn remove_schedule(&self, schedule_id: Uuid) -> Result<()> { // Look up job handle by schedule_id let job_id = { let mut handles = self.job_handles.lock().await; handles.remove(&schedule_id) }; if let Some(job_id) = job_id { // Remove from tokio-cron-scheduler self.scheduler .remove(&job_id) .await .context("Failed to remove job from scheduler")?; tracing::info!("Removed wipe schedule: {}", schedule_id); } else { tracing::warn!("Schedule not found in scheduler: {}", schedule_id); } Ok(()) } /// Calculate the next N scheduled run times for a given schedule. pub async fn get_next_runs( &self, schedule_id: Uuid, count: usize, ) -> Result>> { // Load cron_expression and timezone from DB let schedule: Option<(String, String)> = sqlx::query_as( "SELECT cron_expression, timezone FROM wipe_schedules WHERE id = $1", ) .bind(schedule_id) .fetch_optional(&self.db) .await .context("Failed to query wipe schedule")?; if schedule.is_none() { anyhow::bail!("Schedule not found: {}", schedule_id); } let (cron_expression, _timezone) = schedule.unwrap(); // Parse cron expression using cron library use cron::Schedule; use std::str::FromStr; let schedule = Schedule::from_str(&cron_expression) .with_context(|| format!("Invalid cron expression: {}", cron_expression))?; // Compute next N fire times let now = Utc::now(); let next_times: Vec> = schedule .upcoming(Utc) .take(count) .collect(); tracing::debug!( "Calculated {} next run times for schedule {}", next_times.len(), schedule_id ); Ok(next_times) } /// Register hourly stats aggregation job (runs at :05 past every hour). pub async fn register_stats_aggregation(&self) -> Result<()> { let db = self.db.clone(); let job = Job::new_async("0 5 * * * *", move |_uuid, _l| { let db = db.clone(); Box::pin(async move { tracing::info!("Running hourly stats aggregation"); // Get all active licenses let licenses: Vec<(Uuid,)> = match sqlx::query_as( "SELECT id FROM licenses WHERE status = 'active'", ) .fetch_all(&db) .await { Ok(l) => l, Err(e) => { tracing::error!("Failed to query active licenses: {}", e); return; } }; tracing::info!("Aggregating stats for {} licenses", licenses.len()); for (license_id,) in licenses { if let Err(e) = crate::db::stats::aggregate_hourly_stats(&db, license_id).await { tracing::error!( "Failed to aggregate stats for license {}: {}", license_id, e ); } } tracing::info!("Hourly stats aggregation complete"); }) }) .context("Failed to create stats aggregation job")?; self.scheduler .add(job) .await .context("Failed to add stats aggregation job to scheduler")?; tracing::info!("Registered hourly stats aggregation job (cron: 0 5 * * * *)"); Ok(()) } /// Register daily stats cleanup job (runs at 03:00 UTC). pub async fn register_stats_cleanup(&self) -> Result<()> { let db = self.db.clone(); let job = Job::new_async("0 0 3 * * *", move |_uuid, _l| { let db = db.clone(); Box::pin(async move { tracing::info!("Running daily stats cleanup"); // Delete raw stats older than 7 days match crate::db::stats::cleanup_old_stats(&db, 7).await { Ok(deleted) => { tracing::info!("Deleted {} old raw stats records", deleted); } Err(e) => { tracing::error!("Failed to cleanup old raw stats: {}", e); } } // Delete hourly stats older than 90 days match crate::db::stats::cleanup_old_hourly_stats(&db, 90).await { Ok(deleted) => { tracing::info!("Deleted {} old hourly stats records", deleted); } Err(e) => { tracing::error!("Failed to cleanup old hourly stats: {}", e); } } tracing::info!("Daily stats cleanup complete"); }) }) .context("Failed to create stats cleanup job")?; self.scheduler .add(job) .await .context("Failed to add stats cleanup job to scheduler")?; tracing::info!("Registered daily stats cleanup job (cron: 0 0 3 * * *)"); Ok(()) } }