Files
llama.cpp/tools/server/server.cpp
T
Pascal 354ebac8cb server: real-time reasoning interruption via control endpoint (#23971)
* server: real-time reasoning interruption via control endpoint

Builds on the manual reasoning budget trigger from #23949. Adds a
CONTROL task that mirrors the CANCEL path on the live slot and calls
common_sampler_reasoning_budget_force to end thinking mid-generation.
POST /v1/chat/completions/control with { id_slot, action }, opt-in
reasoning_control arms the budget sampler on demand. Router and single
model. Minimal WebUI button as a skeleton for further UI work.

* ui: track reasoning phase via explicit streaming state

Add isReasoning to the chat store, mirroring the isLoading pattern:
per conversation map, private setter, public accessor and reactive
export. Set from the stream callbacks, true on reasoning chunks, false
on the first content chunk, reset on stream end and resynced on
conversation switch. The skip button now keys off isReasoning so it
shows only during the thinking phase, not the whole generation.

* ui: extract control endpoint and action into constants

Move the chat completion routes, the slots route and the reasoning
control action out of chat.service into api-endpoints and a dedicated
control-actions module. No behavior change, drops the magic strings so
the control protocol has a single source of truth.

* server: target reasoning control by completion id

Address @ngxson review on the control endpoint.

Switch from id_slot to the chat completion id to avoid a TOCTOU: the
slot can be reassigned between the lookup and the control request, so
matching the live completion (oaicompat_cmpl_id) is safe and a finished
one simply matches nothing. Rename the action to reasoning_end, guard
it on the reasoning_control flag of the target slot, and reduce the
response to {success} with an optional message.

* ui: target reasoning control by completion id

Keep the streamed completion id on the message and post it back to the
control endpoint instead of probing /slots. Drops the slot discovery
and the TOCTOU that came with it. Action renamed to reasoning_end,
response read as {success}.

* server: address review from @ngxson

Move the control fields into task_params and drop the redundant
comments on the control path.

* server: document the reasoning control endpoint

* Update tools/ui/src/lib/types/database.d.ts

Co-authored-by: Aleksander Grygier <aleksander.grygier@gmail.com>

* ui: rename cmplId to completionId

Per @allozaur review, clearer name for the streamed completion id.

* ui: wire completion id capture through the agentic flow

The webui streams through the agentic flow, which relayed onModel but
not onCompletionId, so the completion id never reached the message and
the control request was never sent. Relay it through the flow and its
callbacks type, declare id on the chunk type, and log an explicit error
when the button fires without a usable id.

* ui: target reasoning control model from the message

The model is a property of the completion, so read it from the streaming
message like the id, not from the model dropdown which is unrelated UI
state. Makes the request self-consistent by construction instead of just
unlikely to drift.

---------

Co-authored-by: Aleksander Grygier <aleksander.grygier@gmail.com>
2026-06-02 07:26:20 +02:00

370 lines
15 KiB
C++

#include "server-context.h"
#include "server-http.h"
#include "server-models.h"
#include "server-cors-proxy.h"
#include "server-tools.h"
#include "arg.h"
#include "build-info.h"
#include "common.h"
#include "fit.h"
#include "llama.h"
#include "log.h"
#include <atomic>
#include <clocale>
#include <exception>
#include <signal.h>
#include <thread> // for std::thread::hardware_concurrency
#if defined(_WIN32)
#include <windows.h>
#endif
static std::function<void(int)> shutdown_handler;
static std::atomic_flag is_terminating = ATOMIC_FLAG_INIT;
static inline void signal_handler(int signal) {
if (is_terminating.test_and_set()) {
// in case it hangs, we can force terminate the server by hitting Ctrl+C twice
// this is for better developer experience, we can remove when the server is stable enough
fprintf(stderr, "Received second interrupt, terminating immediately.\n");
exit(1);
}
shutdown_handler(signal);
}
// wrapper function that handles exceptions and logs errors
// this is to make sure handler_t never throws exceptions; instead, it returns an error response
static server_http_context::handler_t ex_wrapper(server_http_context::handler_t func) {
return [func = std::move(func)](const server_http_req & req) -> server_http_res_ptr {
std::string message;
error_type error;
try {
return func(req);
} catch (const std::invalid_argument & e) {
// treat invalid_argument as invalid request (400)
error = ERROR_TYPE_INVALID_REQUEST;
message = e.what();
} catch (const std::exception & e) {
// treat other exceptions as server error (500)
error = ERROR_TYPE_SERVER;
message = e.what();
} catch (...) {
error = ERROR_TYPE_SERVER;
message = "unknown error";
}
auto res = std::make_unique<server_http_res>();
res->status = 500;
try {
json error_data = format_error_response(message, error);
res->status = json_value(error_data, "code", 500);
res->data = safe_json_to_str({{ "error", error_data }});
SRV_WRN("got exception: %s\n", res->data.c_str());
} catch (const std::exception & e) {
SRV_ERR("got another exception: %s | while handling exception: %s\n", e.what(), message.c_str());
res->data = "Internal Server Error";
}
return res;
};
}
// satisfies -Wmissing-declarations
int llama_server(int argc, char ** argv);
int llama_server(int argc, char ** argv) {
std::setlocale(LC_NUMERIC, "C");
// own arguments required by this example
common_params params;
common_init();
if (!common_params_parse(argc, argv, params, LLAMA_EXAMPLE_SERVER)) {
return 1;
}
llama_backend_init();
llama_numa_init(params.numa);
// router server never loads a model and must not touch the GPU
// skip device enumeration so the CUDA primary context stays uncreated
const bool is_router_server = params.model.path.empty();
common_params_print_info(params, !is_router_server);
// validate batch size for embeddings
// embeddings require all tokens to be processed in a single ubatch
// see https://github.com/ggml-org/llama.cpp/issues/12836
if (params.embedding && params.n_batch > params.n_ubatch) {
SRV_WRN("embeddings enabled with n_batch (%d) > n_ubatch (%d)\n", params.n_batch, params.n_ubatch);
SRV_WRN("setting n_batch = n_ubatch = %d to avoid assertion failure\n", params.n_ubatch);
params.n_batch = params.n_ubatch;
}
if (params.n_parallel < 0) {
SRV_INF("%s", "n_parallel is set to auto, using n_parallel = 4 and kv_unified = true\n");
params.n_parallel = 4;
params.kv_unified = true;
}
// for consistency between server router mode and single-model mode, we set the same model name as alias
if (params.model_alias.empty() && !params.model.name.empty()) {
params.model_alias.insert(params.model.name);
}
// struct that contains llama context and inference
server_context ctx_server;
server_http_context ctx_http;
if (!ctx_http.init(params)) {
SRV_ERR("%s", "failed to initialize HTTP server\n");
return 1;
}
//
// Router
//
// register API routes
server_routes routes(params, ctx_server);
server_tools tools;
std::optional<server_models_routes> models_routes{};
if (is_router_server) {
// setup server instances manager
try {
models_routes.emplace(params, argc, argv);
} catch (const std::exception & e) {
SRV_ERR("failed to initialize router models: %s\n", e.what());
return 1;
}
// proxy handlers
// note: routes.get_health stays the same
routes.get_metrics = models_routes->proxy_get;
routes.post_props = models_routes->proxy_post;
routes.post_completions = models_routes->proxy_post;
routes.post_completions_oai = models_routes->proxy_post;
routes.post_chat_completions = models_routes->proxy_post;
routes.post_control = models_routes->proxy_post;
routes.post_responses_oai = models_routes->proxy_post;
routes.post_transcriptions_oai = models_routes->proxy_post;
routes.post_anthropic_messages = models_routes->proxy_post;
routes.post_anthropic_count_tokens = models_routes->proxy_post;
routes.post_infill = models_routes->proxy_post;
routes.post_embeddings = models_routes->proxy_post;
routes.post_embeddings_oai = models_routes->proxy_post;
routes.post_rerank = models_routes->proxy_post;
routes.post_tokenize = models_routes->proxy_post;
routes.post_detokenize = models_routes->proxy_post;
routes.post_apply_template = models_routes->proxy_post;
routes.get_lora_adapters = models_routes->proxy_get;
routes.post_lora_adapters = models_routes->proxy_post;
routes.get_slots = models_routes->proxy_get;
routes.post_slots = models_routes->proxy_post;
// custom routes for router
routes.get_props = models_routes->get_router_props;
routes.get_models = models_routes->get_router_models;
ctx_http.post("/models/load", ex_wrapper(models_routes->post_router_models_load));
ctx_http.post("/models/unload", ex_wrapper(models_routes->post_router_models_unload));
}
ctx_http.get ("/health", ex_wrapper(routes.get_health)); // public endpoint (no API key check)
ctx_http.get ("/v1/health", ex_wrapper(routes.get_health)); // public endpoint (no API key check)
ctx_http.get ("/metrics", ex_wrapper(routes.get_metrics));
ctx_http.get ("/props", ex_wrapper(routes.get_props));
ctx_http.post("/props", ex_wrapper(routes.post_props));
ctx_http.get ("/models", ex_wrapper(routes.get_models)); // public endpoint (no API key check)
ctx_http.get ("/v1/models", ex_wrapper(routes.get_models)); // public endpoint (no API key check)
ctx_http.post("/completion", ex_wrapper(routes.post_completions)); // legacy
ctx_http.post("/completions", ex_wrapper(routes.post_completions));
ctx_http.post("/v1/completions", ex_wrapper(routes.post_completions_oai));
ctx_http.post("/chat/completions", ex_wrapper(routes.post_chat_completions));
ctx_http.post("/v1/chat/completions", ex_wrapper(routes.post_chat_completions));
ctx_http.post("/v1/chat/completions/control", ex_wrapper(routes.post_control));
ctx_http.post("/v1/responses", ex_wrapper(routes.post_responses_oai));
ctx_http.post("/responses", ex_wrapper(routes.post_responses_oai));
ctx_http.post("/v1/audio/transcriptions", ex_wrapper(routes.post_transcriptions_oai));
ctx_http.post("/audio/transcriptions", ex_wrapper(routes.post_transcriptions_oai));
ctx_http.post("/v1/messages", ex_wrapper(routes.post_anthropic_messages)); // anthropic messages API
ctx_http.post("/v1/messages/count_tokens", ex_wrapper(routes.post_anthropic_count_tokens)); // anthropic token counting
ctx_http.post("/infill", ex_wrapper(routes.post_infill));
ctx_http.post("/embedding", ex_wrapper(routes.post_embeddings)); // legacy
ctx_http.post("/embeddings", ex_wrapper(routes.post_embeddings));
ctx_http.post("/v1/embeddings", ex_wrapper(routes.post_embeddings_oai));
ctx_http.post("/rerank", ex_wrapper(routes.post_rerank));
ctx_http.post("/reranking", ex_wrapper(routes.post_rerank));
ctx_http.post("/v1/rerank", ex_wrapper(routes.post_rerank));
ctx_http.post("/v1/reranking", ex_wrapper(routes.post_rerank));
ctx_http.post("/tokenize", ex_wrapper(routes.post_tokenize));
ctx_http.post("/detokenize", ex_wrapper(routes.post_detokenize));
ctx_http.post("/apply-template", ex_wrapper(routes.post_apply_template));
// LoRA adapters hotswap
ctx_http.get ("/lora-adapters", ex_wrapper(routes.get_lora_adapters));
ctx_http.post("/lora-adapters", ex_wrapper(routes.post_lora_adapters));
// Save & load slots
ctx_http.get ("/slots", ex_wrapper(routes.get_slots));
ctx_http.post("/slots/:id_slot", ex_wrapper(routes.post_slots));
// Google Cloud Platform (Vertex AI) compat
ctx_http.register_gcp_compat();
// CORS proxy (EXPERIMENTAL, only used by the Web UI for MCP)
// Supports both new ui_mcp_proxy and deprecated webui_mcp_proxy fields
if (params.ui_mcp_proxy || params.webui_mcp_proxy) {
SRV_WRN("%s", "-----------------\n");
SRV_WRN("%s", "CORS proxy is enabled, do not expose server to untrusted environments\n");
SRV_WRN("%s", "This feature is EXPERIMENTAL and may be removed or changed in future versions\n");
SRV_WRN("%s", "-----------------\n");
ctx_http.get ("/cors-proxy", ex_wrapper(proxy_handler_get));
ctx_http.post("/cors-proxy", ex_wrapper(proxy_handler_post));
}
// EXPERIMENTAL built-in tools
if (!params.server_tools.empty()) {
try {
tools.setup(params.server_tools);
} catch (const std::exception & e) {
SRV_ERR("tools setup failed: %s\n", e.what());
return 1;
}
SRV_WRN("%s", "-----------------\n");
SRV_WRN("%s", "Built-in tools are enabled, do not expose server to untrusted environments\n");
SRV_WRN("%s", "This feature is EXPERIMENTAL and may be changed in the future\n");
SRV_WRN("%s", "-----------------\n");
ctx_http.get ("/tools", ex_wrapper(tools.handle_get));
ctx_http.post("/tools", ex_wrapper(tools.handle_post));
}
//
// Start the server
//
std::function<void()> clean_up;
if (is_router_server) {
SRV_INF("%s", "starting router server, no model will be loaded in this process\n");
clean_up = [&models_routes]() {
SRV_INF("%s: cleaning up before exit...\n", __func__);
if (models_routes.has_value()) {
models_routes->models.unload_all();
}
llama_backend_free();
};
if (!ctx_http.start()) {
clean_up();
SRV_ERR("%s", "exiting due to HTTP server error\n");
return 1;
}
ctx_http.is_ready.store(true);
shutdown_handler = [&](int) {
ctx_http.stop();
};
} else {
// setup clean up function, to be called before exit
clean_up = [&ctx_http, &ctx_server]() {
SRV_INF("%s: cleaning up before exit...\n", __func__);
ctx_http.stop();
ctx_server.terminate();
llama_backend_free();
};
// start the HTTP server before loading the model to be able to serve /health requests
if (!ctx_http.start()) {
clean_up();
SRV_ERR("%s", "exiting due to HTTP server error\n");
return 1;
}
// load the model
SRV_INF("%s", "loading model\n");
if (server_models::is_child_server()) {
ctx_server.on_sleeping_changed([&](bool sleeping) {
server_models::notify_router_sleeping_state(sleeping);
});
}
if (!ctx_server.load_model(params)) {
clean_up();
if (ctx_http.thread.joinable()) {
ctx_http.thread.join();
}
SRV_ERR("%s", "exiting due to model loading error\n");
return 1;
}
routes.update_meta(ctx_server);
ctx_http.is_ready.store(true);
SRV_INF("%s", "model loaded\n");
shutdown_handler = [&](int) {
// this will unblock start_loop()
ctx_server.terminate();
};
}
// TODO: refactor in common/console
#if defined (__unix__) || (defined (__APPLE__) && defined (__MACH__))
struct sigaction sigint_action;
sigint_action.sa_handler = signal_handler;
sigemptyset (&sigint_action.sa_mask);
sigint_action.sa_flags = 0;
sigaction(SIGINT, &sigint_action, NULL);
sigaction(SIGTERM, &sigint_action, NULL);
#elif defined (_WIN32)
auto console_ctrl_handler = +[](DWORD ctrl_type) -> BOOL {
return (ctrl_type == CTRL_C_EVENT) ? (signal_handler(SIGINT), true) : false;
};
SetConsoleCtrlHandler(reinterpret_cast<PHANDLER_ROUTINE>(console_ctrl_handler), true);
#endif
if (is_router_server) {
SRV_INF("router server is listening on %s\n", ctx_http.listening_address.c_str());
SRV_WRN("%s", "NOTE: router mode is experimental\n");
SRV_WRN("%s", " it is not recommended to use this mode in untrusted environments\n");
if (ctx_http.thread.joinable()) {
ctx_http.thread.join(); // keep the main thread alive
}
// when the HTTP server stops, clean up and exit
clean_up();
} else {
SRV_INF("server is listening on %s\n", ctx_http.listening_address.c_str());
// optionally, notify router server that this instance is ready
std::thread monitor_thread;
if (server_models::is_child_server()) {
json model_info = routes.get_model_info();
monitor_thread = server_models::setup_child_server(shutdown_handler, model_info);
}
// this call blocks the main thread until queue_tasks.terminate() is called
ctx_server.start_loop();
clean_up();
if (ctx_http.thread.joinable()) {
ctx_http.thread.join();
}
if (monitor_thread.joinable()) {
monitor_thread.join();
}
auto * ll_ctx = ctx_server.get_llama_context();
if (ll_ctx != nullptr) {
common_memory_breakdown_print(ll_ctx);
}
}
return 0;
}