Files
corrosion-admin-panel/companion-agent/internal/app/daemon.go
Vantz Stockwell eb57c51a24
All checks were successful
Test Asgard Runner / test (push) Successful in 3s
feat: Add WebSocket RCON client to companion agent
Wire gorilla/websocket into the Go companion agent to send arbitrary
console commands (e.g. oxide.reload BetterLoot) to the Rust Dedicated
Server's WebRCON endpoint. Adds RCON_PORT and RCON_PASSWORD env vars,
a new "command" action on the existing cmd.server NATS subject, and
the internal/rcon package that handles the JSON-over-WebSocket protocol.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 00:16:47 -05:00

540 lines
15 KiB
Go

package app
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"runtime"
	"strings"
	"time"

	"github.com/nats-io/nats.go"

	"github.com/vigilcyber/corrosion-companion/internal/deploy"
	"github.com/vigilcyber/corrosion-companion/internal/filemanager"
	"github.com/vigilcyber/corrosion-companion/internal/files"
	"github.com/vigilcyber/corrosion-companion/internal/process"
	"github.com/vigilcyber/corrosion-companion/internal/rcon"
	"github.com/vigilcyber/corrosion-companion/internal/update"
)
// DaemonConfig carries the settings a Daemon is constructed with.
type DaemonConfig struct {
	// LicenseID is interpolated into every NATS subject the daemon uses
	// ("corrosion.<LicenseID>. ...").
	LicenseID string

	// HeartbeatInterval is the period of the heartbeat ticker started by Run.
	HeartbeatInterval time.Duration

	// SteamCMDPath is handed to process.NewSteamCMD when a SteamCMD update
	// command arrives.
	SteamCMDPath string

	// GameServerPath and GameServerArgs are handed to process.NewGameServer.
	GameServerPath string
	GameServerArgs string

	// Version is reported in heartbeats and passed to update.NewUpdater.
	Version string

	// InstallDir is passed to the file manager and the deployer, and is the
	// directory checked for an installed server in each heartbeat.
	InstallDir string

	// RconPort and RconPassword are passed to rcon.SendCommand for the
	// "command" server action.
	RconPort     int
	RconPassword string
}
// Daemon is the companion agent's long-running core. It owns the NATS
// connection handle, the configuration, and the helpers that its NATS
// message handlers delegate to.
type Daemon struct {
	nc  *nats.Conn
	cfg *DaemonConfig

	// gameServer is the target of the start/stop/restart actions and the
	// source of the status and uptime reported in heartbeats.
	gameServer *process.GameServer
	// fileOps backs the files.get/put/delete/list subjects.
	fileOps *files.Operations
	// fm handles every message on the files.cmd subject.
	fm *filemanager.FileManager
	// updater performs companion self-updates.
	updater *update.Updater
	// deployer runs server deployments.
	deployer *deploy.Deployer

	// subscriptions collects every subscription created by the subscribe*
	// methods so that cleanup can unsubscribe them.
	subscriptions []*nats.Subscription
}
// HeartbeatPayload is the JSON document published on the heartbeat subject.
// Field order is the order in which encoding/json emits the keys.
type HeartbeatPayload struct {
	// Timestamp is filled with the current UTC time in RFC 3339 format.
	Timestamp string `json:"timestamp"`
	// Status is the companion's own state; ServerStatus is the game server's.
	Status       string `json:"status"`
	ServerStatus string `json:"server_status"`

	// Resource figures reported alongside the status.
	UptimeSeconds int64   `json:"uptime_seconds"`
	DiskFreeMB    int64   `json:"disk_free_mb"`
	CPUPercent    float64 `json:"cpu_percent"`

	LastUpdate  string `json:"last_update"`
	PlayerCount int    `json:"player_count"`

	// Build and platform identification of the companion binary.
	Version string `json:"version"`
	OS      string `json:"os"`
	Arch    string `json:"arch"`

	// ServerInstalled reports whether a server installation was detected.
	ServerInstalled bool `json:"server_installed"`
}
// gameServerAdapter lets a *process.GameServer be used where the deploy
// package expects a deploy.GameServerStarter.
type gameServerAdapter struct {
	// gs is the wrapped game server; NewDaemon passes the same pointer it
	// stores on the Daemon.
	gs *process.GameServer
	// cfg is the daemon configuration; UpdatePath writes the new server path
	// into it.
	cfg *DaemonConfig
}
// Start launches the wrapped game server and returns whatever error the
// underlying Start call reports.
func (a *gameServerAdapter) Start() error {
	server := a.gs
	return server.Start()
}
// UpdatePath records path as the configured game server location and
// re-initialises the wrapped game server for that location, reusing the
// configured server arguments.
func (a *gameServerAdapter) UpdatePath(path string) {
	a.cfg.GameServerPath = path

	// Overwrite the pointed-to GameServer value rather than swapping the
	// pointer, so every holder of the same *process.GameServer sees the
	// re-created server.
	// NOTE(review): this copies a process.GameServer by value; confirm the
	// type holds nothing that must not be copied (e.g. a mutex).
	replacement := process.NewGameServer(path, a.cfg.GameServerArgs)
	*a.gs = *replacement
}
// NewDaemon builds a Daemon around the given NATS connection and
// configuration, constructing the game server wrapper, file helpers,
// self-updater and deployer it needs.
//
// It returns an error when cfg is nil; previously a nil cfg caused a nil
// pointer panic on the first field access.
func NewDaemon(nc *nats.Conn, cfg *DaemonConfig) (*Daemon, error) {
	if cfg == nil {
		return nil, fmt.Errorf("daemon config must not be nil")
	}

	gameServer := process.NewGameServer(cfg.GameServerPath, cfg.GameServerArgs)

	// The adapter shares the gameServer pointer with the Daemon so that a
	// path update made during deployment is visible to the daemon as well.
	adapter := &gameServerAdapter{gs: gameServer, cfg: cfg}

	d := &Daemon{
		nc:         nc,
		cfg:        cfg,
		gameServer: gameServer,
		fileOps:    files.NewOperations(),
		fm:         filemanager.New(cfg.InstallDir),
		updater:    update.NewUpdater(cfg.Version),
		deployer:   deploy.NewDeployer(nc, cfg.LicenseID, cfg.InstallDir, adapter),
	}
	return d, nil
}
// Run registers all NATS subscriptions, publishes an initial heartbeat and
// then publishes one heartbeat per HeartbeatInterval until ctx is cancelled,
// at which point it runs cleanup and returns nil.
//
// It returns an error, without leaving any subscription behind, when the
// configured heartbeat interval is not positive or when any subscription
// cannot be established.
func (d *Daemon) Run(ctx context.Context) error {
	// time.NewTicker panics on a non-positive duration, so reject a bad
	// configuration up front, before any subscription is created.
	if d.cfg.HeartbeatInterval <= 0 {
		return fmt.Errorf("invalid heartbeat interval %v: must be greater than zero", d.cfg.HeartbeatInterval)
	}

	log.Println("Starting daemon subscriptions...")

	steps := []struct {
		what      string
		subscribe func() error
	}{
		{"server commands", d.subscribeServerCommands},
		{"file operations", d.subscribeFileOperations},
		{"steam updates", d.subscribeSteamUpdate},
		{"self-update", d.subscribeSelfUpdate},
		{"deploy commands", d.subscribeDeployCommand},
		// VueFinder-compatible request-reply file manager.
		{"file manager commands", d.subscribeFileManager},
	}
	for _, step := range steps {
		if err := step.subscribe(); err != nil {
			// Undo the subscriptions made so far: otherwise their handlers
			// stay live after Run has reported failure, and a retry would
			// register every handler a second time.
			for _, sub := range d.subscriptions {
				if unsubErr := sub.Unsubscribe(); unsubErr != nil {
					log.Printf("Failed to unsubscribe from %s during rollback: %v", sub.Subject, unsubErr)
				}
			}
			d.subscriptions = nil
			return fmt.Errorf("failed to subscribe to %s: %w", step.what, err)
		}
	}
	log.Println("All subscriptions active")

	ticker := time.NewTicker(d.cfg.HeartbeatInterval)
	defer ticker.Stop()

	// Send the first heartbeat immediately instead of waiting a full interval.
	d.publishHeartbeat()

	for {
		select {
		case <-ctx.Done():
			log.Println("Shutdown signal received, cleaning up...")
			d.cleanup()
			return nil
		case <-ticker.C:
			d.publishHeartbeat()
		}
	}
}
// subscribeServerCommands listens on corrosion.<license>.cmd.server for
// game server control requests. The JSON "action" selects the operation:
// "start", "stop" and "restart" act on the game server process, while
// "command" forwards the "command" string to the server over RCON and returns
// the RCON response. Every request is answered through the message's reply
// subject.
func (d *Daemon) subscribeServerCommands() error {
	subject := fmt.Sprintf("corrosion.%s.cmd.server", d.cfg.LicenseID)

	handler := func(msg *nats.Msg) {
		var req struct {
			Action  string `json:"action"`
			Command string `json:"command"`
		}
		if parseErr := json.Unmarshal(msg.Data, &req); parseErr != nil {
			log.Printf("Failed to parse server command: %v", parseErr)
			d.respondError(msg, "invalid_command", parseErr.Error())
			return
		}
		log.Printf("Received server command: %s", req.Action)

		// RCON passthrough has its own reply shape, so it is handled apart
		// from the process-control actions.
		if req.Action == "command" {
			if req.Command == "" {
				d.respondError(msg, "invalid_command", "command field is required")
				return
			}
			output, rconErr := rcon.SendCommand(d.cfg.RconPort, d.cfg.RconPassword, req.Command)
			if rconErr != nil {
				log.Printf("RCON command failed: %v", rconErr)
				d.respondError(msg, "rcon_failed", rconErr.Error())
				return
			}
			d.respondSuccess(msg, map[string]interface{}{
				"action":   "command",
				"command":  req.Command,
				"response": output,
				"status":   "success",
			})
			return
		}

		var opErr error
		switch req.Action {
		case "start":
			opErr = d.gameServer.Start()
		case "stop":
			opErr = d.gameServer.Stop()
		case "restart":
			opErr = d.gameServer.Restart()
		default:
			opErr = fmt.Errorf("unknown action: %s", req.Action)
		}
		if opErr != nil {
			log.Printf("Server command failed: %v", opErr)
			d.respondError(msg, "command_failed", opErr.Error())
			return
		}
		d.respondSuccess(msg, map[string]interface{}{
			"action": req.Action,
			"status": "success",
		})
	}

	sub, err := d.nc.Subscribe(subject, handler)
	if err != nil {
		return err
	}
	d.subscriptions = append(d.subscriptions, sub)
	log.Printf("Subscribed to: %s", subject)
	return nil
}
// subscribeFileOperations registers handleFileOperation on the four file
// subjects corrosion.<license>.files.{get,put,delete,list}. It stops at, and
// returns, the first subscription error.
func (d *Daemon) subscribeFileOperations() error {
	for _, op := range []string{"get", "put", "delete", "list"} {
		subject := fmt.Sprintf("corrosion.%s.files.%s", d.cfg.LicenseID, op)

		sub, err := d.nc.Subscribe(subject, d.handleFileOperation)
		if err != nil {
			return err
		}
		d.subscriptions = append(d.subscriptions, sub)
		log.Printf("Subscribed to: %s", subject)
	}
	return nil
}
// subscribeSteamUpdate listens on corrosion.<license>.update.steam and runs a
// SteamCMD update of the Rust server for each request. The optional JSON
// "validate" flag is passed through to the update. The update runs inside the
// message handler, so the reply is sent only after it has finished.
func (d *Daemon) subscribeSteamUpdate() error {
	subject := fmt.Sprintf("corrosion.%s.update.steam", d.cfg.LicenseID)

	handler := func(msg *nats.Msg) {
		var req struct {
			Validate bool `json:"validate"`
		}
		if parseErr := json.Unmarshal(msg.Data, &req); parseErr != nil {
			log.Printf("Failed to parse steam update command: %v", parseErr)
			d.respondError(msg, "invalid_command", parseErr.Error())
			return
		}
		log.Printf("Received SteamCMD update command (validate=%v)", req.Validate)

		runner := process.NewSteamCMD(d.cfg.SteamCMDPath)
		if updateErr := runner.UpdateRustServer(req.Validate); updateErr != nil {
			log.Printf("SteamCMD update failed: %v", updateErr)
			d.respondError(msg, "update_failed", updateErr.Error())
			return
		}
		d.respondSuccess(msg, map[string]interface{}{
			"status":   "success",
			"validate": req.Validate,
		})
	}

	sub, err := d.nc.Subscribe(subject, handler)
	if err != nil {
		return err
	}
	d.subscriptions = append(d.subscriptions, sub)
	log.Printf("Subscribed to: %s", subject)
	return nil
}
// subscribeSelfUpdate listens on corrosion.<license>.update.companion. Each
// request carries a "download_url" and a "version", both of which are handed
// to the updater; the outcome is sent back on the message's reply subject.
func (d *Daemon) subscribeSelfUpdate() error {
	subject := fmt.Sprintf("corrosion.%s.update.companion", d.cfg.LicenseID)

	handler := func(msg *nats.Msg) {
		var req struct {
			DownloadURL string `json:"download_url"`
			Version     string `json:"version"`
		}
		if parseErr := json.Unmarshal(msg.Data, &req); parseErr != nil {
			log.Printf("Failed to parse self-update command: %v", parseErr)
			d.respondError(msg, "invalid_command", parseErr.Error())
			return
		}
		log.Printf("Received self-update command: version=%s", req.Version)

		if updateErr := d.updater.PerformUpdate(req.DownloadURL, req.Version); updateErr != nil {
			log.Printf("Self-update failed: %v", updateErr)
			d.respondError(msg, "update_failed", updateErr.Error())
			return
		}
		d.respondSuccess(msg, map[string]interface{}{
			"status":  "success",
			"version": req.Version,
			"message": "Update downloaded, restart required",
		})
	}

	sub, err := d.nc.Subscribe(subject, handler)
	if err != nil {
		return err
	}
	d.subscriptions = append(d.subscriptions, sub)
	log.Printf("Subscribed to: %s", subject)
	return nil
}
// subscribeDeployCommand listens on corrosion.<license>.cmd.deploy. Each
// well-formed request starts a deployment with the supplied "config" in a
// background goroutine and is acknowledged immediately with status
// "accepted"; the deployment's final outcome is only logged here.
//
// The request's "action" field is logged but does not influence what the
// handler does.
func (d *Daemon) subscribeDeployCommand() error {
	subject := fmt.Sprintf("corrosion.%s.cmd.deploy", d.cfg.LicenseID)

	handler := func(msg *nats.Msg) {
		var req struct {
			Action string              `json:"action"`
			Config deploy.DeployConfig `json:"config"`
		}
		if parseErr := json.Unmarshal(msg.Data, &req); parseErr != nil {
			log.Printf("Failed to parse deploy command: %v", parseErr)
			d.respondError(msg, "invalid_command", parseErr.Error())
			return
		}
		log.Printf("Received deploy command: %s", req.Action)

		// Deployment is long-running, so it must not hold up the handler.
		runDeploy := func() {
			deployErr := d.deployer.Deploy(req.Config)
			if deployErr != nil {
				log.Printf("Deployment failed: %v", deployErr)
				return
			}
			log.Println("Deployment completed successfully")
		}
		go runDeploy()

		// Acknowledge right away; the requester does not wait for completion.
		d.respondSuccess(msg, map[string]interface{}{
			"status":  "accepted",
			"message": "Deployment started, progress will be published to deploy.status",
		})
	}

	sub, err := d.nc.Subscribe(subject, handler)
	if err != nil {
		return err
	}
	d.subscriptions = append(d.subscriptions, sub)
	log.Printf("Subscribed to: %s", subject)
	return nil
}
// subscribeFileManager listens on corrosion.<license>.files.cmd, the
// VueFinder-compatible file manager subject. Every message is passed
// unmodified to the filemanager package, which implements the individual
// operations (list, delete, rename, copy, move, mkdir, mkfile, search,
// preview, save, upload) and enforces the installDir jail on every path.
func (d *Daemon) subscribeFileManager() error {
	subject := fmt.Sprintf("corrosion.%s.files.cmd", d.cfg.LicenseID)

	forward := func(msg *nats.Msg) {
		d.fm.HandleNatsRequest(msg)
	}

	sub, err := d.nc.Subscribe(subject, forward)
	if err != nil {
		return err
	}
	d.subscriptions = append(d.subscriptions, sub)
	log.Printf("Subscribed to: %s", subject)
	return nil
}
// handleFileOperation serves the files.get/put/delete/list subjects. The
// operation is chosen from the trailing tokens of the message subject, and
// the outcome is published, tagged with the request's request_id, on
// corrosion.<license>.files.response.
//
// A request whose JSON cannot be parsed is answered on the message's reply
// subject instead, because no request_id is available at that point.
func (d *Daemon) handleFileOperation(msg *nats.Msg) {
	var req struct {
		RequestID   string `json:"request_id"`
		Path        string `json:"path"`
		DownloadURL string `json:"download_url,omitempty"` // For put operations
	}
	if err := json.Unmarshal(msg.Data, &req); err != nil {
		log.Printf("Failed to parse file operation: %v", err)
		d.respondError(msg, "invalid_command", err.Error())
		return
	}

	var (
		result interface{}
		err    error
	)
	// Match on the subject's suffix, not anywhere in it: the license ID sits
	// in the middle of the subject, and a substring match would misroute the
	// request if that ID happened to contain one of the operation names.
	switch {
	case strings.HasSuffix(msg.Subject, ".files.get"):
		result, err = d.fileOps.Read(req.Path)
	case strings.HasSuffix(msg.Subject, ".files.put"):
		err = d.fileOps.Write(req.Path, req.DownloadURL)
		result = map[string]string{"status": "written"}
	case strings.HasSuffix(msg.Subject, ".files.delete"):
		err = d.fileOps.Delete(req.Path)
		result = map[string]string{"status": "deleted"}
	case strings.HasSuffix(msg.Subject, ".files.list"):
		result, err = d.fileOps.List(req.Path)
	default:
		// Report an unrecognised subject as an error rather than as a
		// success with no data.
		err = fmt.Errorf("unsupported file operation subject: %s", msg.Subject)
	}

	responseSubject := fmt.Sprintf("corrosion.%s.files.response", d.cfg.LicenseID)
	if err != nil {
		log.Printf("File operation failed: %v", err)
		d.publishResponse(responseSubject, map[string]interface{}{
			"request_id": req.RequestID,
			"status":     "error",
			"error":      err.Error(),
		})
		return
	}
	d.publishResponse(responseSubject, map[string]interface{}{
		"request_id": req.RequestID,
		"status":     "success",
		"data":       result,
	})
}
// publishHeartbeat publishes one HeartbeatPayload on
// corrosion.<license>.companion.heartbeat. Marshal and publish failures are
// logged and otherwise ignored, so a failed heartbeat never stops the daemon.
func (d *Daemon) publishHeartbeat() {
	subject := fmt.Sprintf("corrosion.%s.companion.heartbeat", d.cfg.LicenseID)

	serverStatus := d.gameServer.Status()
	serverUptime := d.gameServer.Uptime()
	freeMB := getDiskFreeSpace(d.cfg.GameServerPath)

	body, err := json.Marshal(HeartbeatPayload{
		Timestamp:       time.Now().UTC().Format(time.RFC3339),
		Status:          "running",
		ServerStatus:    serverStatus,
		UptimeSeconds:   int64(serverUptime.Seconds()),
		DiskFreeMB:      freeMB,
		CPUPercent:      0.0, // TODO: Implement CPU monitoring
		LastUpdate:      "",  // TODO: Track last SteamCMD update
		PlayerCount:     0,   // Populated by plugin, not companion
		Version:         d.cfg.Version,
		OS:              runtime.GOOS,
		Arch:            runtime.GOARCH,
		ServerInstalled: deploy.CheckServerInstalled(d.cfg.InstallDir),
	})
	if err != nil {
		log.Printf("Failed to marshal heartbeat: %v", err)
		return
	}

	if pubErr := d.nc.Publish(subject, body); pubErr != nil {
		log.Printf("Failed to publish heartbeat: %v", pubErr)
	}
}
// respondError replies to msg with a JSON object holding "status":"error"
// together with the given code and message. A failure to send the reply is
// logged.
func (d *Daemon) respondError(msg *nats.Msg, code, message string) {
	// Marshalling a map of plain strings cannot fail, so the error is
	// deliberately discarded.
	body, _ := json.Marshal(map[string]string{
		"status":  "error",
		"code":    code,
		"message": message,
	})

	sendErr := msg.Respond(body)
	if sendErr == nil {
		return
	}
	log.Printf("Failed to send error response: %v", sendErr)
}
// respondSuccess replies to msg with payload encoded as JSON.
//
// If payload cannot be encoded, the failure is logged and the requester is
// sent an "internal_error" error response instead of an empty body. A failure
// to send the reply is logged.
func (d *Daemon) respondSuccess(msg *nats.Msg, payload interface{}) {
	data, err := json.Marshal(payload)
	if err != nil {
		log.Printf("Failed to marshal success response: %v", err)
		d.respondError(msg, "internal_error", err.Error())
		return
	}
	if err := msg.Respond(data); err != nil {
		log.Printf("Failed to send success response: %v", err)
	}
}
// publishResponse publishes payload, encoded as JSON, on subject.
//
// If payload cannot be encoded, the failure is logged and nothing is
// published, matching how publishHeartbeat treats marshal errors. A publish
// failure is logged.
func (d *Daemon) publishResponse(subject string, payload interface{}) {
	data, err := json.Marshal(payload)
	if err != nil {
		log.Printf("Failed to marshal response for %s: %v", subject, err)
		return
	}
	if err := d.nc.Publish(subject, data); err != nil {
		log.Printf("Failed to publish response: %v", err)
	}
}
// cleanup unsubscribes every subscription recorded by the subscribe* methods
// and then drains the NATS connection. Shutdown is best-effort: errors from
// either step are logged and do not stop the remaining steps.
func (d *Daemon) cleanup() {
	log.Println("Unsubscribing from all subjects...")
	for _, sub := range d.subscriptions {
		if err := sub.Unsubscribe(); err != nil {
			log.Printf("Failed to unsubscribe from %s: %v", sub.Subject, err)
		}
	}

	log.Println("Draining NATS connection...")
	// NOTE(review): the nats.go documentation describes Drain as starting the
	// drain without waiting for it to finish; confirm the caller keeps the
	// process alive long enough for the drain to complete.
	if err := d.nc.Drain(); err != nil {
		log.Printf("Failed to drain NATS connection: %v", err)
	}

	log.Println("Cleanup complete")
}
// Helper functions

// contains reports whether substr occurs anywhere within s. An empty substr
// is contained in every string, including the empty string.
//
// The earlier hand-written suffix/prefix/scan expression computed exactly this
// result; the standard library call states it directly.
func contains(s, substr string) bool {
	return strings.Contains(s, substr)
}
// findInString reports whether substr occurs in s by comparing substr with
// every window of len(substr) bytes in s, left to right. An empty substr
// matches immediately; a substr longer than s never matches.
func findInString(s, substr string) bool {
	width := len(substr)
	// end is the exclusive upper bound of the window under comparison.
	for end := width; end <= len(s); end++ {
		if s[end-width:end] == substr {
			return true
		}
	}
	return false
}
// getDiskFreeSpace is meant to return the free disk space, in megabytes, of
// the volume holding path. It is still a stub: path is ignored and a fixed
// placeholder value is returned.
func getDiskFreeSpace(path string) int64 {
	// TODO: Implement actual disk space check
	const placeholderFreeMB int64 = 50000
	return placeholderFreeMB
}