package main import ( "context" "fmt" "log" "net" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "golang.org/x/crypto/ssh" ) var ( // Legacy globals kept for K8s path (globalSSH/globalPod*) and for the // shared DB pool (globalDB). New code should use globalExecutor/globalControl. globalSSH *ssh.Client globalDB *pgxpool.Pool globalPodIP string globalPodNS string globalPod string globalExecutor Executor globalControl ControlPlane ) // resolveControl returns the effective control plane name based on config, // defaulting to "kubectl" when SSH is configured and "local" otherwise. func resolveControl() string { if controlPlane != "" { return controlPlane } if sshHost != "" { return "kubectl" } return "local" } // connectAll creates the executor, control plane, and DB connection, then sets // all globals. Called from main() and handleReconnect. func connectAll() error { ctrl := resolveControl() // Start from the full loaded config so provider-specific fields // (docker_*, cmd_*) that have no flag/env equivalent are preserved. cfg := loadedConfig cfg.SSHHost = sshHost cfg.SSHUser = sshUser cfg.SSHKey = resolveKeyPath() cfg.DBHost = dbHost cfg.DBPort = dbPort cfg.DBUser = dbUser cfg.DBPass = dbPass cfg.DBName = dbName cfg.DBSchema = dbSchema cfg.Control = ctrl cfg.ControlNamespace = controlNS cfg.BrokerGameAddr = brokerGameAddr cfg.BrokerAdminAddr = brokerAdminAddr cfg.BrokerTLS = brokerTLS cfg.BackupDir = backupDir cfg.ServerIniDir = serverIniDir exec, err := newExecutor(cfg.SSHHost, cfg.SSHUser, cfg.SSHKey) if err != nil { return fmt.Errorf("executor: %w", err) } // AMP mode wraps the executor to elevate WriteFile through sudo. // Applies regardless of whether the inner executor is local or SSH. if ctrl == "amp" { user := cfg.AmpUser if user == "" { user = "amp" } exec = &Executor{Executor: exec, ampUser: user} } globalExecutor = exec // kubectl needs DB-pod discovery (via the executor, not the DB) to learn the // namespace before the control plane and DB connect. A discovery failure is // fatal — without a namespace there is nothing to drive the control plane. if ctrl == "kubectl" { ns, pod, podIP, err := discoverDBPod(exec) if err != nil { exec.Close() globalExecutor = nil return fmt.Errorf("DB pod discovery: %w", err) } globalPodNS = ns globalPod = pod globalPodIP = podIP // Propagate discovered namespace so kubectlControl can use it. if cfg.ControlNamespace == "" { cfg.ControlNamespace = ns controlNS = ns } if s, ok := exec.(*sshExecutor); ok { globalSSH = s.client } } // The control plane (logs, battlegroup, server control) does not depend on // the database. Establish it before connecting the DB so a DB outage never // disables it — the DB can be re-established later via /api/v1/reconnect // without losing control-plane functionality. globalControl = newControlPlane(ctrl, cfg) // DB connect is best-effort: on failure keep the executor + control plane // intact and return the error so the caller can surface it (main starts the // server anyway; the systemd watchdog or a manual reconnect retries the DB). var pool *pgxpool.Pool if ctrl == "kubectl" { pool, err = connectDB(context.Background(), cfg.DBUser, cfg.DBPass) } else { pool, err = connectDBDirect(context.Background(), cfg) } if err != nil { globalDB = nil return fmt.Errorf("DB connect: %w", err) } globalDB = pool // Best-effort: ensure the GM/Server chat persona exists for admin messaging // (whisper, map announce). Idempotent (ON CONFLICT DO NOTHING); a failure here // must never block startup or reconnect, so it is logged and swallowed. if err := cmdEnsureGMIdentity(context.Background()); err != nil { log.Printf("connectAll: ensure GM identity: %v", err) } return nil } // cmdConnect wraps connectAll in the legacy Msg return type. func cmdConnect() Msg { if err := connectAll(); err != nil { return msgConnect{err: err} } return msgConnect{} } // connectDBDirect opens a pgxpool without SSH tunnelling, routing TCP through // the executor's Dial (which is net.Dial for local, SSH tunnel for SSH). func connectDBDirect(ctx context.Context, cfg appConfig) (*pgxpool.Pool, error) { connStr := fmt.Sprintf( "host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", cfg.DBHost, cfg.DBPort, cfg.DBUser, cfg.DBPass, cfg.DBName) poolCfg, err := pgxpool.ParseConfig(connStr) if err != nil { return nil, err } poolCfg.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { _, err := conn.Exec(ctx, fmt.Sprintf(`SET search_path TO %s, public`, pgx.Identifier{cfg.DBSchema}.Sanitize())) return err } if globalExecutor != nil { addr := fmt.Sprintf("%s:%d", cfg.DBHost, cfg.DBPort) poolCfg.ConnConfig.DialFunc = func(ctx context.Context, _, _ string) (net.Conn, error) { return globalExecutor.Dial("tcp", addr) } } pool, err := pgxpool.NewWithConfig(ctx, poolCfg) if err != nil { return nil, err } if err := pool.Ping(ctx); err != nil { pool.Close() return nil, err } dbUser = cfg.DBUser dbPass = cfg.DBPass return pool, nil } func connectDB(ctx context.Context, user, pass string) (*pgxpool.Pool, error) { connStr := fmt.Sprintf( "host=127.0.0.1 port=%d user=%s password=%s dbname=%s sslmode=disable", dbPort, user, pass, dbName) poolCfg, err := pgxpool.ParseConfig(connStr) if err != nil { return nil, err } poolCfg.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { _, err := conn.Exec(ctx, fmt.Sprintf(`SET search_path TO %s, public`, pgx.Identifier{dbSchema}.Sanitize())) return err } poolCfg.ConnConfig.LookupFunc = func(_ context.Context, _ string) ([]string, error) { return []string{globalPodIP}, nil } poolCfg.ConnConfig.DialFunc = func(_ context.Context, _, _ string) (net.Conn, error) { return globalSSH.Dial("tcp", fmt.Sprintf("%s:%d", globalPodIP, dbPort)) } pool, err := pgxpool.NewWithConfig(ctx, poolCfg) if err != nil { return nil, err } if err := pool.Ping(ctx); err != nil { pool.Close() return nil, err } dbUser = user dbPass = pass return pool, nil }