Implements all remaining backend infrastructure for Corrosion platform.
Backend Services (5 new):
- license.rs: License validation, activation, check-in with NATS token generation
- map_manager.rs: Map upload/rotation with SHA-256 checksums, circular advancement
- health_checker.rs: Post-wipe verification with retry loop and backoff
- backup_manager.rs: Tar.gz backups with retention policy (last 10), recursive upload
- scheduler.rs: Tokio-cron integration for scheduled wipes with NATS events
WipeEngine Orchestration (wipe_engine.rs):
- execute_wipe(): Master orchestrator managing full lifecycle
- execute_pre_wipe(): Countdown warnings, backups, player kicks
- execute_wipe_actions(): Map/plugin deletion, seed rotation, Steam updates
- execute_post_wipe_verification(): Health checks with restart attempts
- execute_rollback(): Failure recovery with backup restore
- JSONB execution logs, NATS status events, service composition pattern
WebSocket/NATS Bridge (ws.rs):
- JWT authentication via query parameter
- License-scoped NATS subscriptions (corrosion.{license_id}.*)
- Bi-directional: NATS→WebSocket event forwarding, WebSocket→NATS publishing
- Axum 0.8 with ws feature, auto Ping/Pong handling
Panel Adapter Fixes:
- AMP/Pterodactyl/Companion adapters fully wired
- RCON command execution, file operations, Steam update triggers
Fixes:
- Added ws feature to Axum dependency
- Fixed Message::Text() type conversions (String→Utf8Bytes via .into())
- Fixed BackupInfo FromRow derive
- Fixed recursive async with Box::pin pattern
- Fixed async JobScheduler::new() constructor
- Removed manual WebSocket Ping/Pong handler
Compilation: 0 errors, 327 warnings (unused vars/functions)
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
322 lines
10 KiB
Rust
322 lines
10 KiB
Rust
use anyhow::{Context, Result};
|
|
use chrono::{DateTime, Utc};
|
|
use serde::Serialize;
|
|
use uuid::Uuid;
|
|
|
|
use super::panel_adapter::PanelAdapter;
|
|
|
|
/// Metadata for a stored backup.
|
|
#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
|
|
pub struct BackupInfo {
|
|
pub id: Uuid,
|
|
pub license_id: Uuid,
|
|
pub wipe_history_id: Option<Uuid>,
|
|
pub storage_path: String,
|
|
pub size_bytes: i64,
|
|
pub created_at: DateTime<Utc>,
|
|
}
|
|
|
|
/// Pre-wipe backup and rollback management.
|
|
///
|
|
/// Creates snapshots of server state (map, save data, plugin data,
|
|
/// config files) before a wipe executes. Backups are used by the
|
|
/// wipe engine's rollback mechanism if post-wipe verification fails.
|
|
pub struct BackupManager {
|
|
db: sqlx::PgPool,
|
|
storage_base_path: String,
|
|
}
|
|
|
|
impl BackupManager {
|
|
pub fn new(db: sqlx::PgPool, storage_base_path: String) -> Self {
|
|
Self {
|
|
db,
|
|
storage_base_path,
|
|
}
|
|
}
|
|
|
|
/// Create a pre-wipe backup of the server's current state.
|
|
///
|
|
/// Captures: map/save files, plugin data directories, server.cfg,
|
|
/// and any other files configured for backup in the wipe profile.
|
|
/// Returns a backup reference ID stored in wipe_history.
|
|
pub async fn create_backup(
|
|
&self,
|
|
adapter: &dyn PanelAdapter,
|
|
server_id: &str,
|
|
license_id: Uuid,
|
|
wipe_history_id: Uuid,
|
|
) -> Result<String> {
|
|
// Generate backup ID
|
|
let backup_id = Uuid::new_v4();
|
|
|
|
// Define files to backup
|
|
let files_to_backup = vec![
|
|
"/server/rust/server.cfg",
|
|
"/server/rust/server/procedural/",
|
|
"/server/rust/oxide/data/",
|
|
"/server/rust/oxide/config/",
|
|
];
|
|
|
|
// Create temporary directory for collecting files
|
|
let temp_dir = std::env::temp_dir().join(format!("backup_{}", backup_id));
|
|
tokio::fs::create_dir_all(&temp_dir)
|
|
.await
|
|
.context("Failed to create temporary backup directory")?;
|
|
|
|
// Download files from server via panel adapter
|
|
for file_path in &files_to_backup {
|
|
match adapter.get_file(server_id, file_path).await {
|
|
Ok(data) => {
|
|
// Create local path preserving structure
|
|
let local_path = temp_dir.join(file_path.trim_start_matches('/'));
|
|
if let Some(parent) = local_path.parent() {
|
|
tokio::fs::create_dir_all(parent)
|
|
.await
|
|
.context("Failed to create backup directory structure")?;
|
|
}
|
|
tokio::fs::write(&local_path, data)
|
|
.await
|
|
.with_context(|| format!("Failed to write backup file: {}", file_path))?;
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!("Failed to backup file {}: {}", file_path, e);
|
|
// Continue with other files
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create tar.gz archive
|
|
let archive_path = format!(
|
|
"{}/{}/backup_{}.tar.gz",
|
|
self.storage_base_path, license_id, backup_id
|
|
);
|
|
|
|
// Ensure parent directory exists
|
|
if let Some(parent) = std::path::Path::new(&archive_path).parent() {
|
|
tokio::fs::create_dir_all(parent)
|
|
.await
|
|
.context("Failed to create backup storage directory")?;
|
|
}
|
|
|
|
// Create archive using tar command
|
|
let output = tokio::process::Command::new("tar")
|
|
.arg("-czf")
|
|
.arg(&archive_path)
|
|
.arg("-C")
|
|
.arg(&temp_dir)
|
|
.arg(".")
|
|
.output()
|
|
.await
|
|
.context("Failed to execute tar command")?;
|
|
|
|
if !output.status.success() {
|
|
anyhow::bail!(
|
|
"tar failed: {}",
|
|
String::from_utf8_lossy(&output.stderr)
|
|
);
|
|
}
|
|
|
|
// Get archive size
|
|
let metadata = tokio::fs::metadata(&archive_path)
|
|
.await
|
|
.context("Failed to read archive metadata")?;
|
|
let size_bytes = metadata.len() as i64;
|
|
|
|
// Clean up temp directory
|
|
let _ = tokio::fs::remove_dir_all(&temp_dir).await;
|
|
|
|
// Insert backup record in DB
|
|
sqlx::query(
|
|
"INSERT INTO backups (id, license_id, wipe_history_id, storage_path, size_bytes, created_at)
|
|
VALUES ($1, $2, $3, $4, $5, NOW())",
|
|
)
|
|
.bind(backup_id)
|
|
.bind(license_id)
|
|
.bind(wipe_history_id)
|
|
.bind(&archive_path)
|
|
.bind(size_bytes)
|
|
.execute(&self.db)
|
|
.await
|
|
.context("Failed to insert backup record")?;
|
|
|
|
tracing::info!(
|
|
"Backup created: {} ({} bytes) for wipe {}",
|
|
backup_id,
|
|
size_bytes,
|
|
wipe_history_id
|
|
);
|
|
|
|
Ok(backup_id.to_string())
|
|
}
|
|
|
|
/// Restore a previously created backup to the server.
|
|
///
|
|
/// Used during rollback when post-wipe verification fails.
|
|
pub async fn restore_backup(
|
|
&self,
|
|
adapter: &dyn PanelAdapter,
|
|
server_id: &str,
|
|
backup_reference: &str,
|
|
) -> Result<()> {
|
|
// Parse backup ID
|
|
let backup_id = Uuid::parse_str(backup_reference)
|
|
.context("Invalid backup reference format")?;
|
|
|
|
// Load backup record from DB
|
|
let backup: Option<(String,)> = sqlx::query_as(
|
|
"SELECT storage_path FROM backups WHERE id = $1",
|
|
)
|
|
.bind(backup_id)
|
|
.fetch_optional(&self.db)
|
|
.await
|
|
.context("Failed to query backup")?;
|
|
|
|
if let Some((storage_path,)) = backup {
|
|
// Create temporary directory for extraction
|
|
let temp_dir = std::env::temp_dir().join(format!("restore_{}", backup_id));
|
|
tokio::fs::create_dir_all(&temp_dir)
|
|
.await
|
|
.context("Failed to create temporary restore directory")?;
|
|
|
|
// Extract archive
|
|
let output = tokio::process::Command::new("tar")
|
|
.arg("-xzf")
|
|
.arg(&storage_path)
|
|
.arg("-C")
|
|
.arg(&temp_dir)
|
|
.output()
|
|
.await
|
|
.context("Failed to execute tar extraction")?;
|
|
|
|
if !output.status.success() {
|
|
anyhow::bail!(
|
|
"tar extraction failed: {}",
|
|
String::from_utf8_lossy(&output.stderr)
|
|
);
|
|
}
|
|
|
|
// Upload extracted files back to server
|
|
self.upload_directory_recursive(adapter, server_id, &temp_dir, "/")
|
|
.await?;
|
|
|
|
// Clean up temp directory
|
|
let _ = tokio::fs::remove_dir_all(&temp_dir).await;
|
|
|
|
tracing::info!("Backup restored: {}", backup_reference);
|
|
} else {
|
|
anyhow::bail!("Backup not found: {}", backup_reference);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Recursively upload directory contents to server.
|
|
fn upload_directory_recursive<'a>(
|
|
&'a self,
|
|
adapter: &'a dyn PanelAdapter,
|
|
server_id: &'a str,
|
|
local_dir: &'a std::path::Path,
|
|
remote_base: &'a str,
|
|
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + 'a>> {
|
|
Box::pin(async move {
|
|
let mut entries = tokio::fs::read_dir(local_dir)
|
|
.await
|
|
.context("Failed to read local directory")?;
|
|
|
|
while let Some(entry) = entries.next_entry().await? {
|
|
let path = entry.path();
|
|
let file_name = entry.file_name();
|
|
let remote_path = format!(
|
|
"{}/{}",
|
|
remote_base.trim_end_matches('/'),
|
|
file_name.to_string_lossy()
|
|
);
|
|
|
|
if path.is_dir() {
|
|
// Recurse into subdirectory
|
|
self.upload_directory_recursive(adapter, server_id, &path, &remote_path)
|
|
.await?;
|
|
} else {
|
|
// Upload file
|
|
let data = tokio::fs::read(&path)
|
|
.await
|
|
.with_context(|| format!("Failed to read file: {:?}", path))?;
|
|
|
|
adapter
|
|
.put_file(server_id, &remote_path, &data)
|
|
.await
|
|
.with_context(|| format!("Failed to upload file: {}", remote_path))?;
|
|
|
|
tracing::debug!("Restored file: {}", remote_path);
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
})
|
|
}
|
|
|
|
/// List all backups for a license, ordered by creation date (newest first).
|
|
pub async fn list_backups(&self, license_id: Uuid) -> Result<Vec<BackupInfo>> {
|
|
let backups: Vec<BackupInfo> = sqlx::query_as(
|
|
"SELECT id, license_id, wipe_history_id, storage_path, size_bytes, created_at
|
|
FROM backups
|
|
WHERE license_id = $1
|
|
ORDER BY created_at DESC",
|
|
)
|
|
.bind(license_id)
|
|
.fetch_all(&self.db)
|
|
.await
|
|
.context("Failed to query backups")?;
|
|
|
|
Ok(backups)
|
|
}
|
|
|
|
/// Clean up old backups beyond the configured retention period/count.
|
|
pub async fn cleanup_old_backups(&self, license_id: Uuid) -> Result<u32> {
|
|
// Load retention config (default: keep last 10 backups)
|
|
let retention_count = 10;
|
|
|
|
// Query backups older than retention threshold
|
|
let old_backups: Vec<(Uuid, String)> = sqlx::query_as(
|
|
"SELECT id, storage_path
|
|
FROM backups
|
|
WHERE license_id = $1
|
|
ORDER BY created_at DESC
|
|
OFFSET $2",
|
|
)
|
|
.bind(license_id)
|
|
.bind(retention_count)
|
|
.fetch_all(&self.db)
|
|
.await
|
|
.context("Failed to query old backups")?;
|
|
|
|
let mut deleted_count = 0;
|
|
|
|
for (backup_id, storage_path) in old_backups {
|
|
// Delete backup file from storage
|
|
if tokio::fs::remove_file(&storage_path).await.is_err() {
|
|
tracing::warn!("Failed to delete backup file: {}", storage_path);
|
|
}
|
|
|
|
// Delete backup record from DB
|
|
sqlx::query("DELETE FROM backups WHERE id = $1")
|
|
.bind(backup_id)
|
|
.execute(&self.db)
|
|
.await
|
|
.context("Failed to delete backup record")?;
|
|
|
|
deleted_count += 1;
|
|
}
|
|
|
|
if deleted_count > 0 {
|
|
tracing::info!(
|
|
"Cleaned up {} old backups for license {}",
|
|
deleted_count,
|
|
license_id
|
|
);
|
|
}
|
|
|
|
Ok(deleted_count)
|
|
}
|
|
}
|