package app

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"runtime"
	"strings"
	"time"

	"github.com/nats-io/nats.go"

	"github.com/vigilcyber/corrosion-companion/internal/files"
	"github.com/vigilcyber/corrosion-companion/internal/process"
	"github.com/vigilcyber/corrosion-companion/internal/update"
)

// DaemonConfig holds configuration for the daemon.
type DaemonConfig struct {
	// LicenseID is interpolated into every NATS subject the daemon uses
	// ("corrosion.<LicenseID>....").
	LicenseID string
	// HeartbeatInterval is the period between heartbeat publications.
	// Run rejects non-positive values.
	HeartbeatInterval time.Duration
	// SteamCMDPath is handed to process.NewSteamCMD for update commands.
	SteamCMDPath string
	// GameServerPath is handed to process.NewGameServer and to the disk
	// free-space probe used by the heartbeat.
	GameServerPath string
	// GameServerArgs is handed to process.NewGameServer.
	GameServerArgs string
	// Version is handed to update.NewUpdater and reported in heartbeats.
	Version string
}

// Daemon manages the companion agent's main operations.
type Daemon struct {
	nc         *nats.Conn
	cfg        *DaemonConfig
	gameServer *process.GameServer
	fileOps    *files.Operations
	updater    *update.Updater
	// subscriptions records every active subscription so that it can be
	// released on shutdown or on a failed startup.
	subscriptions []*nats.Subscription
}

// HeartbeatPayload represents the data sent in heartbeat messages.
type HeartbeatPayload struct {
	Timestamp     string  `json:"timestamp"`
	Status        string  `json:"status"`
	ServerStatus  string  `json:"server_status"`
	UptimeSeconds int64   `json:"uptime_seconds"`
	DiskFreeMB    int64   `json:"disk_free_mb"`
	CPUPercent    float64 `json:"cpu_percent"`
	LastUpdate    string  `json:"last_update"`
	PlayerCount   int     `json:"player_count"`
	Version       string  `json:"version"`
	OS            string  `json:"os"`
	Arch          string  `json:"arch"`
}

// NewDaemon creates a new daemon instance wired to the given NATS connection.
// It returns an error when cfg is nil.
func NewDaemon(nc *nats.Conn, cfg *DaemonConfig) (*Daemon, error) {
	if cfg == nil {
		return nil, errors.New("daemon config is nil")
	}

	return &Daemon{
		nc:         nc,
		cfg:        cfg,
		gameServer: process.NewGameServer(cfg.GameServerPath, cfg.GameServerArgs),
		fileOps:    files.NewOperations(),
		updater:    update.NewUpdater(cfg.Version),
	}, nil
}

// Run registers all command subscriptions, publishes an initial heartbeat and
// then publishes one per HeartbeatInterval until ctx is cancelled, at which
// point it cleans up and returns nil.
//
// It returns an error, without leaving any subscription registered, when the
// heartbeat interval is not positive or when any subscription fails.
func (d *Daemon) Run(ctx context.Context) error {
	// time.NewTicker panics on a non-positive duration; reject it before any
	// subscription is registered.
	if d.cfg.HeartbeatInterval <= 0 {
		return fmt.Errorf("invalid heartbeat interval %v: must be positive", d.cfg.HeartbeatInterval)
	}

	log.Println("Starting daemon subscriptions...")

	steps := []struct {
		what      string
		subscribe func() error
	}{
		{"server commands", d.subscribeServerCommands},
		{"file operations", d.subscribeFileOperations},
		{"steam updates", d.subscribeSteamUpdate},
		{"self-update", d.subscribeSelfUpdate},
	}
	for _, step := range steps {
		if err := step.subscribe(); err != nil {
			// Release whatever was registered before the failure. The
			// connection itself is left untouched for the caller.
			d.unsubscribeAll()
			return fmt.Errorf("failed to subscribe to %s: %w", step.what, err)
		}
	}

	log.Println("All subscriptions active")

	// Start heartbeat ticker
	ticker := time.NewTicker(d.cfg.HeartbeatInterval)
	defer ticker.Stop()

	// Send initial heartbeat immediately
	d.publishHeartbeat()

	// Main event loop
	for {
		select {
		case <-ctx.Done():
			log.Println("Shutdown signal received, cleaning up...")
			d.cleanup()
			return nil
		case <-ticker.C:
			d.publishHeartbeat()
		}
	}
}

// subscribe registers handler on subject and records the subscription so it
// can be released later.
func (d *Daemon) subscribe(subject string, handler nats.MsgHandler) error {
	sub, err := d.nc.Subscribe(subject, handler)
	if err != nil {
		return err
	}
	d.subscriptions = append(d.subscriptions, sub)
	log.Printf("Subscribed to: %s", subject)
	return nil
}

// subscribeServerCommands subscribes to server process control commands.
func (d *Daemon) subscribeServerCommands() error {
	subject := fmt.Sprintf("corrosion.%s.cmd.server", d.cfg.LicenseID)
	return d.subscribe(subject, d.handleServerCommand)
}

// handleServerCommand decodes a {"action": ...} command, applies it to the
// game server ("start", "stop" or "restart") and replies with the outcome.
func (d *Daemon) handleServerCommand(msg *nats.Msg) {
	var cmd struct {
		Action string `json:"action"`
	}
	if err := json.Unmarshal(msg.Data, &cmd); err != nil {
		log.Printf("Failed to parse server command: %v", err)
		d.respondError(msg, "invalid_command", err.Error())
		return
	}

	log.Printf("Received server command: %s", cmd.Action)

	var err error
	switch cmd.Action {
	case "start":
		err = d.gameServer.Start()
	case "stop":
		err = d.gameServer.Stop()
	case "restart":
		err = d.gameServer.Restart()
	default:
		err = fmt.Errorf("unknown action: %s", cmd.Action)
	}
	if err != nil {
		log.Printf("Server command failed: %v", err)
		d.respondError(msg, "command_failed", err.Error())
		return
	}

	d.respondSuccess(msg, map[string]interface{}{
		"action": cmd.Action,
		"status": "success",
	})
}

// subscribeFileOperations subscribes to file operation commands
// (get, put, delete and list).
func (d *Daemon) subscribeFileOperations() error {
	for _, op := range []string{"get", "put", "delete", "list"} {
		subject := fmt.Sprintf("corrosion.%s.files.%s", d.cfg.LicenseID, op)
		if err := d.subscribe(subject, d.handleFileOperation); err != nil {
			return err
		}
	}
	return nil
}

// subscribeSteamUpdate subscribes to SteamCMD update commands.
func (d *Daemon) subscribeSteamUpdate() error {
	subject := fmt.Sprintf("corrosion.%s.update.steam", d.cfg.LicenseID)
	return d.subscribe(subject, d.handleSteamUpdate)
}

// handleSteamUpdate decodes a {"validate": ...} command, runs the SteamCMD
// server update and replies with the outcome.
func (d *Daemon) handleSteamUpdate(msg *nats.Msg) {
	var cmd struct {
		Validate bool `json:"validate"`
	}
	if err := json.Unmarshal(msg.Data, &cmd); err != nil {
		log.Printf("Failed to parse steam update command: %v", err)
		d.respondError(msg, "invalid_command", err.Error())
		return
	}

	log.Printf("Received SteamCMD update command (validate=%v)", cmd.Validate)

	steamCmd := process.NewSteamCMD(d.cfg.SteamCMDPath)
	if err := steamCmd.UpdateRustServer(cmd.Validate); err != nil {
		log.Printf("SteamCMD update failed: %v", err)
		d.respondError(msg, "update_failed", err.Error())
		return
	}

	d.respondSuccess(msg, map[string]interface{}{
		"status":   "success",
		"validate": cmd.Validate,
	})
}

// subscribeSelfUpdate subscribes to companion agent self-update commands.
func (d *Daemon) subscribeSelfUpdate() error {
	subject := fmt.Sprintf("corrosion.%s.update.companion", d.cfg.LicenseID)
	return d.subscribe(subject, d.handleSelfUpdate)
}

// handleSelfUpdate decodes a {"download_url", "version"} command, hands it to
// the updater and replies with the outcome.
//
// NOTE(review): download_url comes straight from the message payload and is
// passed to the updater without validation here — confirm that update.Updater
// verifies the origin and integrity of what it downloads.
func (d *Daemon) handleSelfUpdate(msg *nats.Msg) {
	var cmd struct {
		DownloadURL string `json:"download_url"`
		Version     string `json:"version"`
	}
	if err := json.Unmarshal(msg.Data, &cmd); err != nil {
		log.Printf("Failed to parse self-update command: %v", err)
		d.respondError(msg, "invalid_command", err.Error())
		return
	}

	log.Printf("Received self-update command: version=%s", cmd.Version)

	if err := d.updater.PerformUpdate(cmd.DownloadURL, cmd.Version); err != nil {
		log.Printf("Self-update failed: %v", err)
		d.respondError(msg, "update_failed", err.Error())
		return
	}

	d.respondSuccess(msg, map[string]interface{}{
		"status":  "success",
		"version": cmd.Version,
		"message": "Update downloaded, restart required",
	})
}

// handleFileOperation processes file operation requests. The operation is
// selected by the final token of the message subject, and the outcome is
// published on "corrosion.<LicenseID>.files.response" tagged with the
// request's request_id. A payload that cannot be parsed is answered directly
// on the message's reply subject instead.
//
// NOTE(review): path and download_url are taken from the message as-is —
// confirm that files.Operations restricts them appropriately.
func (d *Daemon) handleFileOperation(msg *nats.Msg) {
	// Parse common fields
	var baseCmd struct {
		RequestID   string `json:"request_id"`
		Path        string `json:"path"`
		DownloadURL string `json:"download_url,omitempty"` // For put operations
	}
	if err := json.Unmarshal(msg.Data, &baseCmd); err != nil {
		log.Printf("Failed to parse file operation: %v", err)
		d.respondError(msg, "invalid_command", err.Error())
		return
	}

	var (
		result interface{}
		err    error
	)

	// Match on the suffix rather than anywhere in the subject: the license ID
	// is part of the subject and must not be able to influence which
	// operation runs.
	switch {
	case strings.HasSuffix(msg.Subject, ".files.get"):
		result, err = d.fileOps.Read(baseCmd.Path)
	case strings.HasSuffix(msg.Subject, ".files.put"):
		err = d.fileOps.Write(baseCmd.Path, baseCmd.DownloadURL)
		result = map[string]string{"status": "written"}
	case strings.HasSuffix(msg.Subject, ".files.delete"):
		err = d.fileOps.Delete(baseCmd.Path)
		result = map[string]string{"status": "deleted"}
	case strings.HasSuffix(msg.Subject, ".files.list"):
		result, err = d.fileOps.List(baseCmd.Path)
	default:
		// Never report success for an operation that was not performed.
		err = fmt.Errorf("unsupported file operation subject: %s", msg.Subject)
	}

	responseSubject := fmt.Sprintf("corrosion.%s.files.response", d.cfg.LicenseID)

	if err != nil {
		log.Printf("File operation failed: %v", err)
		d.publishResponse(responseSubject, map[string]interface{}{
			"request_id": baseCmd.RequestID,
			"status":     "error",
			"error":      err.Error(),
		})
		return
	}

	d.publishResponse(responseSubject, map[string]interface{}{
		"request_id": baseCmd.RequestID,
		"status":     "success",
		"data":       result,
	})
}

// publishHeartbeat sends a heartbeat message to the cloud. Failures are
// logged and otherwise ignored; the next tick simply tries again.
func (d *Daemon) publishHeartbeat() {
	subject := fmt.Sprintf("corrosion.%s.companion.heartbeat", d.cfg.LicenseID)

	status := d.gameServer.Status()
	uptime := d.gameServer.Uptime()
	diskFree := getDiskFreeSpace(d.cfg.GameServerPath)

	payload := HeartbeatPayload{
		Timestamp:     time.Now().UTC().Format(time.RFC3339),
		Status:        "running",
		ServerStatus:  status,
		UptimeSeconds: int64(uptime.Seconds()),
		DiskFreeMB:    diskFree,
		CPUPercent:    0.0, // TODO: Implement CPU monitoring
		LastUpdate:    "",  // TODO: Track last SteamCMD update
		PlayerCount:   0,   // Populated by plugin, not companion
		Version:       d.cfg.Version,
		OS:            runtime.GOOS,
		Arch:          runtime.GOARCH,
	}

	data, err := json.Marshal(payload)
	if err != nil {
		log.Printf("Failed to marshal heartbeat: %v", err)
		return
	}

	if err := d.nc.Publish(subject, data); err != nil {
		log.Printf("Failed to publish heartbeat: %v", err)
	}
}

// respondError sends an error response ({"status": "error", code, message})
// to a command via the message's reply subject.
func (d *Daemon) respondError(msg *nats.Msg, code, message string) {
	response := map[string]interface{}{
		"status":  "error",
		"code":    code,
		"message": message,
	}

	data, err := json.Marshal(response)
	if err != nil {
		log.Printf("Failed to marshal error response: %v", err)
		return
	}

	if err := msg.Respond(data); err != nil {
		log.Printf("Failed to send error response: %v", err)
	}
}

// respondSuccess sends payload as the success response to a command. If the
// payload cannot be encoded, the requester receives an "internal_error"
// response instead of an empty message.
func (d *Daemon) respondSuccess(msg *nats.Msg, payload interface{}) {
	data, err := json.Marshal(payload)
	if err != nil {
		log.Printf("Failed to marshal success response: %v", err)
		d.respondError(msg, "internal_error", err.Error())
		return
	}

	if err := msg.Respond(data); err != nil {
		log.Printf("Failed to send success response: %v", err)
	}
}

// publishResponse publishes payload, JSON-encoded, to a specific subject.
// Encoding and publish failures are logged; nothing is published when the
// payload cannot be encoded.
func (d *Daemon) publishResponse(subject string, payload interface{}) {
	data, err := json.Marshal(payload)
	if err != nil {
		log.Printf("Failed to marshal response: %v", err)
		return
	}

	if err := d.nc.Publish(subject, data); err != nil {
		log.Printf("Failed to publish response: %v", err)
	}
}

// unsubscribeAll releases every recorded subscription, logging (but not
// aborting on) individual failures, and forgets them.
func (d *Daemon) unsubscribeAll() {
	for _, sub := range d.subscriptions {
		if err := sub.Unsubscribe(); err != nil {
			log.Printf("Failed to unsubscribe from %s: %v", sub.Subject, err)
		}
	}
	d.subscriptions = nil
}

// cleanup gracefully shuts down all daemon operations: it releases the
// subscriptions and then drains the NATS connection.
func (d *Daemon) cleanup() {
	log.Println("Unsubscribing from all subjects...")
	d.unsubscribeAll()

	log.Println("Draining NATS connection...")
	if err := d.nc.Drain(); err != nil {
		log.Printf("Failed to drain NATS connection: %v", err)
	}

	log.Println("Cleanup complete")
}

// Helper functions

// contains reports whether substr is within s.
func contains(s, substr string) bool {
	return strings.Contains(s, substr)
}

// findInString reports whether substr is within s.
func findInString(s, substr string) bool {
	return strings.Contains(s, substr)
}

// getDiskFreeSpace returns the free disk space, in megabytes, for path.
// It currently ignores path and returns a fixed placeholder value.
func getDiskFreeSpace(path string) int64 {
	// TODO: Implement actual disk space check
	// For now, return placeholder
	return 50000
}