diff --git a/internal/process/process_command.go b/internal/process/process_command.go index ed888ba..51d35e1 100644 --- a/internal/process/process_command.go +++ b/internal/process/process_command.go @@ -11,7 +11,6 @@ import ( "os/exec" "strings" "sync/atomic" - "syscall" "time" "github.com/mostlygeek/llama-swap/internal/config" @@ -22,6 +21,15 @@ import ( var ErrStartAborted = fmt.Errorf("aborted") +// cmdWaitDelay is the upper bound the runtime will wait for child I/O to +// drain after the process exits before force-closing the stdout/stderr +// pipes. Required so that cmd.Wait() returns even when a forked grandchild +// inherits and holds the pipes open (e.g. a shell wrapper that backgrounds +// the real binary). killProcess sends the stop signal directly (not via the +// cmd context), so this delay is measured from process exit rather than from +// the stop request, and stays independent of the caller's graceful timeout. +const cmdWaitDelay = 10 * time.Second + type runReq struct { timeout time.Duration respond chan error @@ -39,6 +47,7 @@ type waitReadyReq struct { type startResult struct { cmd *exec.Cmd cmdDone chan struct{} + cancel context.CancelFunc handlerFn http.HandlerFunc err error } @@ -51,6 +60,11 @@ type ProcessCommand struct { processLogger *logmon.Monitor proxyLogger *logmon.Monitor + // waitDelay is assigned to cmd.WaitDelay when starting the upstream + // process. Defaults to cmdWaitDelay; tests override it to keep the + // pipe-close backstop from dominating their runtime. + waitDelay time.Duration + runCh chan runReq stopCh chan stopReq waitReadyCh chan waitReadyReq @@ -85,6 +99,7 @@ func New( runCh: make(chan runReq), stopCh: make(chan stopReq), waitReadyCh: make(chan waitReadyReq), + waitDelay: cmdWaitDelay, } p.state.Store(StateStopped) @@ -122,6 +137,7 @@ func (p *ProcessCommand) run() { var ( cmd *exec.Cmd cmdDone <-chan struct{} + cmdCancel context.CancelFunc readyWaiters []waitReadyReq // runResp parks the in-flight Run caller's response channel. The // interface contract is that Run blocks until the process is @@ -164,9 +180,10 @@ func (p *ProcessCommand) run() { setState(StateShutdown) if cmd != nil { p.handler.Store(nil) - p.killProcess(cmd, cmdDone, 100*time.Millisecond) + p.killProcess(cmd, cmdCancel, cmdDone, 100*time.Millisecond) cmd = nil cmdDone = nil + cmdCancel = nil } notifyWaiters(fmt.Errorf("[%s] shutdown", p.id)) respondRun(fmt.Errorf("[%s] shutdown", p.id)) @@ -177,8 +194,12 @@ func (p *ProcessCommand) run() { // cmdDone is nil while no process is running, so this case is // dormant outside of StateReady. case <-cmdDone: + if cmdCancel != nil { + cmdCancel() + } cmd = nil cmdDone = nil + cmdCancel = nil p.handler.Store(nil) setState(StateStopped) respondRun(fmt.Errorf("[%s] upstream exited unexpectedly", p.id)) @@ -226,6 +247,7 @@ func (p *ProcessCommand) run() { if res.err == nil { cmd = res.cmd cmdDone = res.cmdDone + cmdCancel = res.cancel fn := res.handlerFn p.handler.Store(&fn) setState(StateReady) @@ -273,7 +295,7 @@ func (p *ProcessCommand) run() { cancelStart() res := <-resultCh if res.cmd != nil { - p.killProcess(res.cmd, res.cmdDone, stop.timeout) + p.killProcess(res.cmd, res.cancel, res.cmdDone, stop.timeout) } setState(StateStopped) notifyWaiters(ErrStartAborted) @@ -293,7 +315,7 @@ func (p *ProcessCommand) run() { setState(StateShutdown) res := <-resultCh if res.cmd != nil { - p.killProcess(res.cmd, res.cmdDone, 100*time.Millisecond) + p.killProcess(res.cmd, res.cancel, res.cmdDone, 100*time.Millisecond) } notifyWaiters(fmt.Errorf("[%s] shutdown", p.id)) respondRun(fmt.Errorf("[%s] shutdown", p.id)) @@ -310,9 +332,10 @@ func (p *ProcessCommand) run() { case stop := <-p.stopCh: if cmd != nil { setState(StateStopping) - p.killProcess(cmd, cmdDone, stop.timeout) + p.killProcess(cmd, cmdCancel, cmdDone, stop.timeout) cmd = nil cmdDone = nil + cmdCancel = nil p.handler.Store(nil) } // Stop is a no-op (and not an error) when already Stopped — this @@ -377,16 +400,26 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti reverseProxy.ServeHTTP(w, r) }) - cmd := exec.Command(args[0], args[1:]...) + // cmdCtx + cmd.Cancel are wired as a safety net: if the context is ever + // cancelled while the process is alive, cmd.Cancel sends SIGTERM / CmdStop + // and the runtime escalates to SIGKILL after cmd.WaitDelay. In the normal + // teardown path killProcess sends the stop signal directly instead, so + // cmd.WaitDelay only acts as the inherited-pipe backstop measured from + // process exit (see killProcess). + cmdCtx, cmdCancel := context.WithCancel(context.Background()) + cmd := exec.CommandContext(cmdCtx, args[0], args[1:]...) cmd.Stderr = p.processLogger cmd.Stdout = p.processLogger cmd.Env = append(cmd.Environ(), p.config.Env...) + cmd.Cancel = func() error { return p.sendStopSignal(cmd) } + cmd.WaitDelay = p.waitDelay setProcAttributes(cmd) p.proxyLogger.Debugf("<%s> Executing start command: %s, env: %s", p.id, strings.Join(args, " "), strings.Join(p.config.Env, ", ")) cmdDone := make(chan struct{}) if err := cmd.Start(); err != nil { + cmdCancel() return startResult{err: fmt.Errorf("failed to start command '%s': %w", strings.Join(args, " "), err)} } @@ -402,21 +435,28 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti close(cmdDone) }() + abort := func(err error) startResult { + p.killProcess(cmd, cmdCancel, cmdDone, 5*time.Second) + return startResult{err: err} + } + prematureExit := func() startResult { + cmdCancel() + return startResult{err: fmt.Errorf("upstream command exited prematurely")} + } + if startCtx.Err() != nil { - p.killProcess(cmd, cmdDone, 5*time.Second) - return startResult{err: ErrStartAborted} + return abort(ErrStartAborted) } checkEndpoint := strings.TrimSpace(p.config.CheckEndpoint) if checkEndpoint == "none" { - return startResult{cmd: cmd, cmdDone: cmdDone, handlerFn: handlerFn} + return startResult{cmd: cmd, cmdDone: cmdDone, cancel: cmdCancel, handlerFn: handlerFn} } // Wait 250ms for the command to start up before health checking select { case <-startCtx.Done(): - p.killProcess(cmd, cmdDone, 5*time.Second) - return startResult{err: ErrStartAborted} + return abort(ErrStartAborted) case <-time.After(250 * time.Millisecond): } @@ -424,16 +464,14 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti for { select { case <-startCtx.Done(): - p.killProcess(cmd, cmdDone, 5*time.Second) - return startResult{err: ErrStartAborted} + return abort(ErrStartAborted) case <-cmdDone: - return startResult{err: fmt.Errorf("upstream command exited prematurely")} + return prematureExit() default: } if time.Now().After(deadline) { - p.killProcess(cmd, cmdDone, 5*time.Second) - return startResult{err: fmt.Errorf("health check timed out after %v", healthCheckTimeout)} + return abort(fmt.Errorf("health check timed out after %v", healthCheckTimeout)) } req, _ := http.NewRequestWithContext(startCtx, "GET", p.config.CheckEndpoint, nil) @@ -445,28 +483,28 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti p.proxyLogger.Infof("<%s> Health check passed on %s%s", p.id, p.config.Proxy, p.config.CheckEndpoint) break } else if startCtx.Err() != nil { - p.killProcess(cmd, cmdDone, 5*time.Second) - return startResult{err: ErrStartAborted} + return abort(ErrStartAborted) } select { case <-startCtx.Done(): - p.killProcess(cmd, cmdDone, 5*time.Second) - return startResult{err: ErrStartAborted} + return abort(ErrStartAborted) case <-cmdDone: - return startResult{err: fmt.Errorf("upstream command exited prematurely")} + return prematureExit() case <-time.After(time.Second): } } - return startResult{cmd: cmd, cmdDone: cmdDone, handlerFn: handlerFn} + return startResult{cmd: cmd, cmdDone: cmdDone, cancel: cmdCancel, handlerFn: handlerFn} } -func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cmdDone <-chan struct{}, gracefulTimeout time.Duration) { +// sendStopSignal runs the configured CmdStop (if any) or sends SIGTERM to +// the upstream process. Wired up as cmd.Cancel so it fires whenever the +// cmd's context is cancelled. +func (p *ProcessCommand) sendStopSignal(cmd *exec.Cmd) error { if cmd == nil || cmd.Process == nil { - return + return nil } - if p.config.CmdStop != "" { stopArgs, err := config.SanitizeCommand( strings.ReplaceAll(p.config.CmdStop, "${PID}", fmt.Sprintf("%d", cmd.Process.Pid)), @@ -475,12 +513,48 @@ func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cmdDone <-chan struct{}, gra stopCmd := exec.Command(stopArgs[0], stopArgs[1:]...) stopCmd.Env = cmd.Env setProcAttributes(stopCmd) - stopCmd.Run() - } else { - cmd.Process.Signal(syscall.SIGTERM) + return stopCmd.Run() } - } else { - cmd.Process.Signal(syscall.SIGTERM) + // fall through to SIGTERM if sanitize failed + } + // On Unix this SIGTERMs the whole process group so a forked grandchild + // (e.g. a shell wrapper that backgrounds the real binary) is taken down + // with the parent rather than orphaned. + return terminateProcessTree(cmd) +} + +// killProcess terminates the upstream process. The flow: +// +// 1. Send the graceful stop signal (CmdStop / SIGTERM) directly — NOT by +// cancelling cmdCtx. Cancelling the context would start cmd.WaitDelay +// immediately, which force-kills the process WaitDelay after the signal +// and would silently cap gracefulTimeout at WaitDelay whenever +// gracefulTimeout is the longer of the two. +// 2. We wait up to gracefulTimeout for the process to exit on its own. +// 3. If still alive, we SIGKILL the process group directly (Unix) so any +// forked descendant is force-terminated alongside the parent. +// 4. We wait on cmdDone. cmd.WaitDelay (set when the cmd was built) is the +// critical backstop here: once the process exits, if a forked grandchild +// inherited the stdout/stderr pipes and is still holding them, the runtime +// force-closes the pipes WaitDelay after the exit and cmd.Wait() unblocks. +// Because we never cancelled the context, that WaitDelay timer measures +// from process exit (see os/exec awaitGoroutines), not from this call. +// Without WaitDelay this select would hang forever (the v219 bug). +// +// cancel() is still invoked (deferred) to release the context, but only after +// the process has exited and os/exec's ctx watcher has already torn down, so it +// never re-fires cmd.Cancel. +func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cancel context.CancelFunc, cmdDone <-chan struct{}, gracefulTimeout time.Duration) { + if cancel == nil { + return + } + defer cancel() + + // Deliver CmdStop / SIGTERM in a goroutine so a slow or hanging CmdStop + // cannot block the run() goroutine; the gracefulTimeout + Process.Kill + // path below still guarantees teardown. + if cmd != nil { + go func() { _ = p.sendStopSignal(cmd) }() } timer := time.NewTimer(gracefulTimeout) @@ -488,10 +562,16 @@ func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cmdDone <-chan struct{}, gra select { case <-cmdDone: + return case <-timer.C: - cmd.Process.Kill() - <-cmdDone } + + if cmd != nil { + // SIGKILL the whole process group on Unix so any descendant that + // ignored or outlived the graceful signal is force-terminated too. + _ = killProcessTree(cmd) + } + <-cmdDone } func (p *ProcessCommand) ID() string { diff --git a/internal/process/process_command_forking_test.go b/internal/process/process_command_forking_test.go new file mode 100644 index 0000000..96610f7 --- /dev/null +++ b/internal/process/process_command_forking_test.go @@ -0,0 +1,262 @@ +//go:build !windows + +package process + +import ( + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "syscall" + "testing" + "time" + + "github.com/mostlygeek/llama-swap/internal/config" +) + +// TestProcessCommand_StopForkingWrapper is a regression for the bug reported +// against v219 where Stop would hang indefinitely when the upstream command +// is a shell wrapper that forks the real binary (e.g. `#!/bin/bash` then +// `"$@"`). After SIGTERM the wrapper dies but the grandchild inherits the +// stdout/stderr pipes; cmd.Wait() blocks waiting for the pipe-copy goroutine +// to drain EOF, which never happens while the grandchild holds the fds. +// +// The fix is cmd.WaitDelay (combined with exec.CommandContext + cmd.Cancel), +// which causes the runtime to force-close the pipes after the delay so +// cmd.Wait() — and therefore Stop — returns. +func TestProcessCommand_StopForkingWrapper(t *testing.T) { + skipIfNoSimpleResponder(t) + + port := getFreePort(t) + dir := t.TempDir() + pidFile := filepath.Join(dir, "child.pid") + + // Wrapper script: backgrounds the child (which inherits stdout/stderr), + // records its PID for cleanup, then waits. When SIGTERM hits bash it + // dies without forwarding the signal; the grandchild keeps running and + // keeps the inherited pipe fds open. This is the scenario reported in + // the v219 regression. + wrapper := filepath.Join(dir, "wrapper.sh") + script := fmt.Sprintf("#!/bin/bash\n%q -port %d -silent &\necho $! > %q\nwait\n", + simpleResponderPath, port, pidFile) + if err := os.WriteFile(wrapper, []byte(script), 0o755); err != nil { + t.Fatalf("WriteFile: %v", err) + } + t.Cleanup(func() { killChildFromPidFile(pidFile) }) + + p := newProcessCommand(t, config.ModelConfig{ + Cmd: wrapper, + Proxy: fmt.Sprintf("http://127.0.0.1:%d", port), + CheckEndpoint: "/health", + HealthCheckTimeout: 10, + }) + // Shrink the pipe-close backstop so the test doesn't sit at the + // production default (10s). Must be set before Run() so doStart picks + // it up when building the cmd. + const testWaitDelay = 250 * time.Millisecond + p.waitDelay = testWaitDelay + + runErr := runAsync(t, p) + + // Stop must return within a bounded time even though the grandchild + // is still holding the pipe open. Budget is generous on top of + // testWaitDelay to absorb scheduling jitter on slow CI runners; the + // pre-fix behaviour was an unbounded hang, so any reasonable cap + // distinguishes pass from fail. + stopReturned := make(chan error, 1) + stopStart := time.Now() + go func() { stopReturned <- p.Stop(testStopTimeout) }() + + const stopBudget = testWaitDelay + 2*time.Second + select { + case err := <-stopReturned: + if err != nil { + t.Fatalf("Stop: %v", err) + } + t.Logf("Stop returned in %v", time.Since(stopStart)) + case <-time.After(stopBudget): + t.Fatalf("Stop did not return within %v — cmd.Wait() likely hung on inherited pipe", stopBudget) + } + + if got := p.State(); got != StateStopped { + t.Errorf("after Stop: expected state %s, got %s", StateStopped, got) + } + + select { + case <-runErr: + case <-time.After(testReturnTimeout): + t.Errorf("Run did not return after Stop") + } +} + +// TestProcessCommand_StopHonorsGracefulTimeout is a regression for the bug +// where cmd.WaitDelay capped the graceful shutdown window. killProcess used to +// cancel the cmd context to deliver SIGTERM, which starts cmd.WaitDelay +// immediately; a process whose SIGTERM handler needs longer than WaitDelay to +// finish was force-killed early even though Stop was given a much longer +// timeout. The fix sends the signal directly so WaitDelay measures from process +// exit (its inherited-pipe backstop role), leaving the graceful window to the +// caller's Stop timeout. +func TestProcessCommand_StopHonorsGracefulTimeout(t *testing.T) { + dir := t.TempDir() + marker := filepath.Join(dir, "graceful.done") + ready := filepath.Join(dir, "trap.ready") + + // On SIGTERM, sleep past the (short) WaitDelay, then write the marker and + // exit cleanly. If WaitDelay still drove the kill, bash would be SIGKILLed + // mid-handler and the marker would never be written. The ready file is + // written only after the trap is installed so the test does not race + // SIGTERM ahead of it (CheckEndpoint:none marks ready before bash runs). + script := filepath.Join(dir, "graceful.sh") + body := fmt.Sprintf( + "#!/bin/bash\ncleanup() { sleep 0.6; echo done > %q; exit 0; }\ntrap cleanup SIGTERM\necho ready > %q\nwhile true; do sleep 0.1; done\n", + marker, ready, + ) + if err := os.WriteFile(script, []byte(body), 0o755); err != nil { + t.Fatalf("WriteFile: %v", err) + } + + p := newProcessCommand(t, config.ModelConfig{ + Cmd: script, + Proxy: "http://127.0.0.1:1", // unused: health check disabled + CheckEndpoint: "none", + }) + // WaitDelay shorter than the handler's 0.6s sleep, and far shorter than the + // Stop timeout below — this is the window the old code mis-killed in. + p.waitDelay = 200 * time.Millisecond + + runErr := runAsync(t, p) + + // Wait until the trap is installed before stopping. + trapDeadline := time.Now().Add(2 * time.Second) + for { + if _, err := os.Stat(ready); err == nil { + break + } + if time.Now().After(trapDeadline) { + t.Fatalf("script did not install SIGTERM trap in time") + } + time.Sleep(10 * time.Millisecond) + } + + stopStart := time.Now() + if err := p.Stop(5 * time.Second); err != nil { + t.Fatalf("Stop: %v", err) + } + elapsed := time.Since(stopStart) + + // The handler must have run to completion (marker written) rather than + // being force-killed at waitDelay. + if _, err := os.Stat(marker); err != nil { + t.Fatalf("graceful handler did not complete (marker missing): %v", err) + } + // And Stop must have waited for the handler (>~0.6s), not returned at the + // 200ms waitDelay. + if elapsed < 500*time.Millisecond { + t.Fatalf("Stop returned in %v — process was killed before its graceful handler finished", elapsed) + } + + if got := p.State(); got != StateStopped { + t.Errorf("after Stop: expected state %s, got %s", StateStopped, got) + } + select { + case <-runErr: + case <-time.After(testReturnTimeout): + t.Errorf("Run did not return after Stop") + } +} + +// TestProcessCommand_StopReapsForkedGrandchild verifies that stopping a forking +// wrapper takes down the backgrounded grandchild too, rather than leaving it as +// an orphan. The fix is Setpgid (runtime_unix.go): the wrapper leads its own +// process group, so the stop signal is delivered to the whole group via the +// negative PID and reaches the grandchild the wrapper never reaped. +func TestProcessCommand_StopReapsForkedGrandchild(t *testing.T) { + skipIfNoSimpleResponder(t) + + port := getFreePort(t) + dir := t.TempDir() + pidFile := filepath.Join(dir, "child.pid") + + wrapper := filepath.Join(dir, "wrapper.sh") + script := fmt.Sprintf("#!/bin/bash\n%q -port %d -silent &\necho $! > %q\nwait\n", + simpleResponderPath, port, pidFile) + if err := os.WriteFile(wrapper, []byte(script), 0o755); err != nil { + t.Fatalf("WriteFile: %v", err) + } + t.Cleanup(func() { killChildFromPidFile(pidFile) }) + + p := newProcessCommand(t, config.ModelConfig{ + Cmd: wrapper, + Proxy: fmt.Sprintf("http://127.0.0.1:%d", port), + CheckEndpoint: "/health", + HealthCheckTimeout: 10, + }) + + runErr := runAsync(t, p) + + // Read the grandchild PID the wrapper recorded. + var childPID int + deadline := time.Now().Add(2 * time.Second) + for { + data, err := os.ReadFile(pidFile) + if err == nil { + if pid, perr := strconv.Atoi(strings.TrimSpace(string(data))); perr == nil && pid > 0 { + childPID = pid + break + } + } + if time.Now().After(deadline) { + t.Fatalf("wrapper did not record grandchild PID") + } + time.Sleep(10 * time.Millisecond) + } + + if err := p.Stop(testStopTimeout); err != nil { + t.Fatalf("Stop: %v", err) + } + + // After Stop the grandchild must be gone. Signal 0 probes liveness without + // actually sending a signal; give it a brief window to exit after the + // group SIGTERM. + proc, err := os.FindProcess(childPID) + if err != nil { + t.Fatalf("FindProcess: %v", err) + } + gone := false + for i := 0; i < 100; i++ { + if err := proc.Signal(syscall.Signal(0)); err != nil { + gone = true + break + } + time.Sleep(10 * time.Millisecond) + } + if !gone { + t.Errorf("grandchild PID %d still alive after Stop — process group was not reaped", childPID) + } + + select { + case <-runErr: + case <-time.After(testReturnTimeout): + t.Errorf("Run did not return after Stop") + } +} + +// killChildFromPidFile reads a PID written by the wrapper script and SIGKILLs +// it so leaked orphans don't accumulate between test runs. Best-effort. +func killChildFromPidFile(pidFile string) { + data, err := os.ReadFile(pidFile) + if err != nil { + return + } + pid, err := strconv.Atoi(strings.TrimSpace(string(data))) + if err != nil || pid <= 0 { + return + } + proc, err := os.FindProcess(pid) + if err != nil { + return + } + _ = proc.Kill() +} diff --git a/internal/process/runtime_unix.go b/internal/process/runtime_unix.go index f7c5adc..0012ec9 100644 --- a/internal/process/runtime_unix.go +++ b/internal/process/runtime_unix.go @@ -4,9 +4,41 @@ package process import ( "os/exec" + "syscall" ) -// setProcAttributes sets platform-specific process attributes +// setProcAttributes starts the upstream in its own process group (Setpgid) so +// the entire process tree can be signalled at once via its negative PID. This +// is what lets us reap a forked grandchild — e.g. a shell wrapper that +// backgrounds the real binary and exits — instead of leaking it as an orphan +// that holds the inherited stdout/stderr pipes open. func setProcAttributes(cmd *exec.Cmd) { - // No-op on Unix systems + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} +} + +// terminateProcessTree sends SIGTERM to the whole process group led by the +// command, giving every process in the tree a chance to shut down gracefully. +func terminateProcessTree(cmd *exec.Cmd) error { + return signalProcessTree(cmd, syscall.SIGTERM) +} + +// killProcessTree sends SIGKILL to the whole process group, force-terminating +// every process in the tree. +func killProcessTree(cmd *exec.Cmd) error { + return signalProcessTree(cmd, syscall.SIGKILL) +} + +// signalProcessTree signals the process group led by cmd.Process. Because the +// child was started with Setpgid it is its own group leader (pgid == pid), so +// targeting -pid reaches the child and every descendant still in the group. +// Falls back to signalling just the child if the group send fails (e.g. the +// group has already drained), so we never silently skip the signal. +func signalProcessTree(cmd *exec.Cmd, sig syscall.Signal) error { + if cmd == nil || cmd.Process == nil { + return nil + } + if err := syscall.Kill(-cmd.Process.Pid, sig); err != nil { + return cmd.Process.Signal(sig) + } + return nil } diff --git a/internal/process/runtime_windows.go b/internal/process/runtime_windows.go index 3888db8..e28dc69 100644 --- a/internal/process/runtime_windows.go +++ b/internal/process/runtime_windows.go @@ -14,3 +14,23 @@ func setProcAttributes(cmd *exec.Cmd) { CreationFlags: 0x08000000, // CREATE_NO_WINDOW } } + +// terminateProcessTree asks the upstream process to stop. Windows has no +// process-group signalling here — process-tree teardown is handled by the +// configured CmdStop, which defaults to `taskkill /f /t` — so this preserves +// the previous single-process SIGTERM behaviour. +func terminateProcessTree(cmd *exec.Cmd) error { + if cmd == nil || cmd.Process == nil { + return nil + } + return cmd.Process.Signal(syscall.SIGTERM) +} + +// killProcessTree force-terminates the upstream process. Tree teardown on +// Windows relies on CmdStop (taskkill /t); this kills the launched process. +func killProcessTree(cmd *exec.Cmd) error { + if cmd == nil || cmd.Process == nil { + return nil + } + return cmd.Process.Kill() +} diff --git a/internal/router/base.go b/internal/router/base.go index 33ff58a..8897f3b 100644 --- a/internal/router/base.go +++ b/internal/router/base.go @@ -745,24 +745,28 @@ func (b *baseRouter) ServeHTTP(w http.ResponseWriter, req *http.Request) { }() } + // finishLoading stops the loading stream and fences its goroutine off from + // the ResponseWriter before the real handler (or ServeHTTP's return) + // reclaims it. release() must run even when waitForCompletion times out: + // otherwise a still-streaming goroutine flushes a finalized response and + // panics on the recycled *bufio.Writer. + finishLoading := func() { + cancelLoad() + if lw != nil { + lw.waitForCompletion(1 * time.Second) + lw.release() + } + } + var resp handlerResp select { case resp = <-hr.respond: - cancelLoad() - if lw != nil { - lw.waitForCompletion(1 * time.Second) - } + finishLoading() case <-req.Context().Done(): - cancelLoad() - if lw != nil { - lw.waitForCompletion(1 * time.Second) - } + finishLoading() return case <-b.shutdownCtx.Done(): - cancelLoad() - if lw != nil { - lw.waitForCompletion(1 * time.Second) - } + finishLoading() SendError(w, req, fmt.Errorf("%s is shutting down", b.name)) return } diff --git a/internal/router/loading.go b/internal/router/loading.go index 99c6ee8..862212e 100644 --- a/internal/router/loading.go +++ b/internal/router/loading.go @@ -38,6 +38,13 @@ type loadingWriter struct { pendingMu sync.Mutex pendingUpdate string + // writeMu serializes writes to the underlying writer and guards released. + // Once released is set, the streaming goroutine must not touch the writer + // again — ServeHTTP has reclaimed it (to run the real handler or to return) + // and writing/flushing a finalized response panics. + writeMu sync.Mutex + released bool + // closed by start when the goroutine finishes (after cleanup messages) done chan struct{} @@ -217,12 +224,33 @@ func (s *loadingWriter) sendData(data string) { return } - _, err = fmt.Fprintf(s.writer, "data: %s\n\n", jsonData) - if err != nil { + s.writeMu.Lock() + defer s.writeMu.Unlock() + // Once ServeHTTP has reclaimed the writer (release), writing/flushing it + // races the real handler or panics on a finalized response. Stop here. + if s.released { + return + } + + if _, err = fmt.Fprintf(s.writer, "data: %s\n\n", jsonData); err != nil { s.logger.Debugf("<%s> Failed to write SSE data (client likely disconnected): %v", s.modelName, err) return } - s.Flush() + if flusher, ok := s.writer.(http.Flusher); ok { + flusher.Flush() + } +} + +// release fences the loadingWriter off from the underlying ResponseWriter. +// After it returns, the streaming goroutine will not write to or flush the +// writer again: any in-flight write completes under writeMu first, and later +// writes short-circuit on released. The caller can then safely hand the writer +// to the real handler or let ServeHTTP return without racing a finalized +// response (a use-after-return Flush panics on the recycled *bufio.Writer). +func (s *loadingWriter) release() { + s.writeMu.Lock() + s.released = true + s.writeMu.Unlock() } func (s *loadingWriter) Header() http.Header {