Files
Benson Wong d3f329f924 proxy: Improve logging performance and allow separate log streaming (#421)
Replace container/ring.Ring with a custom circularBuffer that uses a
single contiguous []byte slice. This fixes the original implementation
which created 10,240 ring elements instead of 10KB of storage.

GetHistory is now 139x faster (145μs → 1μs) and uses 117x less memory
(1.2MB → 10KB). Allocations reduced from 2 to 1 per write operation.

Create a LogMonitor per proxy.Process, replacing the usage
of a shared one. The buffer in LogMonitor is lazy allocated on the first
call to Write and freed when the Process is stopped. This reduces
unnecessary memory usage when a model is not active.

The /logs/stream/{model_id} endpoint was added to stream logs from a
specific process.
2025-12-18 21:49:25 -08:00

158 lines
3.4 KiB
Go

package proxy
import (
"fmt"
"net/http"
"slices"
"sync"
"github.com/mostlygeek/llama-swap/proxy/config"
)
type ProcessGroup struct {
sync.Mutex
config config.Config
id string
swap bool
exclusive bool
persistent bool
proxyLogger *LogMonitor
upstreamLogger *LogMonitor
// map of current processes
processes map[string]*Process
lastUsedProcess string
}
func NewProcessGroup(id string, config config.Config, proxyLogger *LogMonitor, upstreamLogger *LogMonitor) *ProcessGroup {
groupConfig, ok := config.Groups[id]
if !ok {
panic("Unable to find configuration for group id: " + id)
}
pg := &ProcessGroup{
id: id,
config: config,
swap: groupConfig.Swap,
exclusive: groupConfig.Exclusive,
persistent: groupConfig.Persistent,
proxyLogger: proxyLogger,
upstreamLogger: upstreamLogger,
processes: make(map[string]*Process),
}
// Create a Process for each member in the group
for _, modelID := range groupConfig.Members {
modelConfig, modelID, _ := pg.config.FindConfig(modelID)
processLogger := NewLogMonitorWriter(upstreamLogger)
process := NewProcess(modelID, pg.config.HealthCheckTimeout, modelConfig, processLogger, pg.proxyLogger)
pg.processes[modelID] = process
}
return pg
}
// ProxyRequest proxies a request to the specified model
func (pg *ProcessGroup) ProxyRequest(modelID string, writer http.ResponseWriter, request *http.Request) error {
if !pg.HasMember(modelID) {
return fmt.Errorf("model %s not part of group %s", modelID, pg.id)
}
if pg.swap {
pg.Lock()
if pg.lastUsedProcess != modelID {
// is there something already running?
if pg.lastUsedProcess != "" {
pg.processes[pg.lastUsedProcess].Stop()
}
// wait for the request to the new model to be fully handled
// and prevent race conditions see issue #277
pg.processes[modelID].ProxyRequest(writer, request)
pg.lastUsedProcess = modelID
// short circuit and exit
pg.Unlock()
return nil
}
pg.Unlock()
}
pg.processes[modelID].ProxyRequest(writer, request)
return nil
}
func (pg *ProcessGroup) HasMember(modelName string) bool {
return slices.Contains(pg.config.Groups[pg.id].Members, modelName)
}
func (pg *ProcessGroup) GetMember(modelName string) (*Process, bool) {
if pg.HasMember(modelName) {
return pg.processes[modelName], true
}
return nil, false
}
func (pg *ProcessGroup) StopProcess(modelID string, strategy StopStrategy) error {
pg.Lock()
process, exists := pg.processes[modelID]
if !exists {
pg.Unlock()
return fmt.Errorf("process not found for %s", modelID)
}
if pg.lastUsedProcess == modelID {
pg.lastUsedProcess = ""
}
pg.Unlock()
switch strategy {
case StopImmediately:
process.StopImmediately()
default:
process.Stop()
}
return nil
}
func (pg *ProcessGroup) StopProcesses(strategy StopStrategy) {
pg.Lock()
defer pg.Unlock()
if len(pg.processes) == 0 {
return
}
// stop Processes in parallel
var wg sync.WaitGroup
for _, process := range pg.processes {
wg.Add(1)
go func(process *Process) {
defer wg.Done()
switch strategy {
case StopImmediately:
process.StopImmediately()
default:
process.Stop()
}
}(process)
}
wg.Wait()
}
func (pg *ProcessGroup) Shutdown() {
var wg sync.WaitGroup
for _, process := range pg.processes {
wg.Add(1)
go func(process *Process) {
defer wg.Done()
process.Shutdown()
}(process)
}
wg.Wait()
}