use anyhow::{Context, Result}; use sqlx::PgPool; use std::sync::Arc; use uuid::Uuid; use super::amp_adapter::AmpAdapter; use super::encryption; use super::nats_bridge::NatsBridge; use super::pterodactyl_adapter::PterodactylAdapter; /// Default timeout for module installation operations (60 seconds). const MODULE_INSTALL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); /// Module installation orchestrator. /// /// Handles the full lifecycle of module deployment to game servers: /// 1. Verify module is purchased by the license /// 2. Fetch module metadata (plugin file URL) /// 3. Determine server connection type (AMP, Pterodactyl, Companion) /// 4. Dispatch installation command to appropriate adapter /// 5. Update installation status in database pub struct ModuleInstaller { db: PgPool, nats: Arc, encryption_key: String, } impl ModuleInstaller { pub fn new(db: PgPool, nats: Arc, encryption_key: String) -> Self { Self { db, nats, encryption_key } } /// Orchestrate module installation for a license. /// /// This is the main entry point. Returns immediately after validating /// purchase and dispatching installation. The actual installation happens /// asynchronously and status is tracked in the database. pub async fn install_module( &self, license_id: Uuid, module_id: Uuid, ) -> Result<()> { // 1. Verify module is purchased let purchased = self.verify_module_purchase(license_id, module_id).await?; if !purchased { anyhow::bail!("Module not purchased by this license"); } // 2. Get module metadata let module = self.get_module_metadata(module_id).await?; // 3. Get server connection info let connection = self.get_server_connection(license_id).await?; // 4. Create or update installation record (status = installing) self.upsert_installation_status(license_id, module_id, "installing", None) .await?; // 5. Dispatch installation based on connection type let result = match connection.connection_type.as_str() { "amp" => { self.install_via_amp( &connection, &module.plugin_file_url, &module.slug, ) .await } "pterodactyl" => { self.install_via_pterodactyl( &connection, &module.plugin_file_url, &module.slug, ) .await } "bare_metal" => { self.install_via_companion( license_id, &module.plugin_file_url, &module.slug, ) .await } _ => anyhow::bail!("Unknown connection type: {}", connection.connection_type), }; // 6. Update status based on result match result { Ok(_) => { self.upsert_installation_status( license_id, module_id, "installed", None, ) .await?; tracing::info!( "Module {} installed successfully for license {}", module.slug, license_id ); } Err(e) => { let error_msg = e.to_string(); self.upsert_installation_status( license_id, module_id, "failed", Some(&error_msg), ) .await?; tracing::error!( "Module {} installation failed for license {}: {}", module.slug, license_id, error_msg ); return Err(e); } } Ok(()) } /// Get installation status for a module. pub async fn get_installation_status( &self, license_id: Uuid, module_id: Uuid, ) -> Result> { let status = sqlx::query_as::<_, ModuleInstallationStatus>( "SELECT status, installed_at, error_message \ FROM module_installations \ WHERE license_id = $1 AND module_id = $2", ) .bind(license_id) .bind(module_id) .fetch_optional(&self.db) .await .context("Failed to fetch installation status")?; Ok(status) } // ──────────────────────────────────────────────────────────── // Private implementation methods // ──────────────────────────────────────────────────────────── /// Verify that the module has been purchased by this license. async fn verify_module_purchase( &self, license_id: Uuid, module_id: Uuid, ) -> Result { let purchased: (bool,) = sqlx::query_as( "SELECT EXISTS(SELECT 1 FROM module_purchases \ WHERE license_id = $1 AND module_id = $2)", ) .bind(license_id) .bind(module_id) .fetch_one(&self.db) .await .context("Failed to verify module purchase")?; Ok(purchased.0) } /// Fetch module metadata from the database. async fn get_module_metadata(&self, module_id: Uuid) -> Result { let module = sqlx::query_as::<_, ModuleMetadata>( "SELECT slug, plugin_file_url FROM modules WHERE id = $1", ) .bind(module_id) .fetch_optional(&self.db) .await .context("Failed to fetch module metadata")? .ok_or_else(|| anyhow::anyhow!("Module not found"))?; if module.plugin_file_url.is_none() { anyhow::bail!("Module does not have a plugin file URL"); } Ok(module) } /// Get server connection details for a license. async fn get_server_connection(&self, license_id: Uuid) -> Result { let connection = sqlx::query_as::<_, ServerConnectionInfo>( "SELECT connection_type, panel_api_endpoint, panel_api_key_encrypted, \ panel_server_identifier \ FROM server_connections \ WHERE license_id = $1", ) .bind(license_id) .fetch_optional(&self.db) .await .context("Failed to fetch server connection")? .ok_or_else(|| anyhow::anyhow!("Server connection not found"))?; Ok(connection) } /// Create or update installation status record. async fn upsert_installation_status( &self, license_id: Uuid, module_id: Uuid, status: &str, error_message: Option<&str>, ) -> Result<()> { let now = if status == "installed" { Some(chrono::Utc::now()) } else { None }; sqlx::query( "INSERT INTO module_installations (license_id, module_id, status, installed_at, error_message) \ VALUES ($1, $2, $3, $4, $5) \ ON CONFLICT (license_id, module_id) \ DO UPDATE SET status = $3, installed_at = $4, error_message = $5", ) .bind(license_id) .bind(module_id) .bind(status) .bind(now) .bind(error_message) .execute(&self.db) .await .context("Failed to update installation status")?; Ok(()) } /// Install via AMP panel adapter. async fn install_via_amp( &self, connection: &ServerConnectionInfo, plugin_url: &Option, module_slug: &str, ) -> Result<()> { let api_endpoint = connection .panel_api_endpoint .as_ref() .ok_or_else(|| anyhow::anyhow!("AMP endpoint not configured"))?; let encrypted_key = connection .panel_api_key_encrypted .as_ref() .ok_or_else(|| anyhow::anyhow!("AMP API key not configured"))?; // Decrypt API key let api_key = encryption::decrypt(encrypted_key, &self.encryption_key) .context("Failed to decrypt AMP API key")?; // Extract credentials (format: "username:password") let parts: Vec<&str> = api_key.split(':').collect(); if parts.len() != 2 { anyhow::bail!("Invalid AMP credentials format"); } let adapter = AmpAdapter::new( api_endpoint.clone(), parts[0].to_string(), parts[1].to_string(), ); // Download plugin file let plugin_data = self .download_plugin_file(plugin_url.as_ref().unwrap()) .await?; // Determine filename from slug let filename = format!("{}.cs", module_slug.replace('-', "")); // Upload to oxide/plugins/ directory let target_path = format!("oxide/plugins/{}", filename); let server_id = connection .panel_server_identifier .as_ref() .ok_or_else(|| anyhow::anyhow!("Panel server ID not configured"))?; adapter .put_file(server_id, &target_path, &plugin_data) .await .context("Failed to upload plugin to AMP server")?; // Reload plugins via console command adapter .send_command(server_id, "oxide.reload *") .await .context("Failed to reload plugins")?; Ok(()) } /// Install via Pterodactyl panel adapter. async fn install_via_pterodactyl( &self, connection: &ServerConnectionInfo, plugin_url: &Option, module_slug: &str, ) -> Result<()> { let api_endpoint = connection .panel_api_endpoint .as_ref() .ok_or_else(|| anyhow::anyhow!("Pterodactyl endpoint not configured"))?; let encrypted_key = connection .panel_api_key_encrypted .as_ref() .ok_or_else(|| anyhow::anyhow!("Pterodactyl API key not configured"))?; // Decrypt API key let api_key = encryption::decrypt(encrypted_key, &self.encryption_key) .context("Failed to decrypt Pterodactyl API key")?; let adapter = PterodactylAdapter::new(api_endpoint.clone(), api_key); // Download plugin file let plugin_data = self .download_plugin_file(plugin_url.as_ref().unwrap()) .await?; // Determine filename from slug let filename = format!("{}.cs", module_slug.replace('-', "")); // Upload to oxide/plugins/ directory let target_path = format!("oxide/plugins/{}", filename); let server_id = connection .panel_server_identifier .as_ref() .ok_or_else(|| anyhow::anyhow!("Panel server ID not configured"))?; adapter .put_file(server_id, &target_path, &plugin_data) .await .context("Failed to upload plugin to Pterodactyl server")?; // Reload plugins via console command adapter .send_command(server_id, "oxide.reload *") .await .context("Failed to reload plugins")?; Ok(()) } /// Install via companion agent (bare metal). async fn install_via_companion( &self, license_id: Uuid, plugin_url: &Option, module_slug: &str, ) -> Result<()> { let filename = format!("{}.cs", module_slug.replace('-', "")); let target_path = format!("oxide/plugins/{}", filename); // Build NATS command payload #[derive(serde::Serialize)] struct ModuleInstallCommand { module_id: String, download_url: String, filename: String, target_path: String, } let cmd = ModuleInstallCommand { module_id: module_slug.to_string(), download_url: plugin_url.as_ref().unwrap().clone(), filename, target_path, }; // Publish to NATS subject let subject = format!("corrosion.{}.cmd.module.install", license_id); self.nats .request_json::( &subject, &cmd, MODULE_INSTALL_TIMEOUT, ) .await .context("Companion agent module install request timed out or failed") .and_then(|result| { if result.success { Ok(()) } else { anyhow::bail!("Companion agent reported failure: {}", result.error.unwrap_or_default()) } }) } /// Download plugin file from a URL. async fn download_plugin_file(&self, url: &str) -> Result> { let response = reqwest::get(url) .await .context("Failed to download plugin file")?; if !response.status().is_success() { anyhow::bail!( "Plugin download failed with status: {}", response.status() ); } let bytes = response .bytes() .await .context("Failed to read plugin file bytes")?; Ok(bytes.to_vec()) } } // ──────────────────────────────────────────────────────────── // DTOs // ──────────────────────────────────────────────────────────── #[derive(Debug, sqlx::FromRow)] struct ModuleMetadata { slug: String, plugin_file_url: Option, } #[derive(Debug, sqlx::FromRow)] struct ServerConnectionInfo { connection_type: String, panel_api_endpoint: Option, panel_api_key_encrypted: Option, panel_server_identifier: Option, } #[derive(Debug, sqlx::FromRow, serde::Serialize)] pub struct ModuleInstallationStatus { pub status: String, pub installed_at: Option>, pub error_message: Option, } #[derive(Debug, serde::Deserialize)] struct ModuleInstallResult { success: bool, error: Option, }