mirror of
https://github.com/mostlygeek/llama-swap.git
synced 2026-06-09 06:46:34 +02:00
internal/process: improve windows shutdown behaviour (#808)
Add Windows specific shutdown code paths so stopping of child processes is more reliable: - stopping llama-swap won't leave behind any child processes it created - uses Job Objects in Windows so the whole llama-swap tree is closed by the os - add procCtx to baseRouter. It replaces shutdownCtx as a signal for managing lifetime state. - shutdownCtx is only used by the router to stop handling new requests during shutdown - improve debug logging to make it easier to trace source of issues Fixes #804 Updates #807
This commit is contained in:
@@ -15,6 +15,7 @@ require (
|
||||
github.com/tidwall/gjson v1.18.0
|
||||
github.com/tidwall/sjson v1.2.5
|
||||
golang.org/x/sync v0.20.0
|
||||
golang.org/x/sys v0.41.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
@@ -70,7 +71,6 @@ require (
|
||||
golang.org/x/arch v0.8.0 // indirect
|
||||
golang.org/x/crypto v0.45.0 // indirect
|
||||
golang.org/x/net v0.47.0 // indirect
|
||||
golang.org/x/sys v0.41.0 // indirect
|
||||
golang.org/x/text v0.31.0 // indirect
|
||||
google.golang.org/protobuf v1.34.1 // indirect
|
||||
)
|
||||
|
||||
@@ -30,6 +30,13 @@ var ErrStartAborted = fmt.Errorf("aborted")
|
||||
// the stop request, and stays independent of the caller's graceful timeout.
|
||||
const cmdWaitDelay = 10 * time.Second
|
||||
|
||||
// parentCancelGraceTimeout is the graceful timeout used when the process is
|
||||
// torn down because parentCtx was cancelled (final router teardown or app
|
||||
// shutdown). In the normal flow the process has already been stopped via
|
||||
// Stop() by this point, so killProcess is a no-op kill; the short grace just
|
||||
// bounds the rare case where a process is still alive when its context is cut.
|
||||
const parentCancelGraceTimeout = time.Second
|
||||
|
||||
type runReq struct {
|
||||
timeout time.Duration
|
||||
respond chan error
|
||||
@@ -180,7 +187,7 @@ func (p *ProcessCommand) run() {
|
||||
setState(StateShutdown)
|
||||
if cmd != nil {
|
||||
p.handler.Store(nil)
|
||||
p.killProcess(cmd, cmdCancel, cmdDone, 100*time.Millisecond)
|
||||
p.killProcess(cmd, cmdCancel, cmdDone, parentCancelGraceTimeout)
|
||||
cmd = nil
|
||||
cmdDone = nil
|
||||
cmdCancel = nil
|
||||
@@ -315,7 +322,7 @@ func (p *ProcessCommand) run() {
|
||||
setState(StateShutdown)
|
||||
res := <-resultCh
|
||||
if res.cmd != nil {
|
||||
p.killProcess(res.cmd, res.cancel, res.cmdDone, 100*time.Millisecond)
|
||||
p.killProcess(res.cmd, res.cancel, res.cmdDone, parentCancelGraceTimeout)
|
||||
}
|
||||
notifyWaiters(fmt.Errorf("[%s] shutdown", p.id))
|
||||
respondRun(fmt.Errorf("[%s] shutdown", p.id))
|
||||
@@ -425,12 +432,20 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti
|
||||
|
||||
go func() {
|
||||
waitErr := cmd.Wait()
|
||||
if exitErr, ok := waitErr.(*exec.ExitError); ok {
|
||||
p.proxyLogger.Debugf("<%s> process exited: code=%d, err=%v", p.id, exitErr.ExitCode(), waitErr)
|
||||
} else if waitErr != nil {
|
||||
p.proxyLogger.Debugf("<%s> process exited with error: %v", p.id, waitErr)
|
||||
} else {
|
||||
switch st := p.State(); {
|
||||
case waitErr == nil:
|
||||
p.proxyLogger.Debugf("<%s> process exited cleanly", p.id)
|
||||
case st == StateStopping || st == StateShutdown:
|
||||
// Expected: we force-terminated the process. A forced kill exits
|
||||
// the child with a non-zero code (e.g. taskkill /f on Windows
|
||||
// yields exit status 1), so this is not an error.
|
||||
p.proxyLogger.Debugf("<%s> process stopped by llama-swap: %v", p.id, waitErr)
|
||||
default:
|
||||
if exitErr, ok := waitErr.(*exec.ExitError); ok {
|
||||
p.proxyLogger.Debugf("<%s> process exited: code=%d, err=%v", p.id, exitErr.ExitCode(), waitErr)
|
||||
} else {
|
||||
p.proxyLogger.Debugf("<%s> process exited with error: %v", p.id, waitErr)
|
||||
}
|
||||
}
|
||||
close(cmdDone)
|
||||
}()
|
||||
@@ -503,24 +518,40 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti
|
||||
// cmd's context is cancelled.
|
||||
func (p *ProcessCommand) sendStopSignal(cmd *exec.Cmd) error {
|
||||
if cmd == nil || cmd.Process == nil {
|
||||
p.processLogger.Debugf("<%s> sendStopSignal() called with nil cmd or process, nothing to stop", p.id)
|
||||
return nil
|
||||
}
|
||||
pid := cmd.Process.Pid
|
||||
if p.config.CmdStop != "" {
|
||||
p.processLogger.Debugf("<%s> sendStopSignal() using CmdStop %q for pid %d", p.id, p.config.CmdStop, pid)
|
||||
stopArgs, err := config.SanitizeCommand(
|
||||
strings.ReplaceAll(p.config.CmdStop, "${PID}", fmt.Sprintf("%d", cmd.Process.Pid)),
|
||||
strings.ReplaceAll(p.config.CmdStop, "${PID}", fmt.Sprintf("%d", pid)),
|
||||
)
|
||||
if err == nil {
|
||||
p.processLogger.Debugf("<%s> sendStopSignal() running stop command: %s", p.id, strings.Join(stopArgs, " "))
|
||||
stopCmd := exec.Command(stopArgs[0], stopArgs[1:]...)
|
||||
stopCmd.Env = cmd.Env
|
||||
setProcAttributes(stopCmd)
|
||||
return stopCmd.Run()
|
||||
runErr := stopCmd.Run()
|
||||
if runErr != nil {
|
||||
p.processLogger.Errorf("<%s> sendStopSignal() stop command failed: %v", p.id, runErr)
|
||||
} else {
|
||||
p.processLogger.Debugf("<%s> sendStopSignal() stop command completed for pid %d", p.id, pid)
|
||||
}
|
||||
return runErr
|
||||
}
|
||||
// fall through to SIGTERM if sanitize failed
|
||||
p.processLogger.Errorf("<%s> sendStopSignal() failed to sanitize CmdStop %q: %v, falling back to terminateProcessTree", p.id, p.config.CmdStop, err)
|
||||
}
|
||||
// On Unix this SIGTERMs the whole process group so a forked grandchild
|
||||
// (e.g. a shell wrapper that backgrounds the real binary) is taken down
|
||||
// with the parent rather than orphaned.
|
||||
return terminateProcessTree(cmd)
|
||||
p.processLogger.Debugf("<%s> sendStopSignal() no CmdStop configured, calling terminateProcessTree for pid %d", p.id, pid)
|
||||
termErr := terminateProcessTree(cmd)
|
||||
if termErr != nil {
|
||||
p.processLogger.Errorf("<%s> sendStopSignal() terminateProcessTree failed for pid %d: %v", p.id, pid, termErr)
|
||||
}
|
||||
return termErr
|
||||
}
|
||||
|
||||
// killProcess terminates the upstream process. The flow:
|
||||
|
||||
@@ -3,11 +3,13 @@
|
||||
package process
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// setProcAttributes sets platform-specific process attributes
|
||||
// setProcAttributes sets platform-specific process attributes. CREATE_NO_WINDOW
|
||||
// keeps the upstream from spawning its own console window.
|
||||
func setProcAttributes(cmd *exec.Cmd) {
|
||||
cmd.SysProcAttr = &syscall.SysProcAttr{
|
||||
HideWindow: true,
|
||||
@@ -15,22 +17,37 @@ func setProcAttributes(cmd *exec.Cmd) {
|
||||
}
|
||||
}
|
||||
|
||||
// terminateProcessTree asks the upstream process to stop. Windows has no
|
||||
// process-group signalling here — process-tree teardown is handled by the
|
||||
// configured CmdStop, which defaults to `taskkill /f /t` — so this preserves
|
||||
// the previous single-process SIGTERM behaviour.
|
||||
// terminateProcessTree requests a graceful shutdown of the whole process tree
|
||||
// rooted at cmd.Process. Windows has no SIGTERM or process-group signalling, so
|
||||
// we shell out to `taskkill /t`, which walks the child tree by PID — the
|
||||
// equivalent of signalling a Unix process group. Without /f, taskkill asks the
|
||||
// processes to close rather than force-killing them.
|
||||
func terminateProcessTree(cmd *exec.Cmd) error {
|
||||
if cmd == nil || cmd.Process == nil {
|
||||
return nil
|
||||
}
|
||||
return cmd.Process.Signal(syscall.SIGTERM)
|
||||
return taskkillProcessTree(cmd, false)
|
||||
}
|
||||
|
||||
// killProcessTree force-terminates the upstream process. Tree teardown on
|
||||
// Windows relies on CmdStop (taskkill /t); this kills the launched process.
|
||||
// killProcessTree force-terminates the whole process tree rooted at cmd.Process
|
||||
// via `taskkill /f /t`, so any descendant that ignored or outlived the graceful
|
||||
// request is killed alongside the parent rather than leaked as an orphan.
|
||||
func killProcessTree(cmd *exec.Cmd) error {
|
||||
return taskkillProcessTree(cmd, true)
|
||||
}
|
||||
|
||||
// taskkillProcessTree runs taskkill against cmd.Process.Pid. The /t flag
|
||||
// terminates the process together with any child processes it started, which is
|
||||
// the Windows analogue of signalling a Unix process group via its negative PID.
|
||||
// When force is true the /f flag force-kills; otherwise taskkill requests a
|
||||
// graceful close.
|
||||
func taskkillProcessTree(cmd *exec.Cmd, force bool) error {
|
||||
if cmd == nil || cmd.Process == nil {
|
||||
return nil
|
||||
}
|
||||
return cmd.Process.Kill()
|
||||
args := make([]string, 0, 4)
|
||||
if force {
|
||||
args = append(args, "/f")
|
||||
}
|
||||
args = append(args, "/t", "/pid", fmt.Sprintf("%d", cmd.Process.Pid))
|
||||
kill := exec.Command("taskkill", args...)
|
||||
setProcAttributes(kill)
|
||||
return kill.Run()
|
||||
}
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
//go:build !windows
|
||||
|
||||
package process
|
||||
|
||||
// SetupTreeCleanup is a no-op on non-Windows platforms, where upstream process
|
||||
// teardown is handled via process-group signalling (see runtime_unix.go).
|
||||
func SetupTreeCleanup() error { return nil }
|
||||
@@ -0,0 +1,50 @@
|
||||
//go:build windows
|
||||
|
||||
package process
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"unsafe"
|
||||
|
||||
"golang.org/x/sys/windows"
|
||||
)
|
||||
|
||||
// SetupTreeCleanup assigns the current process to a Windows Job Object
|
||||
// configured with JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE. Upstream processes
|
||||
// spawned afterwards are associated with the same job, so when llama-swap exits
|
||||
// for any reason — graceful shutdown, a forced second Ctrl+C, or a crash — the
|
||||
// OS terminates the whole job and reaps every child instead of leaving orphans
|
||||
// behind. It is the parent-side complement to the per-process teardown in
|
||||
// runtime_windows.go.
|
||||
//
|
||||
// The job handle is intentionally leaked for the lifetime of the process: the
|
||||
// kill-on-close behaviour fires when the last handle is released, which the OS
|
||||
// does when the process exits.
|
||||
func SetupTreeCleanup() error {
|
||||
job, err := windows.CreateJobObject(nil, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("CreateJobObject: %w", err)
|
||||
}
|
||||
|
||||
info := windows.JOBOBJECT_EXTENDED_LIMIT_INFORMATION{
|
||||
BasicLimitInformation: windows.JOBOBJECT_BASIC_LIMIT_INFORMATION{
|
||||
LimitFlags: windows.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
|
||||
},
|
||||
}
|
||||
if _, err := windows.SetInformationJobObject(
|
||||
job,
|
||||
windows.JobObjectExtendedLimitInformation,
|
||||
uintptr(unsafe.Pointer(&info)),
|
||||
uint32(unsafe.Sizeof(info)),
|
||||
); err != nil {
|
||||
windows.CloseHandle(job)
|
||||
return fmt.Errorf("SetInformationJobObject: %w", err)
|
||||
}
|
||||
|
||||
if err := windows.AssignProcessToJobObject(job, windows.CurrentProcess()); err != nil {
|
||||
windows.CloseHandle(job)
|
||||
return fmt.Errorf("AssignProcessToJobObject: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -75,10 +75,21 @@ type baseRouter struct {
|
||||
logger *logmon.Monitor
|
||||
planner swapPlanner
|
||||
|
||||
// shutdownCtx governs the request machinery: cancelling it tells grant()
|
||||
// and ServeHTTP to stop granting and reject callers. It is deliberately
|
||||
// separate from procCtx — see procCtx below.
|
||||
shutdownCtx context.Context
|
||||
shutdownFn context.CancelFunc
|
||||
shuttingDown atomic.Bool
|
||||
|
||||
// procCtx is the parent context for every managed process and governs
|
||||
// process lifetime only. handleShutdown stops processes gracefully via
|
||||
// Stop() and cancels procCtx afterwards, so teardown is never a context
|
||||
// cancel racing the graceful path (which collapsed the grace to 100ms and
|
||||
// let the caller return before children were reaped — see process run loop).
|
||||
procCtx context.Context
|
||||
procCancel context.CancelFunc
|
||||
|
||||
handlerCh chan handlerReq
|
||||
shutdownCh chan shutdownReq
|
||||
unloadCh chan unloadReq
|
||||
@@ -97,6 +108,7 @@ type baseRouter struct {
|
||||
|
||||
func newBaseRouter(name string, conf config.Config, processes map[string]process.Process, planner swapPlanner, logger *logmon.Monitor) *baseRouter {
|
||||
shutdownCtx, shutdownFn := context.WithCancel(context.Background())
|
||||
procCtx, procCancel := context.WithCancel(context.Background())
|
||||
return &baseRouter{
|
||||
name: name,
|
||||
config: conf,
|
||||
@@ -105,6 +117,8 @@ func newBaseRouter(name string, conf config.Config, processes map[string]process
|
||||
planner: planner,
|
||||
shutdownCtx: shutdownCtx,
|
||||
shutdownFn: shutdownFn,
|
||||
procCtx: procCtx,
|
||||
procCancel: procCancel,
|
||||
handlerCh: make(chan handlerReq),
|
||||
shutdownCh: make(chan shutdownReq),
|
||||
unloadCh: make(chan unloadReq),
|
||||
@@ -492,6 +506,8 @@ func (b *baseRouter) handleShutdown(req shutdownReq, active map[string]*activeSw
|
||||
// The grant calls below then either land (waiter happened to receive
|
||||
// before noticing shutdown) or fall through immediately via grant's
|
||||
// shutdownCtx case — either way the waiter sees a non-OK response.
|
||||
// This does NOT touch processes: their lifetime is procCtx, cancelled
|
||||
// only after the graceful Stop() calls below have reaped them.
|
||||
b.shutdownFn()
|
||||
|
||||
for _, s := range active {
|
||||
@@ -535,6 +551,11 @@ func (b *baseRouter) handleShutdown(req shutdownReq, active map[string]*activeSw
|
||||
<-done
|
||||
}
|
||||
|
||||
// Every process is stopped (children reaped via Stop()). Cancel procCtx so
|
||||
// the process run-loop goroutines exit; they are already StateStopped, so
|
||||
// this is a clean no-op kill rather than a forced teardown.
|
||||
b.procCancel()
|
||||
|
||||
req.respond <- nil
|
||||
}
|
||||
|
||||
|
||||
@@ -36,12 +36,14 @@ func NewGroup(conf config.Config, proxylog, upstreamlog *logmon.Monitor) (*Group
|
||||
modelCfg, _, ok := conf.FindConfig(mid)
|
||||
if !ok {
|
||||
base.shutdownFn()
|
||||
base.procCancel()
|
||||
return nil, fmt.Errorf("no model config for %q", mid)
|
||||
}
|
||||
procLog := logmon.NewWriter(upstreamlog)
|
||||
p, err := process.New(base.shutdownCtx, mid, modelCfg, procLog, proxylog)
|
||||
p, err := process.New(base.procCtx, mid, modelCfg, procLog, proxylog)
|
||||
if err != nil {
|
||||
base.shutdownFn()
|
||||
base.procCancel()
|
||||
return nil, fmt.Errorf("creating process for %q: %w", mid, err)
|
||||
}
|
||||
processes[mid] = p
|
||||
|
||||
@@ -31,9 +31,10 @@ func NewMatrix(conf config.Config, proxylog, upstreamlog *logmon.Monitor) (*Matr
|
||||
|
||||
for mid, modelCfg := range conf.Models {
|
||||
procLog := logmon.NewWriter(upstreamlog)
|
||||
p, err := process.New(base.shutdownCtx, mid, modelCfg, procLog, proxylog)
|
||||
p, err := process.New(base.procCtx, mid, modelCfg, procLog, proxylog)
|
||||
if err != nil {
|
||||
base.shutdownFn()
|
||||
base.procCancel()
|
||||
return nil, fmt.Errorf("creating process for %q: %w", mid, err)
|
||||
}
|
||||
processes[mid] = p
|
||||
|
||||
+30
-2
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/mostlygeek/llama-swap/internal/event"
|
||||
"github.com/mostlygeek/llama-swap/internal/logmon"
|
||||
"github.com/mostlygeek/llama-swap/internal/perf"
|
||||
"github.com/mostlygeek/llama-swap/internal/process"
|
||||
"github.com/mostlygeek/llama-swap/internal/server"
|
||||
"github.com/mostlygeek/llama-swap/internal/shared"
|
||||
"github.com/mostlygeek/llama-swap/internal/watcher"
|
||||
@@ -122,6 +123,13 @@ func main() {
|
||||
applyLogSettings(cfg)
|
||||
proxyLog.Debugf("PID: %d", os.Getpid())
|
||||
|
||||
// On Windows, bind the process tree to a Job Object so every upstream
|
||||
// process is reaped when llama-swap exits — even on a forced kill. No-op
|
||||
// elsewhere. Non-fatal: a failure just falls back to per-process teardown.
|
||||
if err := process.SetupTreeCleanup(); err != nil {
|
||||
proxyLog.Warnf("failed to set up process tree cleanup: %v", err)
|
||||
}
|
||||
|
||||
// perfMon outlives config reloads; its config is updated in place.
|
||||
var perfMon *perf.Monitor
|
||||
if !cfg.Performance.Disabled {
|
||||
@@ -267,6 +275,16 @@ func main() {
|
||||
proxyLog.Infof("received signal %v, shutting down", sig)
|
||||
watcherCancel()
|
||||
|
||||
// Backstop against a stalled shutdown: force the process to
|
||||
// exit once the whole graceful sequence has had its full budget.
|
||||
// On Windows the Job Object reaps upstream processes on exit, so
|
||||
// a forced exit still cleans up rather than orphaning children.
|
||||
go func() {
|
||||
time.Sleep(shutdownTimeout + 5*time.Second)
|
||||
proxyLog.Warnf("graceful shutdown exceeded %v, forcing exit", shutdownTimeout)
|
||||
os.Exit(1)
|
||||
}()
|
||||
|
||||
activeMu.RLock()
|
||||
srv := activeSrv
|
||||
activeMu.RUnlock()
|
||||
@@ -275,13 +293,23 @@ func main() {
|
||||
// drain without blocking on them for the full timeout.
|
||||
srv.CloseStreams()
|
||||
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
|
||||
// Both phases share a single deadline so total shutdown is
|
||||
// bounded by shutdownTimeout rather than 2x it.
|
||||
deadline := time.Now().Add(shutdownTimeout)
|
||||
shutdownCtx, cancel := context.WithDeadline(context.Background(), deadline)
|
||||
defer cancel()
|
||||
if err := httpServer.Shutdown(shutdownCtx); err != nil {
|
||||
proxyLog.Warnf("http server shutdown error: %v", err)
|
||||
}
|
||||
|
||||
if err := srv.Shutdown(shutdownTimeout); err != nil {
|
||||
// Clamp the remaining budget to a small positive value: a
|
||||
// non-positive timeout makes the router fall back to its own
|
||||
// healthCheckTimeout, which would defeat the shared deadline.
|
||||
remaining := time.Until(deadline)
|
||||
if remaining <= 0 {
|
||||
remaining = time.Millisecond
|
||||
}
|
||||
if err := srv.Shutdown(remaining); err != nil {
|
||||
proxyLog.Warnf("router shutdown error: %v", err)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user