From 9be9a87fa019e542622f2a9c3f18bb715ebeb175 Mon Sep 17 00:00:00 2001 From: Benson Wong <83972+mostlygeek@users.noreply.github.com> Date: Mon, 1 Jun 2026 00:45:30 -0700 Subject: [PATCH] internal/process: improve windows shutdown behaviour (#808) Add Windows specific shutdown code paths so stopping of child processes is more reliable: - stopping llama-swap won't leave behind any child processes it created - uses Job Objects in Windows so the whole llama-swap tree is closed by the os - add procCtx to baseRouter. It replaces shutdownCtx as a signal for managing lifetime state. - shutdownCtx is only used by the router to stop handling new requests during shutdown - improve debug logging to make it easier to trace source of issues Fixes #804 Updates #807 --- go.mod | 2 +- internal/process/process_command.go | 51 ++++++++++++++++++++----- internal/process/runtime_windows.go | 41 ++++++++++++++------ internal/process/treecleanup_other.go | 7 ++++ internal/process/treecleanup_windows.go | 50 ++++++++++++++++++++++++ internal/router/base.go | 21 ++++++++++ internal/router/group.go | 4 +- internal/router/matrix.go | 3 +- llama-swap.go | 32 +++++++++++++++- 9 files changed, 184 insertions(+), 27 deletions(-) create mode 100644 internal/process/treecleanup_other.go create mode 100644 internal/process/treecleanup_windows.go diff --git a/go.mod b/go.mod index 46995b6..08232ea 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 golang.org/x/sync v0.20.0 + golang.org/x/sys v0.41.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -70,7 +71,6 @@ require ( golang.org/x/arch v0.8.0 // indirect golang.org/x/crypto v0.45.0 // indirect golang.org/x/net v0.47.0 // indirect - golang.org/x/sys v0.41.0 // indirect golang.org/x/text v0.31.0 // indirect google.golang.org/protobuf v1.34.1 // indirect ) diff --git a/internal/process/process_command.go b/internal/process/process_command.go index 51d35e1..1390b6b 100644 --- a/internal/process/process_command.go +++ b/internal/process/process_command.go @@ -30,6 +30,13 @@ var ErrStartAborted = fmt.Errorf("aborted") // the stop request, and stays independent of the caller's graceful timeout. const cmdWaitDelay = 10 * time.Second +// parentCancelGraceTimeout is the graceful timeout used when the process is +// torn down because parentCtx was cancelled (final router teardown or app +// shutdown). In the normal flow the process has already been stopped via +// Stop() by this point, so killProcess is a no-op kill; the short grace just +// bounds the rare case where a process is still alive when its context is cut. +const parentCancelGraceTimeout = time.Second + type runReq struct { timeout time.Duration respond chan error @@ -180,7 +187,7 @@ func (p *ProcessCommand) run() { setState(StateShutdown) if cmd != nil { p.handler.Store(nil) - p.killProcess(cmd, cmdCancel, cmdDone, 100*time.Millisecond) + p.killProcess(cmd, cmdCancel, cmdDone, parentCancelGraceTimeout) cmd = nil cmdDone = nil cmdCancel = nil @@ -315,7 +322,7 @@ func (p *ProcessCommand) run() { setState(StateShutdown) res := <-resultCh if res.cmd != nil { - p.killProcess(res.cmd, res.cancel, res.cmdDone, 100*time.Millisecond) + p.killProcess(res.cmd, res.cancel, res.cmdDone, parentCancelGraceTimeout) } notifyWaiters(fmt.Errorf("[%s] shutdown", p.id)) respondRun(fmt.Errorf("[%s] shutdown", p.id)) @@ -425,12 +432,20 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti go func() { waitErr := cmd.Wait() - if exitErr, ok := waitErr.(*exec.ExitError); ok { - p.proxyLogger.Debugf("<%s> process exited: code=%d, err=%v", p.id, exitErr.ExitCode(), waitErr) - } else if waitErr != nil { - p.proxyLogger.Debugf("<%s> process exited with error: %v", p.id, waitErr) - } else { + switch st := p.State(); { + case waitErr == nil: p.proxyLogger.Debugf("<%s> process exited cleanly", p.id) + case st == StateStopping || st == StateShutdown: + // Expected: we force-terminated the process. A forced kill exits + // the child with a non-zero code (e.g. taskkill /f on Windows + // yields exit status 1), so this is not an error. + p.proxyLogger.Debugf("<%s> process stopped by llama-swap: %v", p.id, waitErr) + default: + if exitErr, ok := waitErr.(*exec.ExitError); ok { + p.proxyLogger.Debugf("<%s> process exited: code=%d, err=%v", p.id, exitErr.ExitCode(), waitErr) + } else { + p.proxyLogger.Debugf("<%s> process exited with error: %v", p.id, waitErr) + } } close(cmdDone) }() @@ -503,24 +518,40 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti // cmd's context is cancelled. func (p *ProcessCommand) sendStopSignal(cmd *exec.Cmd) error { if cmd == nil || cmd.Process == nil { + p.processLogger.Debugf("<%s> sendStopSignal() called with nil cmd or process, nothing to stop", p.id) return nil } + pid := cmd.Process.Pid if p.config.CmdStop != "" { + p.processLogger.Debugf("<%s> sendStopSignal() using CmdStop %q for pid %d", p.id, p.config.CmdStop, pid) stopArgs, err := config.SanitizeCommand( - strings.ReplaceAll(p.config.CmdStop, "${PID}", fmt.Sprintf("%d", cmd.Process.Pid)), + strings.ReplaceAll(p.config.CmdStop, "${PID}", fmt.Sprintf("%d", pid)), ) if err == nil { + p.processLogger.Debugf("<%s> sendStopSignal() running stop command: %s", p.id, strings.Join(stopArgs, " ")) stopCmd := exec.Command(stopArgs[0], stopArgs[1:]...) stopCmd.Env = cmd.Env setProcAttributes(stopCmd) - return stopCmd.Run() + runErr := stopCmd.Run() + if runErr != nil { + p.processLogger.Errorf("<%s> sendStopSignal() stop command failed: %v", p.id, runErr) + } else { + p.processLogger.Debugf("<%s> sendStopSignal() stop command completed for pid %d", p.id, pid) + } + return runErr } // fall through to SIGTERM if sanitize failed + p.processLogger.Errorf("<%s> sendStopSignal() failed to sanitize CmdStop %q: %v, falling back to terminateProcessTree", p.id, p.config.CmdStop, err) } // On Unix this SIGTERMs the whole process group so a forked grandchild // (e.g. a shell wrapper that backgrounds the real binary) is taken down // with the parent rather than orphaned. - return terminateProcessTree(cmd) + p.processLogger.Debugf("<%s> sendStopSignal() no CmdStop configured, calling terminateProcessTree for pid %d", p.id, pid) + termErr := terminateProcessTree(cmd) + if termErr != nil { + p.processLogger.Errorf("<%s> sendStopSignal() terminateProcessTree failed for pid %d: %v", p.id, pid, termErr) + } + return termErr } // killProcess terminates the upstream process. The flow: diff --git a/internal/process/runtime_windows.go b/internal/process/runtime_windows.go index e28dc69..0971bbc 100644 --- a/internal/process/runtime_windows.go +++ b/internal/process/runtime_windows.go @@ -3,11 +3,13 @@ package process import ( + "fmt" "os/exec" "syscall" ) -// setProcAttributes sets platform-specific process attributes +// setProcAttributes sets platform-specific process attributes. CREATE_NO_WINDOW +// keeps the upstream from spawning its own console window. func setProcAttributes(cmd *exec.Cmd) { cmd.SysProcAttr = &syscall.SysProcAttr{ HideWindow: true, @@ -15,22 +17,37 @@ func setProcAttributes(cmd *exec.Cmd) { } } -// terminateProcessTree asks the upstream process to stop. Windows has no -// process-group signalling here — process-tree teardown is handled by the -// configured CmdStop, which defaults to `taskkill /f /t` — so this preserves -// the previous single-process SIGTERM behaviour. +// terminateProcessTree requests a graceful shutdown of the whole process tree +// rooted at cmd.Process. Windows has no SIGTERM or process-group signalling, so +// we shell out to `taskkill /t`, which walks the child tree by PID — the +// equivalent of signalling a Unix process group. Without /f, taskkill asks the +// processes to close rather than force-killing them. func terminateProcessTree(cmd *exec.Cmd) error { - if cmd == nil || cmd.Process == nil { - return nil - } - return cmd.Process.Signal(syscall.SIGTERM) + return taskkillProcessTree(cmd, false) } -// killProcessTree force-terminates the upstream process. Tree teardown on -// Windows relies on CmdStop (taskkill /t); this kills the launched process. +// killProcessTree force-terminates the whole process tree rooted at cmd.Process +// via `taskkill /f /t`, so any descendant that ignored or outlived the graceful +// request is killed alongside the parent rather than leaked as an orphan. func killProcessTree(cmd *exec.Cmd) error { + return taskkillProcessTree(cmd, true) +} + +// taskkillProcessTree runs taskkill against cmd.Process.Pid. The /t flag +// terminates the process together with any child processes it started, which is +// the Windows analogue of signalling a Unix process group via its negative PID. +// When force is true the /f flag force-kills; otherwise taskkill requests a +// graceful close. +func taskkillProcessTree(cmd *exec.Cmd, force bool) error { if cmd == nil || cmd.Process == nil { return nil } - return cmd.Process.Kill() + args := make([]string, 0, 4) + if force { + args = append(args, "/f") + } + args = append(args, "/t", "/pid", fmt.Sprintf("%d", cmd.Process.Pid)) + kill := exec.Command("taskkill", args...) + setProcAttributes(kill) + return kill.Run() } diff --git a/internal/process/treecleanup_other.go b/internal/process/treecleanup_other.go new file mode 100644 index 0000000..2918485 --- /dev/null +++ b/internal/process/treecleanup_other.go @@ -0,0 +1,7 @@ +//go:build !windows + +package process + +// SetupTreeCleanup is a no-op on non-Windows platforms, where upstream process +// teardown is handled via process-group signalling (see runtime_unix.go). +func SetupTreeCleanup() error { return nil } diff --git a/internal/process/treecleanup_windows.go b/internal/process/treecleanup_windows.go new file mode 100644 index 0000000..e8d6252 --- /dev/null +++ b/internal/process/treecleanup_windows.go @@ -0,0 +1,50 @@ +//go:build windows + +package process + +import ( + "fmt" + "unsafe" + + "golang.org/x/sys/windows" +) + +// SetupTreeCleanup assigns the current process to a Windows Job Object +// configured with JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE. Upstream processes +// spawned afterwards are associated with the same job, so when llama-swap exits +// for any reason — graceful shutdown, a forced second Ctrl+C, or a crash — the +// OS terminates the whole job and reaps every child instead of leaving orphans +// behind. It is the parent-side complement to the per-process teardown in +// runtime_windows.go. +// +// The job handle is intentionally leaked for the lifetime of the process: the +// kill-on-close behaviour fires when the last handle is released, which the OS +// does when the process exits. +func SetupTreeCleanup() error { + job, err := windows.CreateJobObject(nil, nil) + if err != nil { + return fmt.Errorf("CreateJobObject: %w", err) + } + + info := windows.JOBOBJECT_EXTENDED_LIMIT_INFORMATION{ + BasicLimitInformation: windows.JOBOBJECT_BASIC_LIMIT_INFORMATION{ + LimitFlags: windows.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, + }, + } + if _, err := windows.SetInformationJobObject( + job, + windows.JobObjectExtendedLimitInformation, + uintptr(unsafe.Pointer(&info)), + uint32(unsafe.Sizeof(info)), + ); err != nil { + windows.CloseHandle(job) + return fmt.Errorf("SetInformationJobObject: %w", err) + } + + if err := windows.AssignProcessToJobObject(job, windows.CurrentProcess()); err != nil { + windows.CloseHandle(job) + return fmt.Errorf("AssignProcessToJobObject: %w", err) + } + + return nil +} diff --git a/internal/router/base.go b/internal/router/base.go index 8897f3b..1002efc 100644 --- a/internal/router/base.go +++ b/internal/router/base.go @@ -75,10 +75,21 @@ type baseRouter struct { logger *logmon.Monitor planner swapPlanner + // shutdownCtx governs the request machinery: cancelling it tells grant() + // and ServeHTTP to stop granting and reject callers. It is deliberately + // separate from procCtx — see procCtx below. shutdownCtx context.Context shutdownFn context.CancelFunc shuttingDown atomic.Bool + // procCtx is the parent context for every managed process and governs + // process lifetime only. handleShutdown stops processes gracefully via + // Stop() and cancels procCtx afterwards, so teardown is never a context + // cancel racing the graceful path (which collapsed the grace to 100ms and + // let the caller return before children were reaped — see process run loop). + procCtx context.Context + procCancel context.CancelFunc + handlerCh chan handlerReq shutdownCh chan shutdownReq unloadCh chan unloadReq @@ -97,6 +108,7 @@ type baseRouter struct { func newBaseRouter(name string, conf config.Config, processes map[string]process.Process, planner swapPlanner, logger *logmon.Monitor) *baseRouter { shutdownCtx, shutdownFn := context.WithCancel(context.Background()) + procCtx, procCancel := context.WithCancel(context.Background()) return &baseRouter{ name: name, config: conf, @@ -105,6 +117,8 @@ func newBaseRouter(name string, conf config.Config, processes map[string]process planner: planner, shutdownCtx: shutdownCtx, shutdownFn: shutdownFn, + procCtx: procCtx, + procCancel: procCancel, handlerCh: make(chan handlerReq), shutdownCh: make(chan shutdownReq), unloadCh: make(chan unloadReq), @@ -492,6 +506,8 @@ func (b *baseRouter) handleShutdown(req shutdownReq, active map[string]*activeSw // The grant calls below then either land (waiter happened to receive // before noticing shutdown) or fall through immediately via grant's // shutdownCtx case — either way the waiter sees a non-OK response. + // This does NOT touch processes: their lifetime is procCtx, cancelled + // only after the graceful Stop() calls below have reaped them. b.shutdownFn() for _, s := range active { @@ -535,6 +551,11 @@ func (b *baseRouter) handleShutdown(req shutdownReq, active map[string]*activeSw <-done } + // Every process is stopped (children reaped via Stop()). Cancel procCtx so + // the process run-loop goroutines exit; they are already StateStopped, so + // this is a clean no-op kill rather than a forced teardown. + b.procCancel() + req.respond <- nil } diff --git a/internal/router/group.go b/internal/router/group.go index db36caa..fef2739 100644 --- a/internal/router/group.go +++ b/internal/router/group.go @@ -36,12 +36,14 @@ func NewGroup(conf config.Config, proxylog, upstreamlog *logmon.Monitor) (*Group modelCfg, _, ok := conf.FindConfig(mid) if !ok { base.shutdownFn() + base.procCancel() return nil, fmt.Errorf("no model config for %q", mid) } procLog := logmon.NewWriter(upstreamlog) - p, err := process.New(base.shutdownCtx, mid, modelCfg, procLog, proxylog) + p, err := process.New(base.procCtx, mid, modelCfg, procLog, proxylog) if err != nil { base.shutdownFn() + base.procCancel() return nil, fmt.Errorf("creating process for %q: %w", mid, err) } processes[mid] = p diff --git a/internal/router/matrix.go b/internal/router/matrix.go index 2badfc1..6f2d853 100644 --- a/internal/router/matrix.go +++ b/internal/router/matrix.go @@ -31,9 +31,10 @@ func NewMatrix(conf config.Config, proxylog, upstreamlog *logmon.Monitor) (*Matr for mid, modelCfg := range conf.Models { procLog := logmon.NewWriter(upstreamlog) - p, err := process.New(base.shutdownCtx, mid, modelCfg, procLog, proxylog) + p, err := process.New(base.procCtx, mid, modelCfg, procLog, proxylog) if err != nil { base.shutdownFn() + base.procCancel() return nil, fmt.Errorf("creating process for %q: %w", mid, err) } processes[mid] = p diff --git a/llama-swap.go b/llama-swap.go index 7171436..ed07154 100644 --- a/llama-swap.go +++ b/llama-swap.go @@ -19,6 +19,7 @@ import ( "github.com/mostlygeek/llama-swap/internal/event" "github.com/mostlygeek/llama-swap/internal/logmon" "github.com/mostlygeek/llama-swap/internal/perf" + "github.com/mostlygeek/llama-swap/internal/process" "github.com/mostlygeek/llama-swap/internal/server" "github.com/mostlygeek/llama-swap/internal/shared" "github.com/mostlygeek/llama-swap/internal/watcher" @@ -122,6 +123,13 @@ func main() { applyLogSettings(cfg) proxyLog.Debugf("PID: %d", os.Getpid()) + // On Windows, bind the process tree to a Job Object so every upstream + // process is reaped when llama-swap exits — even on a forced kill. No-op + // elsewhere. Non-fatal: a failure just falls back to per-process teardown. + if err := process.SetupTreeCleanup(); err != nil { + proxyLog.Warnf("failed to set up process tree cleanup: %v", err) + } + // perfMon outlives config reloads; its config is updated in place. var perfMon *perf.Monitor if !cfg.Performance.Disabled { @@ -267,6 +275,16 @@ func main() { proxyLog.Infof("received signal %v, shutting down", sig) watcherCancel() + // Backstop against a stalled shutdown: force the process to + // exit once the whole graceful sequence has had its full budget. + // On Windows the Job Object reaps upstream processes on exit, so + // a forced exit still cleans up rather than orphaning children. + go func() { + time.Sleep(shutdownTimeout + 5*time.Second) + proxyLog.Warnf("graceful shutdown exceeded %v, forcing exit", shutdownTimeout) + os.Exit(1) + }() + activeMu.RLock() srv := activeSrv activeMu.RUnlock() @@ -275,13 +293,23 @@ func main() { // drain without blocking on them for the full timeout. srv.CloseStreams() - shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) + // Both phases share a single deadline so total shutdown is + // bounded by shutdownTimeout rather than 2x it. + deadline := time.Now().Add(shutdownTimeout) + shutdownCtx, cancel := context.WithDeadline(context.Background(), deadline) defer cancel() if err := httpServer.Shutdown(shutdownCtx); err != nil { proxyLog.Warnf("http server shutdown error: %v", err) } - if err := srv.Shutdown(shutdownTimeout); err != nil { + // Clamp the remaining budget to a small positive value: a + // non-positive timeout makes the router fall back to its own + // healthCheckTimeout, which would defeat the shared deadline. + remaining := time.Until(deadline) + if remaining <= 0 { + remaining = time.Millisecond + } + if err := srv.Shutdown(remaining); err != nil { proxyLog.Warnf("router shutdown error: %v", err) }