Compare commits

...

7 Commits

Author SHA1 Message Date
Xuan Son Nguyen f7421eabe8 wip 2026-06-23 13:28:14 +02:00
Xuan Son Nguyen 59797670dc cli: move to HTTP-based implementation 2026-06-23 13:14:28 +02:00
Matt Thompson dec5ca5577 server : Add id to tool call responses api (#24882) 2026-06-22 23:03:12 +02:00
Mahdiou Diallo 9c0ac887f3 ui: Prioritize favorite models in model selection (#24766)
Updated model selection prioritization to include favorite models.
2026-06-22 21:00:21 +02:00
Xuan-Son Nguyen 721354fbdf server: (router) move model downloading to dedicated process (#24834)
* server: real-time model load progress tracking via /models/sse

* update docs

* server: move model download to child process

* rm unused

* fix most problems

* clean up

* nit fixes

* fix test case

* do not detact() thread

* shorter MODEL_DOWNLOAD_TIMEOUT in test

* throttle
2026-06-22 18:24:04 +02:00
Xuan-Son Nguyen 6ee0f65793 server: refactor/generalize input file schema (#24299)
* server: refactor/generalize input file schema

* wire up input_video, accept raw base64

* nits

* nits (2)

* fix windows
2026-06-22 16:42:47 +02:00
Pascal 099b579acb ui: model status and load progress via /models/sse feed (#24878)
* ui: model status and load progress via /models/sse feed

* ui: centralize SSE wire-format delimiters into shared constants for the chat and /models/sse parsers

* ui: type /models/sse event names as a ServerModelsSseEventType enum

Address review from allozaur
2026-06-22 15:55:30 +02:00
39 changed files with 1885 additions and 817 deletions
+10 -4
View File
@@ -396,7 +396,7 @@ static bool parse_bool_value(const std::string & value) {
// CLI argument parsing functions
//
bool common_params_handle_models(common_params & params, llama_example curr_ex) {
bool common_params_handle_models(common_params & params, llama_example curr_ex, common_download_callback * callback) {
const bool spec_type_draft_mtp = std::find(params.speculative.types.begin(),
params.speculative.types.end(),
COMMON_SPECULATIVE_TYPE_DRAFT_MTP) != params.speculative.types.end();
@@ -408,6 +408,10 @@ bool common_params_handle_models(common_params & params, llama_example curr_ex)
opts.download_mtp = spec_type_draft_mtp;
opts.download_mmproj = !params.no_mmproj && params.mmproj.path.empty() && params.mmproj.url.empty();
if (callback) {
opts.callback = callback;
}
// sub-models (draft, mmproj, vocoder) are explicitly specified by the user,
// so we should not auto-discover mtp/mmproj siblings for them
common_download_opts sub_opts = opts;
@@ -584,8 +588,11 @@ static bool common_params_parse_ex(int argc, char ** argv, common_params_context
throw std::invalid_argument("error: --prompt-cache-all not supported in interactive mode yet\n");
}
// export_graph_ops loads only metadata
const bool skip_model_download = ctx_arg.ex == LLAMA_EXAMPLE_EXPORT_GRAPH_OPS;
const bool skip_model_download =
// server will call common_params_handle_models() later, so we skip it here
ctx_arg.ex == LLAMA_EXAMPLE_SERVER ||
// export_graph_ops loads only metadata
ctx_arg.ex == LLAMA_EXAMPLE_EXPORT_GRAPH_OPS;
if (!skip_model_download) {
// handle model and download
@@ -594,7 +601,6 @@ static bool common_params_parse_ex(int argc, char ** argv, common_params_context
// model is required (except for server)
// TODO @ngxson : maybe show a list of available models in CLI in this case
if (params.model.path.empty()
&& ctx_arg.ex != LLAMA_EXAMPLE_SERVER
&& !params.usage
&& !params.completion) {
throw std::invalid_argument("error: --model is required\n");
+5 -1
View File
@@ -1,6 +1,7 @@
#pragma once
#include "common.h"
#include "download.h"
#include <set>
#include <map>
@@ -133,7 +134,10 @@ void common_params_add_preset_options(std::vector<common_arg> & args);
// return true if the model is ready to use
// throw an exception if there is an error that prevents the model from being used (e.g. network error, model not found, etc)
// if params.skip_download is true, no downloads will be attempted. return false if the model is invalid or missing (e.g. ETag check failed)
bool common_params_handle_models(common_params & params, llama_example curr_ex);
bool common_params_handle_models(
common_params & params,
llama_example curr_ex,
common_download_callback * callback = nullptr);
// initialize argument parser context - used by test-arg-parser and preset
common_params_context common_params_parser_init(common_params & params, llama_example ex, void(*print_usage)(int, char **) = nullptr);
+3
View File
@@ -631,6 +631,9 @@ struct common_params {
std::map<std::string, std::string> default_template_kwargs;
// CLI params
std::string server_base;
// UI configs
bool ui = true;
bool ui_mcp_proxy = false;
+70
View File
@@ -2,6 +2,16 @@
#include <cpp-httplib/httplib.h>
#ifdef _WIN32
#include <winsock2.h>
#include <windows.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#endif
struct common_http_url {
std::string scheme;
std::string user;
@@ -97,3 +107,63 @@ static std::pair<httplib::Client, common_http_url> common_http_client(const std:
static std::string common_http_show_masked_url(const common_http_url & parts) {
return parts.scheme + "://" + (parts.user.empty() ? "" : "****:****@") + parts.host + parts.path;
}
static int common_http_get_free_port() {
#ifdef _WIN32
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
return -1;
}
typedef SOCKET native_socket_t;
#define INVALID_SOCKET_VAL INVALID_SOCKET
#define CLOSE_SOCKET(s) closesocket(s)
#else
typedef int native_socket_t;
#define INVALID_SOCKET_VAL -1
#define CLOSE_SOCKET(s) close(s)
#endif
native_socket_t sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == INVALID_SOCKET_VAL) {
#ifdef _WIN32
WSACleanup();
#endif
return -1;
}
struct sockaddr_in serv_addr;
std::memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(0);
if (bind(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) != 0) {
CLOSE_SOCKET(sock);
#ifdef _WIN32
WSACleanup();
#endif
return -1;
}
#ifdef _WIN32
int namelen = sizeof(serv_addr);
#else
socklen_t namelen = sizeof(serv_addr);
#endif
if (getsockname(sock, (struct sockaddr*)&serv_addr, &namelen) != 0) {
CLOSE_SOCKET(sock);
#ifdef _WIN32
WSACleanup();
#endif
return -1;
}
int port = ntohs(serv_addr.sin_port);
CLOSE_SOCKET(sock);
#ifdef _WIN32
WSACleanup();
#endif
return port;
}
+4 -2
View File
@@ -2,11 +2,13 @@
set(TARGET llama-cli-impl)
add_library(${TARGET} cli.cpp)
add_library(${TARGET} cli.cpp
cli-client.cpp
cli-context.cpp)
set_target_properties(${TARGET} PROPERTIES WINDOWS_EXPORT_ALL_SYMBOLS ON)
target_include_directories(${TARGET} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ../server)
target_link_libraries(${TARGET} PUBLIC server-context llama-common ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(${TARGET} PUBLIC llama-server-impl llama-common ${CMAKE_THREAD_LIBS_INIT})
if(LLAMA_TOOLS_INSTALL)
install(TARGETS ${TARGET} LIBRARY)
+141
View File
@@ -0,0 +1,141 @@
#include "cli-client.h"
#include "http.h"
#include <algorithm>
#include <chrono>
#include <thread>
// generation can stall for a long time during prompt processing, so the
// read timeout must be generous
static constexpr time_t CLI_HTTP_READ_TIMEOUT_SEC = 3600;
// upper bound for the accumulated response body kept for error reporting
static constexpr size_t CLI_HTTP_MAX_ERROR_BODY = 1024 * 1024;
// returns the path with the base url's path prefix prepended (if any)
static std::string join_path(const common_http_url & parts, const std::string & path) {
if (parts.path.empty() || parts.path == "/") {
return path;
}
std::string prefix = parts.path;
if (prefix.back() == '/') {
prefix.pop_back();
}
return prefix + path;
}
json cli_client::get(const std::string & path) {
auto [cli, parts] = common_http_client(server_base);
cli.set_read_timeout(CLI_HTTP_READ_TIMEOUT_SEC, 0);
auto res = cli.Get(join_path(parts, path));
if (!res) {
throw std::runtime_error("failed to connect to " + server_base + ": " + httplib::to_string(res.error()));
}
if (res->status < 200 || res->status >= 300) {
throw std::runtime_error("GET " + path + " failed with status " + std::to_string(res->status) + ": " + res->body);
}
json result = json::parse(res->body, nullptr, false);
if (result.is_discarded()) {
throw std::runtime_error("GET " + path + " returned invalid JSON");
}
return result;
}
json cli_client::post(const std::string & path, const json & body) {
auto [cli, parts] = common_http_client(server_base);
cli.set_read_timeout(CLI_HTTP_READ_TIMEOUT_SEC, 0);
auto res = cli.Post(join_path(parts, path), body.dump(), "application/json");
if (!res) {
throw std::runtime_error("failed to connect to " + server_base + ": " + httplib::to_string(res.error()));
}
if (res->status < 200 || res->status >= 300) {
throw std::runtime_error("POST " + path + " failed with status " + std::to_string(res->status) + ": " + res->body);
}
json result = json::parse(res->body, nullptr, false);
if (result.is_discarded()) {
throw std::runtime_error("POST " + path + " returned invalid JSON");
}
return result;
}
json cli_client::post_sse(const std::string & path,
const json & body,
const std::function<bool()> & should_stop,
const std::function<void(const json &)> & on_data) {
auto [cli, parts] = common_http_client(server_base);
cli.set_read_timeout(CLI_HTTP_READ_TIMEOUT_SEC, 0);
std::string pending; // buffer for incomplete SSE lines
std::string raw_body; // accumulated body, used only for error reporting
auto receiver = [&](const char * data, size_t len) -> bool {
if (should_stop()) {
return false; // aborts the request
}
if (raw_body.size() < CLI_HTTP_MAX_ERROR_BODY) {
raw_body.append(data, std::min(len, CLI_HTTP_MAX_ERROR_BODY - raw_body.size()));
}
pending.append(data, len);
size_t pos;
while ((pos = pending.find('\n')) != std::string::npos) {
std::string line = pending.substr(0, pos);
pending.erase(0, pos + 1);
if (!line.empty() && line.back() == '\r') {
line.pop_back();
}
if (line.rfind("data: ", 0) != 0) {
continue;
}
std::string payload = line.substr(6);
if (payload == "[DONE]") {
continue;
}
json event = json::parse(payload, nullptr, false);
if (!event.is_discarded()) {
on_data(event);
}
}
return true;
};
httplib::Headers headers = {{"Accept", "text/event-stream"}};
auto res = cli.Post(join_path(parts, path), headers, body.dump(), "application/json", receiver);
if (!res) {
if (res.error() == httplib::Error::Canceled && should_stop()) {
return json(); // cancelled by the user
}
return json {{"error", {{"message", "failed to connect to " + server_base + ": " + httplib::to_string(res.error())}}}};
}
if (res->status < 200 || res->status >= 300) {
json error_body = json::parse(raw_body, nullptr, false);
if (!error_body.is_discarded() && error_body.contains("error")) {
return error_body;
}
return json {{"error", {{"message", "request failed with status " + std::to_string(res->status)}}}};
}
return json();
}
bool cli_client::wait_health(const std::function<bool()> & is_aborted) {
int connect_attempts = 0;
while (!is_aborted()) {
auto [cli, parts] = common_http_client(server_base);
cli.set_connection_timeout(1, 0);
auto res = cli.Get(join_path(parts, "/health"));
if (res) {
if (res->status == 200) {
return true;
}
// any other status means the server is up but not ready yet
// (e.g. 503 while the model is still loading)
} else if (++connect_attempts >= 10) {
last_error = "failed to connect to " + server_base + ": " + httplib::to_string(res.error());
return false;
}
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
last_error = "aborted while waiting for the server to become ready";
return false;
}
+52
View File
@@ -0,0 +1,52 @@
#pragma once
#include "ggml.h"
#define JSON_ASSERT GGML_ASSERT
#include <nlohmann/json.hpp>
#include <functional>
#include <string>
using json = nlohmann::ordered_json;
// openai-like client for CLI
struct cli_client {
std::string server_base; // base url, for example "http://127.0.0.1:8080"
std::string last_error; // set when wait_health() fails
// simple GET request, returns the response json
// throws std::runtime_error on transport error or non-2xx status
json get(const std::string & path);
// simple POST request, returns the response json
// throws std::runtime_error on transport error or non-2xx status
json post(const std::string & path, const json & body);
// POST request with an SSE streaming response; on_data is invoked once
// per "data:" event; the function returns after the stream is finished:
// a null json on graceful exit (incl. cancellation via should_stop),
// the error response json otherwise
json post_sse(const std::string & path,
const json & body,
const std::function<bool()> & should_stop,
const std::function<void(const json &)> & on_data);
// poll /health until the server is ready to accept requests
// returns false if is_aborted returned true or the server is unreachable
bool wait_health(const std::function<bool()> & is_aborted);
//
// higher-level wrappers
//
json create_chat_completion(const json & request,
const std::function<bool()> & should_stop,
const std::function<void(const json &)> & on_data) {
return post_sse("/v1/chat/completions", request, should_stop, on_data);
}
json get_props() {
return get("/props");
}
};
+497
View File
@@ -0,0 +1,497 @@
#include "cli-context.h"
#include "cli-view.h"
#include "arg.h"
#include "base64.hpp"
#include "log.h"
#include "console.h"
#include <algorithm>
#include <filesystem>
#include <fstream>
#include <map>
#include <set>
std::atomic<bool> g_cli_interrupted = false;
static bool should_stop() {
return g_cli_interrupted.load();
}
static constexpr size_t FILE_GLOB_MAX_RESULTS = 100;
const char * LLAMA_ASCII_LOGO = R"(
)";
// number of values an arg consumes on the command line
static int arg_num_values(const common_arg & opt) {
if (opt.value_hint_2 != nullptr) {
return 2;
}
if (opt.value_hint != nullptr) {
return 1;
}
return 0;
}
static std::string format_error_message(const json & err) {
if (err.contains("error") && err.at("error").is_object()) {
const auto & e = err.at("error");
if (e.contains("message") && e.at("message").is_string()) {
return e.at("message").get<std::string>();
}
}
return err.dump();
}
static std::string media_type_from_ext(const std::string & fname) {
std::string ext = std::filesystem::path(fname).extension().string();
std::transform(ext.begin(), ext.end(), ext.begin(), ::tolower);
if (ext == ".wav" || ext == ".mp3") {
return "audio";
}
if (ext == ".mp4" || ext == ".avi" || ext == ".mkv" || ext == ".mov" || ext == ".webm") {
return "video";
}
return "image";
}
bool cli_context::init() {
std::optional<view::spinner> spinner;
if (!params.server_base.empty()) {
std::string base = params.server_base;
while (!base.empty() && base.back() == '/') {
base.pop_back();
}
client.server_base = base;
spinner.emplace("Connecting to server at " + base);
} else {
if (params.model.path.empty() && params.model.url.empty() &&
params.model.hf_repo.empty() && params.model.docker_repo.empty()) {
view::show_error(
"no model specified",
"use -m <file.gguf> or -hf <user/repo> to run a local model,\n"
"or --server-base <url> to connect to a running llama-server"
);
return false;
}
spinner.emplace("Loading model...");
server.emplace();
if (!server->start(params)) {
view::show_error("server start failed");
return false;
}
if (!server->wait_ready(should_stop)) {
if (!should_stop()) {
view::show_error("the server exited before becoming ready");
}
return false;
}
client.server_base = server->address();
}
// for --server-base this is the main availability check; for a spawned
// server it is a cheap sanity check on top of the ready signal
auto is_aborted = [this]() {
return should_stop() || (server && !server->alive());
};
bool healthy = false;
try {
healthy = client.wait_health(is_aborted);
} catch (const std::exception & e) {
client.last_error = e.what();
}
if (!healthy) {
if (!should_stop()) {
view::show_error(client.last_error);
}
return false;
}
fetch_server_props();
return true;
}
void cli_context::fetch_server_props() {
try {
json props = client.get_props();
model_name = props.value("model_alias", "");
if (model_name.empty()) {
const std::string path = props.value("model_path", "");
if (!path.empty()) {
model_name = std::filesystem::path(path).filename().string();
}
}
build_info = props.value("build_info", "");
if (props.contains("modalities") && props.at("modalities").is_object()) {
const auto & modalities = props.at("modalities");
has_vision = modalities.value("vision", false);
has_audio = modalities.value("audio", false);
has_video = modalities.value("video", false);
}
} catch (const std::exception & e) {
// /props can be disabled on remote servers; not fatal
LOG_DBG("failed to fetch /props: %s\n", e.what());
}
}
void cli_context::add_system_prompt() {
if (!params.system_prompt.empty()) {
messages.push_back({
{"role", "system"},
{"content", params.system_prompt}
});
}
}
void cli_context::push_user_message(const std::string & text) {
json content;
if (pending_media.empty()) {
content = text;
} else {
// multimodal message: media parts first, then the text
content = pending_media;
content.push_back({
{"type", "text"},
{"text", text}
});
pending_media = json::array();
}
messages.push_back({
{"role", "user"},
{"content", content}
});
}
bool cli_context::stage_media_file(const std::string & fname, const std::string & type) {
std::ifstream file(fname, std::ios::binary);
if (!file) {
return false;
}
std::string data((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
std::string encoded = base64::encode(data);
if (type == "audio") {
std::string ext = std::filesystem::path(fname).extension().string();
std::transform(ext.begin(), ext.end(), ext.begin(), ::tolower);
pending_media.push_back({
{"type", "input_audio"},
{"input_audio", {
{"data", encoded},
{"format", ext == ".mp3" ? "mp3" : "wav"}
}}
});
} else if (type == "video") {
pending_media.push_back({
{"type", "input_video"},
{"input_video", {
{"data", encoded}
}}
});
} else {
// the server detects the actual image type from the data
pending_media.push_back({
{"type", "image_url"},
{"image_url", {
{"url", "data:image/unknown;base64," + encoded}
}}
});
}
return true;
}
bool cli_context::generate_completion(std::string & assistant_content, cli_timings & timings) {
json body = {
{"messages", messages},
{"stream", true},
// in order to get timings even when we cancel mid-way
{"timings_per_token", true},
};
bool stream_error = false;
view::assistant_turn a;
json err = client.create_chat_completion(body, should_stop, [&](const json & chunk) {
if (chunk.contains("error")) {
stream_error = true;
view::show_error(format_error_message(chunk));
return;
}
if (chunk.contains("timings")) {
const auto & t = chunk.at("timings");
timings.prompt_per_second = t.value("prompt_per_second", 0.0);
timings.predicted_per_second = t.value("predicted_per_second", 0.0);
}
if (!chunk.contains("choices") || !chunk.at("choices").is_array() || chunk.at("choices").empty()) {
return;
}
const auto & choice = chunk.at("choices").at(0);
if (!choice.contains("delta")) {
return;
}
const auto & delta = choice.at("delta");
if (delta.contains("reasoning_content") && delta.at("reasoning_content").is_string()) {
const std::string text = delta.at("reasoning_content").get<std::string>();
if (!text.empty()) {
a.push(view::ASSISTANT_DISPLAY_MODE_REASONING, text);
}
}
if (delta.contains("content") && delta.at("content").is_string()) {
const std::string text = delta.at("content").get<std::string>();
if (!text.empty()) {
assistant_content += text;
a.push(view::ASSISTANT_DISPLAY_MODE_CONTENT, text);
}
}
});
g_cli_interrupted.store(false);
if (!err.is_null()) {
view::show_error(format_error_message(err));
return false;
}
return !stream_error;
}
int cli_context::run() {
add_system_prompt();
std::string modalities = "text";
if (has_vision) {
modalities += ", vision";
}
if (has_audio) {
modalities += ", audio";
}
if (has_video) {
modalities += ", video";
}
std::vector<std::string> banner;
banner.push_back("\n");
banner.push_back(LLAMA_ASCII_LOGO);
banner.push_back("\n");
banner.push_back("build : " + build_info);
banner.push_back("model : " + model_name);
banner.push_back("modalities : " + modalities);
if (!params.system_prompt.empty()) {
console::log("using custom system prompt\n");
}
console::log("\n");
console::log("available commands:\n");
console::log(" /exit or Ctrl+C stop or exit\n");
console::log(" /regen regenerate the last response\n");
console::log(" /clear clear the chat history\n");
console::log(" /read <file> add a text file\n");
console::log(" /glob <pattern> add text files using globbing pattern\n");
if (has_vision) {
console::log(" /image <file> add an image file\n");
}
if (has_audio) {
console::log(" /audio <file> add an audio file\n");
}
if (has_video) {
console::log(" /video <file> add a video file\n");
}
console::log("\n");
view::show_banner(banner);
// interactive loop
std::string cur_msg;
auto add_text_file = [&](const std::string & fname) -> bool {
std::ifstream file(fname, std::ios::binary);
if (!file) {
view::show_error(string_format("file does not exist or cannot be opened: '%s'", fname.c_str()));
return false;
}
std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
cur_msg += "--- File: ";
cur_msg += fname;
cur_msg += " ---\n";
cur_msg += content;
view::show_message(string_format("Loaded text from '%s'", fname.c_str()));
return true;
};
while (true) {
std::string buffer;
{
view::user_turn user_turn;
if (params.prompt.empty()) {
buffer = user_turn.read_input(params.multiline_input);
} else {
// process input prompt from args
for (auto & fname : params.image) {
if (!stage_media_file(fname, media_type_from_ext(fname))) {
view::show_error(string_format("file does not exist or cannot be opened: '%s'", fname.c_str()));
break;
}
view::show_message(string_format("Loaded media from '%s'", fname.c_str()));
}
buffer = params.prompt;
user_turn.echo(buffer);
params.prompt.clear(); // only use it once
}
}
if (should_stop()) {
g_cli_interrupted.store(false);
break;
}
// remove trailing newline
if (!buffer.empty() && buffer.back() == '\n') {
buffer.pop_back();
}
// skip empty messages
if (buffer.empty()) {
continue;
}
bool add_user_msg = true;
// process commands
if (string_starts_with(buffer, "/exit")) {
break;
} else if (string_starts_with(buffer, "/regen")) {
if (messages.size() >= 2) {
size_t last_idx = messages.size() - 1;
messages.erase(last_idx);
add_user_msg = false;
} else {
view::show_error("No message to regenerate.");
continue;
}
} else if (string_starts_with(buffer, "/clear")) {
messages.clear();
add_system_prompt();
pending_media = json::array();
view::show_message("Chat history cleared.");
continue;
} else if (
(string_starts_with(buffer, "/image ") && has_vision) ||
(string_starts_with(buffer, "/audio ") && has_audio) ||
(string_starts_with(buffer, "/video ") && has_video)) {
std::string type = buffer.substr(1, 5);
// just in case (bad copy-paste for example), we strip all trailing/leading spaces
std::string fname = string_strip(buffer.substr(7));
if (!stage_media_file(fname, type)) {
view::show_error(string_format("file does not exist or cannot be opened: '%s'", fname.c_str()));
continue;
}
view::show_message(string_format("Loaded media from '%s'", fname.c_str()));
continue;
} else if (string_starts_with(buffer, "/read ")) {
std::string fname = string_strip(buffer.substr(6));
add_text_file(fname);
continue;
} else if (string_starts_with(buffer, "/glob ")) {
std::error_code ec;
size_t count = 0;
auto curdir = std::filesystem::current_path();
std::string pattern = string_strip(buffer.substr(6));
std::filesystem::path rel_path;
auto startglob = pattern.find_first_of("![*?");
if (startglob != std::string::npos && startglob != 0) {
auto endpath = pattern.substr(0, startglob).find_last_of('/');
if (endpath != std::string::npos) {
std::string rel_pattern = pattern.substr(0, endpath);
#if !defined(_WIN32)
if (string_starts_with(rel_pattern, '~')) {
const char * home = std::getenv("HOME");
if (home && home[0]) {
rel_pattern = home + rel_pattern.substr(1);
}
}
#endif
rel_path = rel_pattern;
pattern.erase(0, endpath + 1);
curdir /= rel_path;
}
}
for (const auto & entry : std::filesystem::recursive_directory_iterator(curdir,
std::filesystem::directory_options::skip_permission_denied, ec)) {
if (!entry.is_regular_file()) {
continue;
}
std::string rel = std::filesystem::relative(entry.path(), curdir, ec).string();
if (ec) {
ec.clear();
continue;
}
std::replace(rel.begin(), rel.end(), '\\', '/');
if (!glob_match(pattern, rel)) {
continue;
}
if (!add_text_file((rel_path / rel).string())) {
continue;
}
if (++count >= FILE_GLOB_MAX_RESULTS) {
view::show_error(string_format("Maximum number of globbed files allowed (%zu) reached.", FILE_GLOB_MAX_RESULTS));
break;
}
}
continue;
} else {
// not a command
cur_msg += buffer;
}
// generate response
if (add_user_msg) {
push_user_message(cur_msg);
cur_msg.clear();
}
cli_timings timings;
std::string assistant_content;
generate_completion(assistant_content, timings);
messages.push_back({
{"role", "assistant"},
{"content", assistant_content}
});
if (params.show_timings) {
// TODO
}
if (params.single_turn) {
break;
}
}
view::show_message("Exiting...");
return 0;
}
void cli_context::shutdown() {
if (server) {
server->stop();
server.reset();
}
}
+82
View File
@@ -0,0 +1,82 @@
// controller for llama-cli (the "controller" in MVC)
//
// owns the chat state, drives the view and talks to llama-server through
// cli_client; when no --server-base is given it also manages a local
// llama-server child process via cli_server
#pragma once
#include "common.h"
#include "cli-client.h"
#include "cli-server.h"
#include <atomic>
#include <optional>
#include <string>
struct cli_timings {
double prompt_per_second = 0.0;
double predicted_per_second = 0.0;
};
struct cli_command_info {
std::string usage; // e.g. "/read <file>"
std::string description; // e.g. "add a text file"
};
// properties of the connected server, shown on startup
struct cli_server_info {
std::string build_info;
std::string model_name;
std::string server_base;
bool is_local_server = false; // server is spawned and managed by llama-cli
bool has_system_prompt = false;
bool has_vision = false;
bool has_audio = false;
bool has_video = false;
std::vector<cli_command_info> commands;
};
// set by the SIGINT handler; cleared once the interrupt has been handled
extern std::atomic<bool> g_cli_interrupted;
struct cli_context {
common_params params;
cli_client client; // always initialized
std::optional<cli_server> server; // only set when no --server-base is given
json messages = json::array();
json pending_media = json::array(); // staged multimodal content parts
// properties of the connected server
std::string model_name;
std::string build_info;
bool has_vision = false;
bool has_audio = false;
bool has_video = false;
cli_context(const common_params & params) : params(params) {}
// connect to --server-base or spawn a local llama-server child;
// argc/argv are needed to forward the server-relevant args to the child
bool init();
// run the interactive chat loop, returns the process exit code
int run();
// stop the local server child (if any)
void shutdown();
private:
bool generate_completion(std::string & assistant_content, cli_timings & timings);
void fetch_server_props();
void add_system_prompt();
void push_user_message(const std::string & text);
// read a file and stage it as a multimodal content part; type is one of
// "image", "audio", "video"; returns false if the file cannot be read
bool stage_media_file(const std::string & fname, const std::string & type);
};
+63
View File
@@ -0,0 +1,63 @@
#pragma once
#include <thread>
#include "http.h"
// spawn llama-server in a thread and interact with it via a random port
// note: in the future, we may have a server running as daemon and the CLI can connect to it automatically
// llama_server will be available as a dynamic library symbol
int llama_server(common_params & params, int argc, char ** argv);
struct cli_server {
std::thread th;
int port = -1;
~cli_server() {
stop();
}
void stop() {
if (th.joinable()) {
th.detach();
}
}
bool start(common_params & params) {
port = common_http_get_free_port();
if (port <= 0) {
fprintf(stderr, "failed to get a free port\n");
exit(1);
}
th = std::thread([&]() {
// argc / argv are only used in router mode, we can skip them for now
int res = llama_server(params, 0, nullptr);
if (res != 0) {
fprintf(stderr, "llama_server exited with code %d\n", res);
}
});
return true;
}
std::string address() const {
return "http://127.0.0.1:" + std::to_string(port);
}
bool wait_ready(std::function<bool()> should_stop) {
// while (true) {
// if (should_stop()) {
// break;
// }
// std::this_thread::sleep_for(std::chrono::milliseconds(5000));
// }
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
return true;
}
bool alive() const {
return th.joinable();
}
};
+103
View File
@@ -0,0 +1,103 @@
#pragma once
#include "common.h"
#include "console.h"
// note: make this view implementation generic, so that we can move to TUI in the future if we want to
namespace view {
using completion_callback = std::function<std::vector<std::pair<std::string, size_t>>(std::string_view, size_t)>;
static void set_completion_callback(completion_callback cb) {
console::set_completion_callback(std::move(cb));
}
static void init(const common_params & params) {
// TODO: avoid using atexit() here by making `console` a singleton
console::init(params.simple_io, params.use_color);
atexit([]() { console::cleanup(); });
}
struct spinner {
spinner(const std::string & message) {
console::log("%s\n", message.c_str());
console::spinner::start();
}
~spinner() {
console::spinner::stop();
}
};
struct user_turn {
user_turn() {
console::set_display(DISPLAY_TYPE_USER_INPUT);
}
~user_turn() {
console::set_display(DISPLAY_TYPE_RESET);
}
void echo(const std::string & buffer) {
if (buffer.size() > 500) {
console::log("\n> %s ... (truncated)\n", buffer.substr(0, 500).c_str());
} else {
console::log("\n> %s\n", buffer.c_str());
}
}
std::string read_input(bool multiline_input) {
console::log("\n> ");
std::string buffer;
std::string line;
bool another_line = true;
do {
another_line = console::readline(line, multiline_input);
buffer += line;
} while (another_line);
return buffer;
}
};
enum assistant_display_mode {
ASSISTANT_DISPLAY_MODE_REASONING,
ASSISTANT_DISPLAY_MODE_CONTENT,
};
struct assistant_turn {
assistant_display_mode mode = ASSISTANT_DISPLAY_MODE_CONTENT;
assistant_turn() {
console::set_display(DISPLAY_TYPE_RESET);
}
~assistant_turn() {
console::set_display(DISPLAY_TYPE_RESET);
}
void push(assistant_display_mode m, const std::string & buffer) {
if (m != mode) {
switch (m) {
case ASSISTANT_DISPLAY_MODE_CONTENT:
console::set_display(DISPLAY_TYPE_RESET);
break;
case ASSISTANT_DISPLAY_MODE_REASONING:
console::set_display(DISPLAY_TYPE_REASONING);
break;
}
}
mode = m;
console::log("%s", buffer.c_str());
console::flush();
}
};
static void show_error(const std::string & title, const std::string & message = "") {
console::spinner::stop();
console::error("Error: %s\n", title.c_str());
if (!message.empty()) {
console::log("%s\n", message.c_str());
}
}
static void show_message(const std::string & message) {
console::log("%s\n", message.c_str());
}
static void show_banner(const std::vector<std::string> & lines) {
for (const auto & line : lines) {
console::log("%s\n", line.c_str());
}
}
};
+15 -500
View File
@@ -1,20 +1,14 @@
#include "chat.h"
#include "common.h"
#include "arg.h"
#include "console.h"
#include "fit.h"
// #include "log.h"
#include "common.h"
#include "log.h"
#include "server-common.h"
#include "server-context.h"
#include "server-task.h"
#include "cli-context.h"
#include "cli-view.h"
#include <array>
#include <atomic>
#include <algorithm>
#include <filesystem>
#include <fstream>
#include <thread>
#include <string_view>
#include <signal.h>
#if defined(_WIN32)
@@ -25,222 +19,19 @@
#include <windows.h>
#endif
const char * LLAMA_ASCII_LOGO = R"(
)";
static std::atomic<bool> g_is_interrupted = false;
static bool should_stop() {
return g_is_interrupted.load();
}
#if defined (__unix__) || (defined (__APPLE__) && defined (__MACH__)) || defined (_WIN32)
static void signal_handler(int) {
if (g_is_interrupted.load()) {
if (g_cli_interrupted.load()) {
// second Ctrl+C - exit immediately
// make sure to clear colors before exiting (not using LOG or console.cpp here to avoid deadlock)
fprintf(stdout, "\033[0m\n");
fflush(stdout);
std::exit(130);
}
g_is_interrupted.store(true);
g_cli_interrupted.store(true);
}
#endif
struct cli_context {
server_context ctx_server;
json messages = json::array();
std::vector<raw_buffer> input_files;
task_params defaults;
bool verbose_prompt;
// thread for showing "loading" animation
std::atomic<bool> loading_show;
cli_context(const common_params & params) {
defaults.sampling = params.sampling;
defaults.speculative = params.speculative;
defaults.n_keep = params.n_keep;
defaults.n_predict = params.n_predict;
defaults.antiprompt = params.antiprompt;
defaults.stream = true; // make sure we always use streaming mode
defaults.timings_per_token = true; // in order to get timings even when we cancel mid-way
// defaults.return_progress = true; // TODO: show progress
verbose_prompt = params.verbose_prompt;
}
std::string generate_completion(result_timings & out_timings) {
server_response_reader rd = ctx_server.get_response_reader();
auto chat_params = format_chat();
{
// TODO: reduce some copies here in the future
server_task task = server_task(SERVER_TASK_TYPE_COMPLETION);
task.id = rd.get_new_id();
task.index = 0;
task.params = defaults; // copy
task.cli_prompt = chat_params.prompt; // copy
task.cli_files = input_files; // copy
task.cli = true;
// chat template settings
task.params.chat_parser_params = common_chat_parser_params(chat_params);
task.params.chat_parser_params.reasoning_format = COMMON_REASONING_FORMAT_DEEPSEEK;
if (!chat_params.parser.empty()) {
task.params.chat_parser_params.parser.load(chat_params.parser);
}
// Copy the preserved tokens into the sampling params
const llama_vocab * vocab = llama_model_get_vocab(
llama_get_model(ctx_server.get_llama_context()));
for (const auto & token : chat_params.preserved_tokens) {
auto ids = common_tokenize(vocab, token, false, true);
if (ids.size() == 1) {
task.params.sampling.preserved_tokens.insert(ids[0]);
}
}
// reasoning budget sampler
if (!chat_params.thinking_end_tag.empty()) {
task.params.sampling.reasoning_budget_tokens = defaults.sampling.reasoning_budget_tokens;
task.params.sampling.generation_prompt = chat_params.generation_prompt;
if (!chat_params.thinking_start_tag.empty()) {
task.params.sampling.reasoning_budget_start =
common_tokenize(vocab, chat_params.thinking_start_tag, false, true);
}
task.params.sampling.reasoning_budget_end =
common_tokenize(vocab, chat_params.thinking_end_tag, false, true);
task.params.sampling.reasoning_budget_forced =
common_tokenize(vocab, defaults.sampling.reasoning_budget_message + chat_params.thinking_end_tag, false, true);
}
rd.post_task({std::move(task)});
}
if (verbose_prompt) {
console::set_display(DISPLAY_TYPE_PROMPT);
console::log("%s\n\n", chat_params.prompt.c_str());
console::set_display(DISPLAY_TYPE_RESET);
}
// wait for first result
console::spinner::start();
server_task_result_ptr result = rd.next(should_stop);
while (true) {
auto res_partial = dynamic_cast<server_task_result_cmpl_partial *>(result.get());
if (res_partial && res_partial->is_begin) {
// this is the "send 200 status to client" signal in streaming mode
// skip, do not stop the spinner
result = rd.next(should_stop);
} else {
console::spinner::stop();
break;
}
}
std::string curr_content;
bool is_thinking = false;
while (result) {
if (should_stop()) {
break;
}
if (result->is_error()) {
json err_data = result->to_json();
if (err_data.contains("message")) {
console::error("Error: %s\n", err_data["message"].get<std::string>().c_str());
} else {
console::error("Error: %s\n", err_data.dump().c_str());
}
return curr_content;
}
auto res_partial = dynamic_cast<server_task_result_cmpl_partial *>(result.get());
if (res_partial) {
out_timings = std::move(res_partial->timings);
for (const auto & diff : res_partial->oaicompat_msg_diffs) {
if (!diff.content_delta.empty()) {
if (is_thinking) {
console::log("\n[End thinking]\n\n");
console::set_display(DISPLAY_TYPE_RESET);
is_thinking = false;
}
curr_content += diff.content_delta;
console::log("%s", diff.content_delta.c_str());
console::flush();
}
if (!diff.reasoning_content_delta.empty()) {
console::set_display(DISPLAY_TYPE_REASONING);
if (!is_thinking) {
console::log("[Start thinking]\n");
}
is_thinking = true;
console::log("%s", diff.reasoning_content_delta.c_str());
console::flush();
}
}
}
auto res_final = dynamic_cast<server_task_result_cmpl_final *>(result.get());
if (res_final) {
out_timings = std::move(res_final->timings);
break;
}
result = rd.next(should_stop);
}
g_is_interrupted.store(false);
// server_response_reader automatically cancels pending tasks upon destruction
return curr_content;
}
// TODO: support remote files in the future (http, https, etc)
std::string load_input_file(const std::string & fname, bool is_media) {
std::ifstream file = fs_open_ifstream(fname, std::ios::binary);
if (!file) {
return "";
}
if (is_media) {
raw_buffer buf;
buf.assign((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
input_files.push_back(std::move(buf));
return get_media_marker();
} else {
std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
return content;
}
}
common_chat_params format_chat() {
auto meta = ctx_server.get_meta();
auto & chat_params = meta.chat_params;
auto caps = common_chat_templates_get_caps(chat_params.tmpls.get());
common_chat_templates_inputs inputs;
inputs.messages = common_chat_msgs_parse_oaicompat(messages);
inputs.tools = {}; // TODO
inputs.tool_choice = COMMON_CHAT_TOOL_CHOICE_NONE;
inputs.json_schema = ""; // TODO
inputs.grammar = ""; // TODO
inputs.use_jinja = chat_params.use_jinja;
inputs.parallel_tool_calls = caps["supports_parallel_tool_calls"];
inputs.add_generation_prompt = true;
inputs.reasoning_format = COMMON_REASONING_FORMAT_DEEPSEEK;
inputs.force_pure_content = chat_params.force_pure_content;
inputs.enable_thinking = chat_params.enable_thinking ? common_chat_templates_support_enable_thinking(chat_params.tmpls.get()) : false;
// Apply chat template to the list of messages
return common_chat_templates_apply(chat_params.tmpls.get(), inputs);
}
};
// TODO?: Make this reusable, enums, docs
static const std::array<std::string_view, 8> cmds = {
"/audio ",
@@ -359,8 +150,6 @@ static std::vector<std::pair<std::string, size_t>> auto_completion_callback(std:
return matches;
}
static constexpr size_t FILE_GLOB_MAX_RESULTS = 100;
// satisfies -Wmissing-declarations
int llama_cli(int argc, char ** argv);
@@ -375,24 +164,7 @@ int llama_cli(int argc, char ** argv) {
return 1;
}
// TODO: maybe support it later?
if (params.conversation_mode == COMMON_CONVERSATION_MODE_DISABLED) {
console::error("--no-conversation is not supported by llama-cli\n");
console::error("please use llama-completion instead\n");
}
// struct that contains llama context and inference
cli_context ctx_cli(params);
llama_backend_init();
llama_numa_init(params.numa);
// TODO: avoid using atexit() here by making `console` a singleton
console::init(params.simple_io, params.use_color);
atexit([]() { console::cleanup(); });
console::set_display(DISPLAY_TYPE_RESET);
console::set_completion_callback(auto_completion_callback);
view::set_completion_callback(auto_completion_callback);
#if defined (__unix__) || (defined (__APPLE__) && defined (__MACH__))
struct sigaction sigint_action;
@@ -408,273 +180,16 @@ int llama_cli(int argc, char ** argv) {
SetConsoleCtrlHandler(reinterpret_cast<PHANDLER_ROUTINE>(console_ctrl_handler), true);
#endif
console::log("\nLoading model... "); // followed by loading animation
console::spinner::start();
if (!ctx_cli.ctx_server.load_model(params)) {
console::spinner::stop();
console::error("\nFailed to load the model\n");
cli_context ctx_cli(params);
if (!ctx_cli.init()) {
ctx_cli.shutdown();
return 1;
}
ctx_cli.defaults.sampling = params.sampling;
int ret = ctx_cli.run();
console::spinner::stop();
console::log("\n");
ctx_cli.shutdown();
std::thread inference_thread([&ctx_cli]() {
ctx_cli.ctx_server.start_loop();
});
auto inf = ctx_cli.ctx_server.get_meta();
std::string modalities = "text";
if (inf.has_inp_image) {
modalities += ", vision";
}
if (inf.has_inp_audio) {
modalities += ", audio";
}
auto add_system_prompt = [&]() {
if (!params.system_prompt.empty()) {
ctx_cli.messages.push_back({
{"role", "system"},
{"content", params.system_prompt}
});
}
};
add_system_prompt();
console::log("\n");
console::log("%s\n", LLAMA_ASCII_LOGO);
console::log("build : %s\n", inf.build_info.c_str());
console::log("model : %s\n", inf.model_name.c_str());
console::log("modalities : %s\n", modalities.c_str());
if (!params.system_prompt.empty()) {
console::log("using custom system prompt\n");
}
console::log("\n");
console::log("available commands:\n");
console::log(" /exit or Ctrl+C stop or exit\n");
console::log(" /regen regenerate the last response\n");
console::log(" /clear clear the chat history\n");
console::log(" /read <file> add a text file\n");
console::log(" /glob <pattern> add text files using globbing pattern\n");
if (inf.has_inp_image) {
console::log(" /image <file> add an image file\n");
}
if (inf.has_inp_audio) {
console::log(" /audio <file> add an audio file\n");
}
if (inf.has_inp_video) {
console::log(" /video <file> add a video file\n");
}
console::log("\n");
// interactive loop
std::string cur_msg;
auto add_text_file = [&](const std::string & fname) -> bool {
std::string marker = ctx_cli.load_input_file(fname, false);
if (marker.empty()) {
console::error("file does not exist or cannot be opened: '%s'\n", fname.c_str());
return false;
}
if (inf.fim_sep_token != LLAMA_TOKEN_NULL) {
cur_msg += common_token_to_piece(ctx_cli.ctx_server.get_llama_context(), inf.fim_sep_token, true);
cur_msg += fname;
cur_msg.push_back('\n');
} else {
cur_msg += "--- File: ";
cur_msg += fname;
cur_msg += " ---\n";
}
cur_msg += marker;
console::log("Loaded text from '%s'\n", fname.c_str());
return true;
};
while (true) {
std::string buffer;
console::set_display(DISPLAY_TYPE_USER_INPUT);
if (params.prompt.empty()) {
console::log("\n> ");
std::string line;
bool another_line = true;
do {
another_line = console::readline(line, params.multiline_input);
buffer += line;
} while (another_line);
} else {
// process input prompt from args
for (auto & fname : params.image) {
std::string marker = ctx_cli.load_input_file(fname, true);
if (marker.empty()) {
console::error("file does not exist or cannot be opened: '%s'\n", fname.c_str());
break;
}
console::log("Loaded media from '%s'\n", fname.c_str());
cur_msg += marker;
}
buffer = params.prompt;
if (buffer.size() > 500) {
console::log("\n> %s ... (truncated)\n", buffer.substr(0, 500).c_str());
} else {
console::log("\n> %s\n", buffer.c_str());
}
params.prompt.clear(); // only use it once
}
console::set_display(DISPLAY_TYPE_RESET);
console::log("\n");
if (should_stop()) {
g_is_interrupted.store(false);
break;
}
// remove trailing newline
if (!buffer.empty() &&buffer.back() == '\n') {
buffer.pop_back();
}
// skip empty messages
if (buffer.empty()) {
continue;
}
bool add_user_msg = true;
// process commands
if (string_starts_with(buffer, "/exit")) {
break;
} else if (string_starts_with(buffer, "/regen")) {
if (ctx_cli.messages.size() >= 2) {
size_t last_idx = ctx_cli.messages.size() - 1;
ctx_cli.messages.erase(last_idx);
add_user_msg = false;
} else {
console::error("No message to regenerate.\n");
continue;
}
} else if (string_starts_with(buffer, "/clear")) {
ctx_cli.messages.clear();
add_system_prompt();
ctx_cli.input_files.clear();
console::log("Chat history cleared.\n");
continue;
} else if (
(string_starts_with(buffer, "/image ") && inf.has_inp_image) ||
(string_starts_with(buffer, "/audio ") && inf.has_inp_audio) ||
(string_starts_with(buffer, "/video ") && inf.has_inp_video)) {
// just in case (bad copy-paste for example), we strip all trailing/leading spaces
std::string fname = string_strip(buffer.substr(7));
std::string marker = ctx_cli.load_input_file(fname, true);
if (marker.empty()) {
console::error("file does not exist or cannot be opened: '%s'\n", fname.c_str());
continue;
}
cur_msg += marker;
console::log("Loaded media from '%s'\n", fname.c_str());
continue;
} else if (string_starts_with(buffer, "/read ")) {
std::string fname = string_strip(buffer.substr(6));
add_text_file(fname);
continue;
} else if (string_starts_with(buffer, "/glob ")) {
std::error_code ec;
size_t count = 0;
auto curdir = std::filesystem::current_path();
std::string pattern = string_strip(buffer.substr(6));
std::filesystem::path rel_path;
auto startglob = pattern.find_first_of("![*?");
if (startglob != std::string::npos && startglob != 0) {
auto endpath = pattern.substr(0, startglob).find_last_of('/');
if (endpath != std::string::npos) {
std::string rel_pattern = pattern.substr(0, endpath);
#if !defined(_WIN32)
if (string_starts_with(rel_pattern, '~')) {
const char * home = std::getenv("HOME");
if (home && home[0]) {
rel_pattern = home + rel_pattern.substr(1);
}
}
#endif
rel_path = rel_pattern;
pattern.erase(0, endpath + 1);
curdir /= rel_path;
}
}
for (const auto & entry : std::filesystem::recursive_directory_iterator(curdir,
std::filesystem::directory_options::skip_permission_denied, ec)) {
if (!entry.is_regular_file()) {
continue;
}
std::string rel = std::filesystem::relative(entry.path(), curdir, ec).string();
if (ec) {
ec.clear();
continue;
}
std::replace(rel.begin(), rel.end(), '\\', '/');
if (!glob_match(pattern, rel)) {
continue;
}
if (!add_text_file((rel_path / rel).string())) {
continue;
}
if (++count >= FILE_GLOB_MAX_RESULTS) {
console::error("Maximum number of globbed files allowed (%zu) reached.\n", FILE_GLOB_MAX_RESULTS);
break;
}
}
continue;
} else {
// not a command
cur_msg += buffer;
}
// generate response
if (add_user_msg) {
ctx_cli.messages.push_back({
{"role", "user"},
{"content", cur_msg}
});
cur_msg.clear();
}
result_timings timings;
std::string assistant_content = ctx_cli.generate_completion(timings);
ctx_cli.messages.push_back({
{"role", "assistant"},
{"content", assistant_content}
});
console::log("\n");
if (params.show_timings) {
console::set_display(DISPLAY_TYPE_INFO);
console::log("\n");
console::log("[ Prompt: %.1f t/s | Generation: %.1f t/s ]\n", timings.prompt_per_second, timings.predicted_per_second);
console::set_display(DISPLAY_TYPE_RESET);
}
if (params.single_turn) {
break;
}
}
console::set_display(DISPLAY_TYPE_RESET);
console::log("\nExiting...\n");
ctx_cli.ctx_server.terminate();
inference_thread.join();
// bump the log level to display timings
common_log_set_verbosity_thold(LOG_LEVEL_INFO);
common_memory_breakdown_print(ctx_cli.ctx_server.get_llama_context());
return 0;
return ret;
}
+3 -3
View File
@@ -204,9 +204,9 @@ Instead of building everything from the ground up (like what most AI agents will
The flow for downloading a new model:
- POST request comes in --> `post_router_models` --> validation
- `server_models::download()` is called
- Sets up a new thread `inst.th` and runs the download inside
- If a stop request comes in, set `stop_download` to `true`
- A new `llama-server` subprocess will be spawned with special `SERVER_CHILD_MODE_DOWNLOAD`
- Child process runs the download and report status back to router via stdin/out
- If a stop request comes in, the router asks the child process to stop (same mechanism as running a model in child process)
- Otherwise, upon completion, we call `load_models()` to refresh the list of models
### Notable Related PRs
+12 -5
View File
@@ -1230,8 +1230,6 @@ print(completion.choices[0].text)
Given a ChatML-formatted json description in `messages`, it returns the predicted completion. Both synchronous and streaming mode are supported, so scripted and interactive applications work fine. While no strong claims of compatibility with OpenAI API spec is being made, in our experience it suffices to support many apps. Only models with a [supported chat template](https://github.com/ggml-org/llama.cpp/wiki/Templates-supported-by-llama_chat_apply_template) can be used optimally with this endpoint. By default, the ChatML template will be used.
If model supports multimodal, you can input the media file via `image_url` content part. We support both base64 and remote URL as input. See OAI documentation for more.
*Options:*
See [OpenAI Chat Completions API documentation](https://platform.openai.com/docs/api-reference/chat). llama.cpp `/completion`-specific features such as `mirostat` are also supported.
@@ -1250,9 +1248,18 @@ The `response_format` parameter supports both plain JSON output (e.g. `{"type":
`parallel_tool_calls` : Whether to enable parallel/multiple tool calls (only supported on some models, verification is based on jinja template).
For multimodal input:
- Content type `image_url` and `input_audio` are the same as OAI schema
- Content type `input_video` is an extension from OAI schema. For now, it only accepts base64 input
For multimodal input (typed content, `messages[i].content[j]`):
- If `type == "image_url"`:
- `image_url.url` can be a remote URL, base64 (raw or URI-encoded via `data:image/...;base64`) or path to local file
- Accepts formats supported by `stb_image` (jpeg, png, tga, bmp, gif, ...)
- If `type == "input_audio"`:
- Either `input_audio.data` or `input_audio.url` can be specified, can be a remote URL, raw base64 or path to local file
- Accepts formats supported by `miniaudio` (mp3, wav, flac)
- `input_audio.format` will be ignored, the file format will be determined automatically
- If `type == "input_video"`:
- Either `input_video.data` or `input_video.url` can be specified, can be a remote URL, raw base64 or path to local file
- Accepts formats supported by `ffmpeg`
- Note: for local file, make sure to set `--media-path`. File path must be prefixed by `file://`
*Examples:*
+37 -25
View File
@@ -817,12 +817,21 @@ json oaicompat_completion_params_parse(const json & body) {
return llama_params;
}
// media_path always end with '/', see arg.cpp
// url can be
// - http(s):// for remote files
// - file:// for local files (only allowed if media_path is set)
// - data: for base64 encoded data with uri scheme (e.g. data:image/png;base64,...)
// - raw base64 encoded data
static void handle_media(
std::vector<raw_buffer> & out_files,
json & media_obj,
const std::string & media_path) {
std::string url = json_value(media_obj, "url", std::string());
const std::string & url,
const std::string & media_path,
bool accept_base64_uri) {
if (!media_path.empty()) {
// should already be enforced by arg.cpp, but checking just in case
GGML_ASSERT(media_path.back() == DIRECTORY_SEPARATOR);
}
if (string_starts_with(url, "http")) {
// download remote image
// TODO @ngxson : maybe make these params configurable
@@ -858,20 +867,28 @@ static void handle_media(
data.assign((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
out_files.push_back(data);
} else {
} else if (accept_base64_uri && string_starts_with(url, "data:")) {
// try to decode base64 image
std::vector<std::string> parts = string_split<std::string>(url, /*separator*/ ',');
if (parts.size() != 2) {
throw std::runtime_error("Invalid url value");
throw std::runtime_error("Invalid uri-encoded base64 value");
} else if (!string_starts_with(parts[0], "data:image/")) {
throw std::runtime_error("Invalid url format: " + parts[0]);
throw std::runtime_error("Invalid uri format: " + parts[0]);
} else if (!string_ends_with(parts[0], "base64")) {
throw std::runtime_error("url must be base64 encoded");
throw std::runtime_error("uri must be base64 encoded");
} else {
auto base64_data = parts[1];
auto decoded_data = base64_decode(base64_data);
out_files.push_back(decoded_data);
}
} else {
// try as raw base64 string
auto decoded_data = base64_decode(url);
if (decoded_data.empty()) {
throw std::runtime_error("Invalid base64 value");
}
out_files.push_back(decoded_data);
}
}
@@ -957,14 +974,15 @@ json oaicompat_chat_params_parse(
}
for (auto & p : content) {
std::string type = json_value(p, "type", std::string());
std::string type = json_value(p, "type", std::string());
if (type == "image_url") {
if (!opt.allow_image) {
throw std::runtime_error("image input is not supported - hint: if this is unexpected, you may need to provide the mmproj");
}
json image_url = json_value(p, "image_url", json::object());
handle_media(out_files, image_url, opt.media_path);
std::string url = json_value(image_url, "url", std::string());
handle_media(out_files, url, opt.media_path, true);
p["type"] = "media_marker";
p["text"] = get_media_marker();
@@ -975,17 +993,11 @@ json oaicompat_chat_params_parse(
throw std::runtime_error("audio input is not supported - hint: if this is unexpected, you may need to provide the mmproj");
}
json input_audio = json_value(p, "input_audio", json::object());
std::string data = json_value(input_audio, "data", std::string());
std::string format = json_value(input_audio, "format", std::string());
// while we also support flac, we don't allow it here so we matches the OAI spec
if (format != "wav" && format != "mp3") {
throw std::invalid_argument("input_audio.format must be either 'wav' or 'mp3'");
}
auto decoded_data = base64_decode(data); // expected to be base64 encoded
out_files.push_back(decoded_data);
// TODO: add audio_url support by reusing handle_media()
// note: don't need to validate "format", it's redundant
json input_audio = json_value(p, "input_audio", json::object());
std::string url = json_value(input_audio, "data",
json_value(input_audio, "url", std::string()));
handle_media(out_files, url, opt.media_path, false);
p["type"] = "media_marker";
p["text"] = get_media_marker();
@@ -996,10 +1008,10 @@ json oaicompat_chat_params_parse(
throw std::runtime_error("video input is not supported - hint: if this is unexpected, you may need to provide the mmproj");
}
json input_video = json_value(p, "input_video", json::object());
std::string data = json_value(input_video, "data", std::string());
auto decoded_data = base64_decode(data); // expected to be base64 encoded
out_files.push_back(decoded_data);
json input_video = json_value(p, "input_video", json::object());
std::string url = json_value(input_video, "data",
json_value(input_video, "url", std::string()));
handle_media(out_files, url, opt.media_path, false);
p["type"] = "media_marker";
p["text"] = get_media_marker();
+6
View File
@@ -931,6 +931,8 @@ private:
bool sleeping = false;
int64_t t_last_load_progress_ms = 0;
void destroy() {
spec.reset();
ctx_dft.reset();
@@ -1244,6 +1246,10 @@ private:
}
if (has_mmproj) {
if (callback_state) {
callback_state(SERVER_STATE_LOADING, {{"stage", "mmproj_model"}});
}
if (!is_resume) {
mtmd_helper_log_set(common_log_default_callback, nullptr);
}
+3 -1
View File
@@ -53,7 +53,7 @@ struct server_context_meta {
};
enum server_state {
// SERVER_STATE_DOWNLOADING,
SERVER_STATE_DOWNLOADING,
SERVER_STATE_LOADING,
SERVER_STATE_READY,
SERVER_STATE_SLEEPING,
@@ -61,6 +61,7 @@ enum server_state {
static std::string server_state_to_str(server_state state) {
switch (state) {
case SERVER_STATE_DOWNLOADING: return "downloading";
case SERVER_STATE_LOADING: return "loading";
case SERVER_STATE_READY: return "ready";
case SERVER_STATE_SLEEPING: return "sleeping";
@@ -69,6 +70,7 @@ static std::string server_state_to_str(server_state state) {
}
static server_state server_state_from_str(const std::string & str) {
if (str == "downloading") return SERVER_STATE_DOWNLOADING;
if (str == "loading") return SERVER_STATE_LOADING;
if (str == "ready") return SERVER_STATE_READY;
if (str == "sleeping") return SERVER_STATE_SLEEPING;
+233 -199
View File
@@ -5,6 +5,7 @@
#include "build-info.h"
#include "preset.h"
#include "download.h"
#include "http.h"
#include <cpp-httplib/httplib.h> // TODO: remove this once we use HTTP client from download.h
#include <sheredom/subprocess.h>
@@ -25,14 +26,7 @@
#include <sstream>
#include <cstring>
#ifdef _WIN32
#include <winsock2.h>
#include <windows.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#ifndef _WIN32
extern char **environ;
#endif
@@ -64,6 +58,17 @@ struct server_subproc {
return sproc.has_value() && subprocess_alive(&sproc.value());
}
void request_exit() {
if (sproc.has_value()) {
FILE * stdin_file = subprocess_stdin(&sproc.value());
if (stdin_file) {
fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
fflush(stdin_file);
}
}
stopped.store(true, std::memory_order_relaxed);
}
void terminate() {
if (!sproc.has_value()) {
return;
@@ -323,7 +328,7 @@ void server_models::notify_sse(const std::string & event, const std::string & mo
}
void server_models::load_models() {
// Phase 1: load presets from all sources pure I/O, no lock needed
// Phase 1: load presets from all sources - pure I/O, no lock needed
// 1. cached models
common_presets cached_models = ctx_preset.load_from_cache();
SRV_INF("Loaded %zu cached model presets\n", cached_models.size());
@@ -376,7 +381,7 @@ void server_models::load_models() {
return source_map.count(name) ? source_map.at(name) : SERVER_MODEL_SOURCE_PRESET;
};
// Helpers that read `mapping` must be called while holding the lock.
// Helpers that read `mapping` - must be called while holding the lock.
std::unordered_set<std::string> custom_names;
for (const auto & [name, preset] : custom_presets) custom_names.insert(name);
auto join_set = [](const std::set<std::string> & s) {
@@ -523,7 +528,7 @@ void server_models::load_models() {
}
}
// join outside the lock monitoring thread calls update_status (needs lock)
// join outside the lock - monitoring thread calls update_status (needs lock)
lk.unlock();
for (auto & th : threads_to_join) th.join();
lk.lock();
@@ -622,7 +627,7 @@ void server_models::load_models() {
apply_stop_timeout();
// clear reload flag before unlocking for autoload load() blocks on !is_reloading,
// clear reload flag before unlocking for autoload - load() blocks on !is_reloading,
// so clearing it here (while still locked) prevents a deadlock in the autoload calls below
is_reloading = false;
cv.notify_all();
@@ -693,66 +698,6 @@ std::optional<server_model_meta> server_models::get_meta(const std::string & nam
return std::nullopt;
}
static int get_free_port() {
#ifdef _WIN32
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
return -1;
}
typedef SOCKET native_socket_t;
#define INVALID_SOCKET_VAL INVALID_SOCKET
#define CLOSE_SOCKET(s) closesocket(s)
#else
typedef int native_socket_t;
#define INVALID_SOCKET_VAL -1
#define CLOSE_SOCKET(s) close(s)
#endif
native_socket_t sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == INVALID_SOCKET_VAL) {
#ifdef _WIN32
WSACleanup();
#endif
return -1;
}
struct sockaddr_in serv_addr;
std::memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(0);
if (bind(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) != 0) {
CLOSE_SOCKET(sock);
#ifdef _WIN32
WSACleanup();
#endif
return -1;
}
#ifdef _WIN32
int namelen = sizeof(serv_addr);
#else
socklen_t namelen = sizeof(serv_addr);
#endif
if (getsockname(sock, (struct sockaddr*)&serv_addr, &namelen) != 0) {
CLOSE_SOCKET(sock);
#ifdef _WIN32
WSACleanup();
#endif
return -1;
}
int port = ntohs(serv_addr.sin_port);
CLOSE_SOCKET(sock);
#ifdef _WIN32
WSACleanup();
#endif
return port;
}
// helper to convert vector<string> to char **
// pointers are only valid as long as the original vector is valid
static std::vector<char *> to_char_ptr_array(const std::vector<std::string> & vec) {
@@ -815,17 +760,23 @@ void server_models::unload_lru() {
}
void server_models::load(const std::string & name) {
if (!has_model(name)) {
throw std::runtime_error("model name=" + name + " is not found");
load(name, load_options{});
}
void server_models::load(const std::string & name, const load_options & opts) {
if (!opts.custom_meta.has_value()) {
if (!has_model(name)) {
throw std::runtime_error("model name=" + name + " is not found");
}
unload_lru();
}
unload_lru();
std::unique_lock<std::mutex> lk(mutex);
// edge case: block until any in-progress reload has finished so we always load
// against the freshest preset and a consistent mapping state
cv.wait(lk, [this]() { return !is_reloading; });
auto meta = mapping[name].meta;
auto meta = opts.custom_meta.has_value() ? *opts.custom_meta : mapping[name].meta;
if (meta.status != SERVER_MODEL_STATUS_UNLOADED) {
SRV_INF("model %s is not ready\n", name.c_str());
return;
@@ -850,7 +801,7 @@ void server_models::load(const std::string & name) {
// prepare new instance info
instance_t inst;
inst.meta = meta;
inst.meta.port = get_free_port();
inst.meta.port = common_http_get_free_port();
inst.meta.status = SERVER_MODEL_STATUS_LOADING;
inst.meta.loaded_info = json{};
inst.meta.last_used = ggml_time_ms();
@@ -869,6 +820,12 @@ void server_models::load(const std::string & name) {
std::vector<std::string> child_env = base_env; // copy
child_env.push_back("LLAMA_SERVER_ROUTER_PORT=" + std::to_string(base_params.port));
if (opts.mode == SERVER_CHILD_MODE_DOWNLOAD) {
inst.meta.status = SERVER_MODEL_STATUS_DOWNLOADING;
child_env.push_back("LLAMA_SERVER_CHILD_MODE=download");
child_env.push_back("LLAMA_ARG_HF_REPO=" + name);
}
SRV_INF("%s", "spawning server instance with args:\n");
for (const auto & arg : child_args) {
SRV_INF(" %s\n", arg.c_str());
@@ -886,13 +843,17 @@ void server_models::load(const std::string & name) {
if (result != 0) {
throw std::runtime_error("failed to spawn server instance");
}
inst.stdin_file = subprocess_stdin(&inst.subproc->get());
}
// start a thread to manage the child process
// captured variables are guaranteed to be destroyed only after the thread is joined
inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port, stop_timeout = inst.meta.stop_timeout]() {
inst.th = std::thread([
this, name,
child_proc = inst.subproc,
port = inst.meta.port,
stop_timeout = inst.meta.stop_timeout,
child_mode = opts.mode
]() {
FILE * stdin_file = subprocess_stdin(&child_proc->get());
FILE * stdout_file = subprocess_stdout(&child_proc->get()); // combined stdout/stderr
@@ -925,7 +886,7 @@ void server_models::load(const std::string & name) {
return is_stopping() || child_proc->stopped.load(std::memory_order_acquire);
});
}
// child crashed or finished on its own skip graceful shutdown sequence
// child crashed or finished on its own, skip graceful shutdown sequence
if (child_proc->stopped.load(std::memory_order_acquire)) {
return;
}
@@ -973,10 +934,14 @@ void server_models::load(const std::string & name) {
subprocess_destroy(&child_proc->get());
// update status and exit code
this->update_status(name, {
SERVER_MODEL_STATUS_UNLOADED,
exit_code
});
if (child_mode == SERVER_CHILD_MODE_DOWNLOAD) {
// instance will be cleaned up on next load_models() call
} else {
this->update_status(name, {
SERVER_MODEL_STATUS_UNLOADED,
exit_code
});
}
SRV_INF("instance name=%s exited with status %d\n", name.c_str(), exit_code);
});
@@ -984,7 +949,7 @@ void server_models::load(const std::string & name) {
{
auto & old_instance = mapping[name];
// old process should have exited already, but just in case, we clean it up here
if (old_instance.subproc->is_alive()) {
if (old_instance.subproc && old_instance.subproc->is_alive()) {
SRV_WRN("old process for model name=%s is still alive, this is unexpected\n", name.c_str());
old_instance.subproc->terminate(); // force kill
}
@@ -1001,92 +966,13 @@ void server_models::load(const std::string & name) {
cv.notify_all();
}
// callback for model downloading functionality
struct server_models_download_res : public common_download_callback {
common_params_model model;
common_download_opts opts;
std::function<bool()> should_stop;
std::function<void(const common_download_progress & p)> on_progress;
bool is_ok = false;
bool run() {
try {
common_download_model(model, opts);
is_ok = true;
} catch (const std::exception & e) {
auto model_name = model.get_name();
SRV_ERR("download failed for model name=%s: %s\n", model_name.c_str(), e.what());
is_ok = false;
}
return is_ok;
}
void on_start(const common_download_progress & p) override {
on_progress(p);
}
void on_update(const common_download_progress & p) override {
on_progress(p);
}
void on_done(const common_download_progress &, bool ok) override {
is_ok = ok;
}
bool is_cancelled() const override {
return should_stop();
}
};
void server_models::download(common_params_model && model, common_download_opts && opts) {
std::string name = model.get_name();
GGML_ASSERT(name == model.hf_repo);
std::unique_lock<std::mutex> lk(mutex);
if (mapping.find(name) != mapping.end()) {
throw std::runtime_error("model name=" + name + " already exists");
}
instance_t inst;
inst.meta.name = name;
inst.meta.status = SERVER_MODEL_STATUS_DOWNLOADING;
inst.subproc = std::make_shared<server_subproc>();
auto dl = std::make_unique<server_models_download_res>();
dl->model = model; // copy
dl->opts = opts; // copy
dl->should_stop = [sp = inst.subproc]() {
return sp->stopped.load(std::memory_order_relaxed);
};
dl->on_progress = [this, name](const common_download_progress & p) {
update_download_progress(name, p, false);
};
inst.th = std::thread([this, dl = std::move(dl)]() {
dl->opts.callback = dl.get();
bool ok = dl->run();
auto model_name = dl->model.get_name();
SRV_INF("download finished for model name=%s with status=%s\n",
model_name.c_str(), ok ? "success" : "failure");
update_download_progress(model_name, {}, true, ok);
// need_reload is set inside update_download_progress under the mutex;
// the next load_models() call will clean up this instance
});
mapping[name] = std::move(inst);
notify_sse("status_update", name, {
{"status", server_model_status_to_string(SERVER_MODEL_STATUS_DOWNLOADING)},
});
cv.notify_all();
}
void server_models::unload(const std::string & name) {
std::unique_lock<std::mutex> lk(mutex);
auto it = mapping.find(name);
if (it != mapping.end()) {
if (it->second.meta.status == SERVER_MODEL_STATUS_DOWNLOADING) {
SRV_INF("cancelling download for model name=%s\n", name.c_str());
it->second.subproc->stopped.store(true, std::memory_order_relaxed);
it->second.subproc->request_exit();
// for convenience, we wait the status change here
wait(lk, name, [](const server_model_meta & new_meta) {
return new_meta.status != SERVER_MODEL_STATUS_DOWNLOADING;
@@ -1198,37 +1084,65 @@ void server_models::update_download_progress(const std::string & name, const com
}
bool server_models::remove(const std::string & name) {
auto meta = get_meta(name);
// do everything under one lock acquisition; avoid get_meta() /
// unload() because they can trigger load_models() which erases
// transient DOWNLOADING / DOWNLOADED entries as a side-effect
std::unique_lock<std::mutex> lk(mutex);
if (!meta.has_value()) {
auto it = mapping.find(name);
if (it == mapping.end()) {
throw std::runtime_error("model name=" + name + " is not found");
}
if (meta->source != SERVER_MODEL_SOURCE_CACHE) {
if (it->second.meta.source != SERVER_MODEL_SOURCE_CACHE) {
throw std::runtime_error("model name=" + name + " is not removable (not from cache)");
}
unload(name); // cancel download or stop running instance
{
std::unique_lock<std::mutex> lk(mutex);
// a cancelled download lands on DOWNLOADED; a stopped instance lands on UNLOADED
wait(lk, name, [](const server_model_meta & new_meta) {
return new_meta.status == SERVER_MODEL_STATUS_UNLOADED
|| new_meta.status == SERVER_MODEL_STATUS_DOWNLOADED;
});
// join before erasing - after status reaches UNLOADED/DOWNLOADED the thread no
// longer acquires this mutex, so joining while holding it is safe
if (mapping[name].th.joinable()) {
mapping[name].th.join();
if (it->second.meta.status == SERVER_MODEL_STATUS_DOWNLOADING) {
// cancel in-flight download
SRV_INF("cancelling download for model name=%s\n", name.c_str());
it->second.subproc->request_exit();
} else if (it->second.meta.is_running()) {
// stop running instance
SRV_INF("stopping model instance name=%s\n", name.c_str());
stopping_models.insert(name);
if (it->second.meta.status == SERVER_MODEL_STATUS_LOADING) {
it->second.subproc->terminate();
}
// remove the model from disk (hold lock to prevent concurrent load)
bool ok = common_download_remove(name);
if (ok) {
mapping.erase(name);
}
SRV_INF("removing model name=%s from cache (%s)\n", name.c_str(), ok ? "succeeded" : "failed");
notify_sse("model_remove", name, {});
return ok;
cv_stop.notify_all();
}
// wait until the monitoring thread finishes
wait(lk, name, [](const server_model_meta & meta) {
return meta.status == SERVER_MODEL_STATUS_UNLOADED
|| meta.status == SERVER_MODEL_STATUS_DOWNLOADED;
});
// re-find after wait - load_models() may have erased the entry during the wait
it = mapping.find(name);
if (it == mapping.end()) {
// load_models() already joined the thread and erased the entry;
// we just need to clean up the cached files on disk
lk.unlock();
bool ok = common_download_remove(name);
SRV_INF("removing model name=%s from cache (%s)\n", name.c_str(), ok ? "succeeded" : "partial");
notify_sse("model_remove", name, {});
return true;
}
// join before erasing - thread no longer acquires this mutex
if (it->second.th.joinable()) {
it->second.th.join();
}
// remove from disk (best-effort: cancelled downloads may have no cached files)
bool ok = common_download_remove(name);
mapping.erase(name);
if (!ok) {
SRV_WRN("removing model name=%s from disk returned false (no cached files?)\n", name.c_str());
}
SRV_INF("removing model name=%s from cache (%s)\n", name.c_str(), ok ? "succeeded" : "partial");
notify_sse("model_remove", name, {});
return true;
}
void server_models::wait(const std::string & name, std::function<bool(const server_model_meta &)> predicate) {
@@ -1243,7 +1157,9 @@ void server_models::wait(std::unique_lock<std::mutex> & lk, const std::string &
return predicate(it->second.meta);
}
return false;
// model was removed from mapping by another code path (e.g. load_models()).
// nothing left to wait for - tell the caller to proceed.
return true;
});
}
@@ -1328,6 +1244,31 @@ void server_models::handle_child_state(const std::string & name, const std::stri
}
switch (state) {
case SERVER_STATE_DOWNLOADING:
{
std::string result = json_value(payload, "result", std::string());
std::string url = json_value(payload, "url", std::string());
auto request_exit = [&]() {
std::lock_guard<std::mutex> lk(mutex);
auto it = mapping.find(name);
if (it != mapping.end()) {
return it->second.subproc->request_exit();
}
};
if (result == "download_finished") {
update_download_progress(name, {}, true, true);
request_exit();
} else if (result == "download_failed") {
update_download_progress(name, {}, true, false);
request_exit();
} else if (!url.empty()) {
common_download_progress p;
p.url = url;
p.downloaded = json_value(payload, "downloaded", (size_t)0);
p.total = json_value(payload, "total", (size_t)0);
update_download_progress(name, p, false);
}
} break;
case SERVER_STATE_LOADING:
{
update_status(name, {
@@ -1366,6 +1307,90 @@ bool server_child::is_child() {
return router_port != nullptr;
}
server_child_mode server_child::get_mode() {
const char * mode = std::getenv("LLAMA_SERVER_CHILD_MODE");
std::string mode_str(mode ? mode : "");
if (mode_str == "download") {
return SERVER_CHILD_MODE_DOWNLOAD;
} else {
return SERVER_CHILD_MODE_NORMAL;
}
}
struct server_download_state : public common_download_callback {
server_child * self;
std::function<bool()> should_stop;
std::atomic<int64_t> last_progress_time{0}; // multiple files downloading in different threads
bool is_ok = false;
server_download_state(server_child * s) : self(s) {}
bool run(common_params & params) {
try {
common_params_handle_models(params, LLAMA_EXAMPLE_SERVER, this);
is_ok = true;
} catch (const std::exception & e) {
auto model_name = params.model.get_name();
SRV_ERR("download failed for model name=%s: %s\n", model_name.c_str(), e.what());
is_ok = false;
}
return is_ok;
}
void on_progress(const common_download_progress & p) {
json data = {
{"url", p.url},
{"downloaded", p.downloaded},
{"total", p.total},
};
self->notify_to_router(server_state_to_str(SERVER_STATE_DOWNLOADING), data);
}
void on_start(const common_download_progress & p) override {
on_progress(p);
}
void on_update(const common_download_progress & p) override {
int64_t now = ggml_time_ms();
// throttle progress updates to avoid flooding logs
if (now - last_progress_time.load(std::memory_order_relaxed) >= 100) {
on_progress(p);
last_progress_time.store(now, std::memory_order_relaxed);
}
}
void on_done(const common_download_progress & p, bool) override {
on_progress(p);
}
bool is_cancelled() const override {
return should_stop ? should_stop() : false;
}
};
int server_child::run_download(common_params & params) {
auto cancelled = std::make_shared<std::atomic<bool>>(false);
// monitor stdin for cancellation command from the router
std::thread signal_thread = setup([cancelled](int) {
cancelled->store(true, std::memory_order_relaxed);
});
server_download_state dl(this);
dl.should_stop = [cancelled]() {
return cancelled->load(std::memory_order_relaxed);
};
bool ok = dl.run(params);
notify_to_router(server_state_to_str(SERVER_STATE_DOWNLOADING), {
{"result", ok ? "download_finished" : "download_failed"},
});
// router should send CMD_ROUTER_TO_CHILD_EXIT after receiving the result
if (signal_thread.joinable()) {
signal_thread.join();
}
SRV_INF("download completed %s\n", ok ? "successfully" : "with errors");
return 0;
}
std::thread server_child::setup(const std::function<void(int)> & shutdown_handler) {
// setup thread for monitoring stdin
return std::thread([shutdown_handler]() {
@@ -1639,7 +1664,7 @@ void server_models_routes::init_routes() {
res_err(res, format_error_response("model is not found", ERROR_TYPE_INVALID_REQUEST));
return res;
}
if (!model->is_running()) {
if (!model->is_running() && model->status != SERVER_MODEL_STATUS_DOWNLOADING) {
res_err(res, format_error_response("model is not running", ERROR_TYPE_INVALID_REQUEST));
return res;
}
@@ -1680,8 +1705,9 @@ void server_models_routes::init_routes() {
model.hf_repo = name;
opts.bearer_token = params.hf_token;
opts.download_mmproj = true;
opts.download_mtp = true;
// note: we only check main model, no need sidecar here
opts.download_mmproj = false;
opts.download_mtp = false;
// first, only check if the model is valid and can be downloaded
opts.skip_download = true;
@@ -1702,10 +1728,21 @@ void server_models_routes::init_routes() {
throw std::invalid_argument("model validation failed, unable to download");
}
// reject if model already exists
if (models.has_model(name)) {
throw std::invalid_argument("model '" + name + "' already exists");
}
// then, proceed with the actual download
opts.skip_download = false;
SRV_INF("starting download for model '%s'\n", name.c_str());
models.download(std::move(model), std::move(opts));
{
server_models::load_options load_opts;
load_opts.mode = SERVER_CHILD_MODE_DOWNLOAD;
load_opts.custom_meta = server_model_meta{};
load_opts.custom_meta->source = SERVER_MODEL_SOURCE_CACHE;
load_opts.custom_meta->name = name;
models.load(name, load_opts);
}
res_ok(res, {{"success", true}});
return res;
@@ -1719,10 +1756,7 @@ void server_models_routes::init_routes() {
throw std::invalid_argument("model must be a non-empty string");
}
bool ok = models.remove(name);
if (!ok) {
throw std::runtime_error("failed to remove model '" + name + "'");
}
models.remove(name); // throws on error
res_ok(res, {{"success", true}});
return res;
+15 -5
View File
@@ -40,6 +40,11 @@ enum server_model_source {
SERVER_MODEL_SOURCE_CACHE,
};
enum server_child_mode {
SERVER_CHILD_MODE_NORMAL, // load the model and run normally
SERVER_CHILD_MODE_DOWNLOAD, // download the model and exit
};
static std::string server_model_status_to_string(server_model_status status) {
switch (status) {
case SERVER_MODEL_STATUS_DOWNLOADING: return "downloading";
@@ -105,7 +110,6 @@ private:
std::shared_ptr<server_subproc> subproc; // shared between main thread and monitoring thread
std::thread th;
server_model_meta meta;
FILE * stdin_file = nullptr;
};
std::mutex mutex;
@@ -161,16 +165,19 @@ public:
// return a copy of all model metadata (thread-safe)
std::vector<server_model_meta> get_all_meta();
struct load_options {
server_child_mode mode = SERVER_CHILD_MODE_NORMAL;
// used for spawning a downloading child process
std::optional<server_model_meta> custom_meta = std::nullopt;
};
// load and unload model instances
// these functions are thread-safe
void load(const std::string & name);
void load(const std::string & name, const load_options & opts);
void unload(const std::string & name);
void unload_all();
// download a new model, progress is reported via SSE
// to stop the download, call unload()
void download(common_params_model && model, common_download_opts && opts);
struct update_status_args {
server_model_status status;
int exit_code = 0; // only valid if status == UNLOADED
@@ -213,9 +220,12 @@ public:
struct server_child {
// serializes the notify_to_router writes
std::mutex mtx_stdout;
std::atomic<bool> is_finished_downloading = false; // set by run_download
// return true if the current process is a child server instance
bool is_child();
server_child_mode get_mode();
int run_download(common_params & params);
// register the shutdown_handler to be called by the router
// return the monitoring thread (to be joined by the caller)
+6 -3
View File
@@ -591,10 +591,11 @@ json server_task_result_cmpl_final::to_json_oaicompat_resp() {
for (const common_chat_tool_call & tool_call : oaicompat_msg.tool_calls) {
output.push_back(json {
{"id", "fc_" + tool_call.id},
{"type", "function_call"},
{"status", "completed"},
{"arguments", tool_call.arguments},
{"call_id", "fc_" + tool_call.id},
{"call_id", "call_" + tool_call.id},
{"name", tool_call.name},
});
}
@@ -690,10 +691,11 @@ json server_task_result_cmpl_final::to_json_oaicompat_resp_stream() {
for (const common_chat_tool_call & tool_call : oaicompat_msg.tool_calls) {
const json output_item = {
{"id", "fc_" + tool_call.id},
{"type", "function_call"},
{"status", "completed"},
{"arguments", tool_call.arguments},
{"call_id", "fc_" + tool_call.id},
{"call_id", "call_" + tool_call.id},
{"name", tool_call.name}
};
server_sent_events.push_back(json {
@@ -1277,8 +1279,9 @@ json server_task_result_cmpl_partial::to_json_oaicompat_resp() {
{"data", json {
{"type", "response.output_item.added"},
{"item", json {
{"id", "fc_" + diff.tool_call_delta.id},
{"arguments", ""},
{"call_id", "fc_" + diff.tool_call_delta.id},
{"call_id", "call_" + diff.tool_call_delta.id},
{"name", diff.tool_call_delta.name},
{"type", "function_call"},
{"status", "in_progress"},
+22 -4
View File
@@ -21,6 +21,12 @@
#include <windows.h>
#endif
// satisfies -Wmissing-declarations (used by llama command)
int llama_server(int argc, char ** argv);
// to be used via CLI (argc / argv are used by router mode only)
int llama_server(common_params & params, int argc, char ** argv);
static std::function<void(int)> shutdown_handler;
static std::atomic_flag is_terminating = ATOMIC_FLAG_INIT;
@@ -71,9 +77,6 @@ static server_http_context::handler_t ex_wrapper(server_http_context::handler_t
};
}
// satisfies -Wmissing-declarations
int llama_server(int argc, char ** argv);
int llama_server(int argc, char ** argv) {
std::setlocale(LC_NUMERIC, "C");
@@ -89,6 +92,10 @@ int llama_server(int argc, char ** argv) {
llama_backend_init();
llama_numa_init(params.numa);
return llama_server(params, argc, argv);
}
int llama_server(common_params & params, int argc, char ** argv) {
// router server never loads a model and must not touch the GPU
const bool is_router_server = params.model.path.empty()
&& params.model.hf_repo.empty();
@@ -134,6 +141,7 @@ int llama_server(int argc, char ** argv) {
//
// register API routes
server_child child; // only used in non-router mode
server_routes routes(params, ctx_server);
server_tools tools;
@@ -254,11 +262,21 @@ int llama_server(int argc, char ** argv) {
ctx_http.post("/tools", ex_wrapper(tools.handle_post));
}
//
// Handle downloading model
//
if (child.is_child() && child.get_mode() == SERVER_CHILD_MODE_DOWNLOAD) {
return child.run_download(params);
} else if (!is_router_server) {
// single-model mode (NOT spawned by router)
common_params_handle_models(params, LLAMA_EXAMPLE_SERVER);
}
//
// Start the server
//
server_child child; // only used in non-router mode
std::function<void()> clean_up;
if (is_router_server) {
+28 -7
View File
@@ -257,14 +257,25 @@ def test_router_reload_models():
MODEL_DOWNLOAD_ID = "ggml-org/test-model-router-download:F16"
MODEL_DOWNLOAD_TIMEOUT = 300
MODEL_DOWNLOAD_TIMEOUT = 30
def _listen_sse(server: ServerProcess, collected: list, stop: threading.Event):
"""Collect /models/sse events into `collected` until `stop` is set."""
def _listen_sse(
server: ServerProcess, collected: list, stop: threading.Event, ready: threading.Event | None = None
):
"""Collect /models/sse events into `collected` until `stop` is set.
When `ready` is provided, it is set once the streaming response is open,
i.e. the server has accepted the connection and registered us as a
subscriber. Callers that trigger one-shot events (e.g. download_finished)
must wait on `ready` before acting, otherwise the event can be broadcast
before this client is subscribed and be lost.
"""
url = f"http://{server.server_host}:{server.server_port}/models/sse"
try:
with requests.get(url, stream=True, timeout=MODEL_DOWNLOAD_TIMEOUT) as resp:
if ready is not None:
ready.set()
for line_bytes in resp.iter_lines():
if stop.is_set():
break
@@ -294,11 +305,17 @@ def test_router_download_model():
sse_events: list = []
stop = threading.Event()
sse_ready = threading.Event()
sse_thread = threading.Thread(
target=_listen_sse, args=(server, sse_events, stop), daemon=True
target=_listen_sse, args=(server, sse_events, stop, sse_ready), daemon=True
)
sse_thread.start()
# wait for the SSE client to be subscribed before triggering the download,
# otherwise the one-shot download_finished event can be broadcast before
# this client is registered and be lost
assert sse_ready.wait(10), "SSE client failed to connect"
# Trigger the download
res = server.make_request("POST", "/models", data={"model": MODEL_DOWNLOAD_ID})
assert res.status_code == 200
@@ -328,13 +345,17 @@ def test_router_delete_model():
# Ensure the model exists (download it if needed)
if MODEL_DOWNLOAD_ID not in _get_model_ids(is_reload=False):
res = server.make_request("POST", "/models", data={"model": MODEL_DOWNLOAD_ID})
assert res.status_code == 200
sse_events: list = []
stop = threading.Event()
sse_ready = threading.Event()
threading.Thread(
target=_listen_sse, args=(server, sse_events, stop), daemon=True
target=_listen_sse, args=(server, sse_events, stop, sse_ready), daemon=True
).start()
# subscribe before triggering the download so the one-shot
# download_finished event is not lost (see test_router_download_model)
assert sse_ready.wait(10), "SSE client failed to connect"
res = server.make_request("POST", "/models", data={"model": MODEL_DOWNLOAD_ID})
assert res.status_code == 200
finished = _wait_for_sse_event(
sse_events, "download_finished", MODEL_DOWNLOAD_ID, MODEL_DOWNLOAD_TIMEOUT
)
+10
View File
@@ -19,6 +19,10 @@ import type {
ApiErrorResponse,
ApiLlamaCppServerProps,
ApiModelDataEntry,
ApiModelLoadStage,
ApiModelsSseProgress,
ApiModelsSseData,
ApiModelsSseEvent,
ApiModelListResponse,
ApiProcessingState,
ApiRouterModelMeta,
@@ -52,6 +56,7 @@ import type {
// Model types
ModelModalities,
ModelOption,
ModelLoadProgress,
// Settings types
SettingsChatServiceOptions,
SettingsConfigValue,
@@ -83,6 +88,10 @@ declare global {
ApiErrorResponse,
ApiLlamaCppServerProps,
ApiModelDataEntry,
ApiModelLoadStage,
ApiModelsSseProgress,
ApiModelsSseData,
ApiModelsSseEvent,
ApiModelListResponse,
ApiProcessingState,
ApiRouterModelMeta,
@@ -120,6 +129,7 @@ declare global {
// Model types
ModelModalities,
ModelOption,
ModelLoadProgress,
// Settings types
SettingsChatServiceOptions,
SettingsConfigValue,
@@ -10,7 +10,7 @@
import { getMessageEditContext } from '$lib/contexts';
import { useProcessingState } from '$lib/hooks/use-processing-state.svelte';
import { isLoading, isChatStreaming } from '$lib/stores/chat.svelte';
import { copyToClipboard, deriveAgenticSections } from '$lib/utils';
import { copyToClipboard, deriveAgenticSections, modelLoadProgressText } from '$lib/utils';
import { AgenticSectionType } from '$lib/enums';
import { REASONING_TAGS } from '$lib/constants/agentic';
import { tick } from 'svelte';
@@ -185,6 +185,13 @@
let hasNoContent = $derived(!message?.content?.trim());
let isActivelyProcessing = $derived(isCurrentlyLoading || isStreaming);
// during a router auto-load the message has no model yet, so target the selected one
let loadTargetModel = $derived(message.model ?? modelsStore.selectedModelName);
let modelLoadProgress = $derived(
isRouter && loadTargetModel ? modelsStore.getLoadProgress(loadTargetModel) : null
);
let modelLoadingText = $derived(modelLoadProgressText(modelLoadProgress));
let showProcessingInfoTop = $derived(
message?.role === MessageRole.ASSISTANT &&
isActivelyProcessing &&
@@ -220,7 +227,8 @@
<div class="mt-6 w-full max-w-[48rem]" in:fade>
<div class="processing-container">
<span class="processing-text">
{processingState.getPromptProgressText() ??
{modelLoadingText ??
processingState.getPromptProgressText() ??
processingState.getProcessingMessage() ??
'Processing...'}
</span>
@@ -252,7 +260,8 @@
<div class="mt-4 w-full max-w-[48rem]" in:fade>
<div class="processing-container">
<span class="processing-text">
{processingState.getPromptProgressText() ??
{modelLoadingText ??
processingState.getPromptProgressText() ??
processingState.getProcessingMessage() ??
'Processing...'}
</span>
@@ -13,6 +13,7 @@
import type { ModelOption } from '$lib/types/models';
import { ServerModelStatus } from '$lib/enums';
import { modelsStore, routerModels } from '$lib/stores/models.svelte';
import { modelLoadFraction, modelLoadProgressText } from '$lib/utils';
interface Props {
option: ModelOption;
@@ -50,11 +51,15 @@
(serverStatus === ServerModelStatus.LOADED || isSleeping) && !isOperationInProgress
);
let isLoading = $derived(serverStatus === ServerModelStatus.LOADING || isOperationInProgress);
let loadProgress = $derived(isLoading ? modelsStore.getLoadProgress(option.model) : null);
let loadPercent = $derived(Math.round(modelLoadFraction(loadProgress) * 100));
let loadTitle = $derived(modelLoadProgressText(loadProgress));
</script>
<div
class={[
'group flex w-full items-center gap-2 rounded-sm p-2 text-left text-sm transition focus:outline-none',
'group relative flex w-full items-center gap-2 rounded-sm p-2 text-left text-sm transition focus:outline-none',
'cursor-pointer hover:bg-muted focus:bg-muted',
(isSelected || isHighlighted) && 'bg-accent text-accent-foreground',
!(isSelected || isHighlighted) && 'hover:bg-accent hover:text-accent-foreground',
@@ -62,6 +67,7 @@
]}
role="option"
aria-selected={isSelected || isHighlighted}
title={loadTitle}
tabindex="0"
onclick={() => onSelect(option.id)}
onmouseenter={onMouseEnter}
@@ -188,4 +194,15 @@
</div>
{/if}
</div>
{#if isLoading}
<div
class="pointer-events-none absolute inset-x-0 bottom-0 h-0.5 overflow-hidden rounded-b-sm bg-muted"
>
<div
class="h-full bg-primary transition-[width] duration-200 ease-out"
style="width: {loadPercent}%"
></div>
</div>
{/if}
</div>
+2 -1
View File
@@ -1,7 +1,8 @@
export const API_MODELS = {
LIST: '/v1/models',
LOAD: '/models/load',
UNLOAD: '/models/unload'
UNLOAD: '/models/unload',
SSE: '/models/sse'
};
// chat completion routes, the control route drives realtime inference (e.g. end reasoning)
+2
View File
@@ -37,6 +37,8 @@ export * from './mcp-form';
export * from './mcp-resource';
export * from './message-export';
export * from './model-id';
export * from './model-loading';
export * from './sse';
export * from './precision';
export * from './processing-info';
export * from './pwa';
@@ -0,0 +1,14 @@
/**
* Labels shown while a model loads, keyed by the stage reported on /models/sse.
*/
export const MODEL_LOAD_STAGE_LABELS: Record<ApiModelLoadStage, string> = {
text_model: 'Loading weights',
spec_model: 'Loading draft',
mmproj_model: 'Loading projector'
};
/**
* Share of the bar reserved for each load phase after text_model.
* text_model fills the rest, so a plain model reaches 100% on its own.
*/
export const MODEL_LOAD_TAIL_SHARE = 0.1;
+16
View File
@@ -0,0 +1,16 @@
/**
* Server-sent events wire format, shared by the chat stream and the
* /models/sse status feed (text/event-stream).
*/
// blank line between two events
export const SSE_RECORD_SEPARATOR = '\n\n';
// line break inside an event
export const SSE_LINE_SEPARATOR = '\n';
// data field prefix, the value follows after an optional space
export const SSE_DATA_PREFIX = 'data:';
// end-of-stream marker on the chat completion stream
export const SSE_DONE_MARKER = '[DONE]';
+1 -1
View File
@@ -54,7 +54,7 @@ export {
export { ModelModality } from './model.enums';
export { ServerRole, ServerModelStatus } from './server.enums';
export { ServerRole, ServerModelStatus, ServerModelsSseEventType } from './server.enums';
export { ParameterSource, SyncableParameterType, SettingsFieldType } from './settings.enums';
+14
View File
@@ -19,3 +19,17 @@ export enum ServerModelStatus {
SLEEPING = 'sleeping',
FAILED = 'failed'
}
/**
* /models/sse event type enum - discriminates the records broadcast on the
* model status feed in ROUTER mode. Matches the event names emitted by
* tools/server/server-models.cpp from the C++ server.
*/
export enum ServerModelsSseEventType {
STATUS_CHANGE = 'status_change',
MODEL_STATUS = 'model_status',
STATUS_UPDATE = 'status_update',
MODELS_RELOAD = 'models_reload',
MODEL_REMOVE = 'model_remove',
DOWNLOAD_PROGRESS = 'download_progress'
}
+9 -7
View File
@@ -10,7 +10,10 @@ import {
SETTINGS_KEYS,
API_CHAT,
API_SLOTS,
CONTROL_ACTION
CONTROL_ACTION,
SSE_LINE_SEPARATOR,
SSE_DATA_PREFIX,
SSE_DONE_MARKER
} from '$lib/constants';
import {
AttachmentType,
@@ -18,8 +21,7 @@ import {
FileTypeAudio,
MessageRole,
MimeTypeAudio,
ReasoningFormat,
UrlProtocol
ReasoningFormat
} from '$lib/enums';
import type {
ApiChatMessageContentPart,
@@ -642,15 +644,15 @@ export class ChatService {
if (abortSignal?.aborted) break;
chunk += decoder.decode(value, { stream: true });
const lines = chunk.split('\n');
const lines = chunk.split(SSE_LINE_SEPARATOR);
chunk = lines.pop() || '';
for (const line of lines) {
if (abortSignal?.aborted) break;
if (line.startsWith(UrlProtocol.DATA)) {
const data = line.slice(6);
if (data === '[DONE]') {
if (line.startsWith(SSE_DATA_PREFIX)) {
const data = line.slice(SSE_DATA_PREFIX.length).trim();
if (data === SSE_DONE_MARKER) {
streamFinished = true;
continue;
+248 -42
View File
@@ -1,6 +1,7 @@
import { base } from '$app/paths';
import { SvelteMap, SvelteSet } from 'svelte/reactivity';
import { toast } from 'svelte-sonner';
import { ServerModelStatus, ModelModality } from '$lib/enums';
import { ServerModelStatus, ServerModelsSseEventType, ModelModality } from '$lib/enums';
import { ModelsService } from '$lib/services/models.service';
import { PropsService } from '$lib/services/props.service';
import { serverStore, isRouterMode } from '$lib/stores/server.svelte';
@@ -8,11 +9,15 @@ import {
detectThinkingSupport,
detectThinkingSupportWithReason
} from '$lib/utils/chat-template-thinking-detector';
import { TTLCache } from '$lib/utils';
import { TTLCache, getAuthHeaders } from '$lib/utils';
import {
MODEL_PROPS_CACHE_TTL_MS,
MODEL_PROPS_CACHE_MAX_ENTRIES,
FAVORITE_MODELS_LOCALSTORAGE_KEY
FAVORITE_MODELS_LOCALSTORAGE_KEY,
API_MODELS,
SSE_RECORD_SEPARATOR,
SSE_LINE_SEPARATOR,
SSE_DATA_PREFIX
} from '$lib/constants';
import { conversationsStore } from '$lib/stores/conversations.svelte';
@@ -55,6 +60,15 @@ class ModelsStore {
private modelUsage = $state<Map<string, SvelteSet<string>>>(new Map());
private modelLoadingStates = new SvelteMap<string, boolean>();
// /models/sse feed state, the single source of truth for status and load progress
private statusAbort: AbortController | null = null;
private statusReaderActive = false;
private loadProgress = new SvelteMap<string, ModelLoadProgress>();
private statusWaiters = new Map<
string,
{ target: ServerModelStatus; resolve: () => void; reject: (e: Error) => void }
>();
favoriteModelIds = $state<Set<string>>(this.loadFavoritesFromStorage());
/**
@@ -531,7 +545,8 @@ class ModelsStore {
* 1. Model from active conversation's last assistant response (if loaded)
* 2. Model from active conversation's last assistant response (if not loaded)
* 3. First loaded model (not from active conversation)
* 4. First available model
* 4. A favorite model
* 5. First available model
*/
async ensureFirstModelSelected(): Promise<void> {
if (this.selectedModelName) return;
@@ -560,6 +575,13 @@ class ModelsStore {
return;
}
// Try loading a favorite model
const favorite = this.favoriteModelIds.values().next()?.value
if (favorite) {
await this.selectModelById(favorite);
return;
}
// Fall back to the first available model
await this.selectModelById(availableModels[0].id);
}
@@ -626,49 +648,218 @@ class ModelsStore {
*
*/
/**
* WORKAROUND: Polling for model status after load/unload operations.
*
* Currently, `/models/load` and `/models/unload` return success before
* the operation actually completes on the server.
*
* TODO: Remove polling once llama-server properly waits for the operation
* to complete before returning success.
*/
private static readonly STATUS_POLL_INTERVAL = 500;
// reconnect delay after the feed drops or the server is not ready yet
private static readonly SSE_RECONNECT_MS = 1000;
/**
* Poll for expected model status after load/unload operation.
* Keeps polling until the model reaches the expected status or fails.
* Open the /models/sse feed and keep it live with auto reconnect.
* Idempotent and router mode only. The feed drives status and progress,
* so it replaces any post-operation polling.
*/
private async pollForModelStatus(
modelId: string,
expectedStatus: ServerModelStatus
): Promise<void> {
let attempt = 0;
while (true) {
await this.fetchRouterModels();
subscribeStatus(): void {
if (this.statusReaderActive) return;
if (!isRouterMode()) return;
const currentStatus = this.getModelStatus(modelId);
if (currentStatus === expectedStatus) return;
this.statusReaderActive = true;
this.statusAbort = new AbortController();
void this.runStatusReader(this.statusAbort.signal);
}
if (currentStatus === ServerModelStatus.FAILED) {
throw new Error(
`Model failed to ${expectedStatus === ServerModelStatus.LOADED ? 'load' : 'unload'}`
);
/**
* Close the /models/sse feed and drop transient progress.
*/
unsubscribeStatus(): void {
this.statusReaderActive = false;
this.statusAbort?.abort();
this.statusAbort = null;
this.loadProgress.clear();
}
/**
* Current load progress for a model, or null when not loading.
*/
getLoadProgress(modelId: string): ModelLoadProgress | null {
return this.loadProgress.get(modelId) ?? null;
}
/**
* Read the feed and reconnect until unsubscribed. Splits the byte stream
* into SSE records on the blank line boundary.
*/
private async runStatusReader(signal: AbortSignal): Promise<void> {
const decoder = new TextDecoder();
while (!signal.aborted) {
try {
const response = await fetch(`${base}${API_MODELS.SSE}`, {
headers: getAuthHeaders(),
signal
});
if (response.ok && response.body) {
const reader = response.body.getReader();
let buffer = '';
while (!signal.aborted) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
let boundary = buffer.indexOf(SSE_RECORD_SEPARATOR);
while (boundary !== -1) {
this.handleStatusRecord(buffer.slice(0, boundary));
buffer = buffer.slice(boundary + SSE_RECORD_SEPARATOR.length);
boundary = buffer.indexOf(SSE_RECORD_SEPARATOR);
}
}
}
} catch {
// network drop or abort falls through to the reconnect delay
}
if (
expectedStatus === ServerModelStatus.LOADED &&
currentStatus === ServerModelStatus.UNLOADED &&
attempt > 2
) {
throw new Error('Model was unloaded unexpectedly during loading');
}
if (signal.aborted) return;
attempt++;
await new Promise((resolve) => setTimeout(resolve, ModelsStore.STATUS_POLL_INTERVAL));
await new Promise((resolve) => setTimeout(resolve, ModelsStore.SSE_RECONNECT_MS));
}
}
/**
* Parse one SSE record. The payload rides in the data lines as a JSON
* envelope that carries its own model, event and data fields.
*/
private handleStatusRecord(record: string): void {
const payload = record
.split(SSE_LINE_SEPARATOR)
.filter((line) => line.startsWith(SSE_DATA_PREFIX))
.map((line) => line.slice(SSE_DATA_PREFIX.length).trim())
.join(SSE_LINE_SEPARATOR);
if (payload.length === 0) return;
let envelope: ApiModelsSseEvent;
try {
envelope = JSON.parse(payload);
} catch {
return;
}
this.applyStatusEvent(envelope);
}
/**
* Route one feed record by event kind. Only the status_* events carry a
* status payload, models_reload triggers a list refresh, model_remove drops
* the row, download_* belong to the download surface, not here.
*/
private applyStatusEvent(event: ApiModelsSseEvent): void {
switch (event.event) {
case ServerModelsSseEventType.STATUS_CHANGE:
case ServerModelsSseEventType.MODEL_STATUS:
case ServerModelsSseEventType.STATUS_UPDATE:
this.applyModelStatus(event);
break;
case ServerModelsSseEventType.MODELS_RELOAD:
void this.fetchRouterModels();
break;
case ServerModelsSseEventType.MODEL_REMOVE:
this.removeRouterModel(event.model);
break;
case ServerModelsSseEventType.DOWNLOAD_PROGRESS:
break;
}
}
/**
* Apply a status envelope: update the model row, track or clear progress,
* settle any pending load or unload awaiter.
*/
private applyModelStatus(event: ApiModelsSseEvent): void {
const model = event.model;
const data = event.data;
if (!model || !data?.status) return;
const status = data.status;
this.setRouterModelStatus(model, status);
if (status === ServerModelStatus.LOADING) {
if (data.progress) this.loadProgress.set(model, data.progress);
} else {
this.loadProgress.delete(model);
}
if (status === ServerModelStatus.LOADED) {
void this.updateModelModalities(model);
}
const failed =
status === ServerModelStatus.FAILED ||
(status === ServerModelStatus.UNLOADED && (data.exit_code ?? 0) !== 0);
if (failed) {
this.rejectStatus(model, new Error(`Model failed: ${this.toDisplayName(model)}`));
return;
}
this.settleStatus(model, status);
}
/**
* Drop a model row reported gone by the feed and settle its awaiters.
*/
private removeRouterModel(modelId: string): void {
if (this.routerModels.findIndex((m) => m.id === modelId) === -1) return;
this.routerModels = this.routerModels.filter((m) => m.id !== modelId);
this.loadProgress.delete(modelId);
this.rejectStatus(modelId, new Error(`Model removed: ${this.toDisplayName(modelId)}`));
}
/**
* Update one model row status in place, reassigning to trigger reactivity.
*/
private setRouterModelStatus(modelId: string, status: ServerModelStatus): void {
const idx = this.routerModels.findIndex((m) => m.id === modelId);
if (idx === -1) return;
const current = this.routerModels[idx];
if (current.status.value === status) return;
const next = [...this.routerModels];
next[idx] = { ...current, status: { ...current.status, value: status } };
this.routerModels = next;
}
/**
* Register an awaiter that resolves when the feed reports target status.
* One operation runs per model at a time, so one awaiter per model is kept.
*/
private waitForStatus(modelId: string, target: ServerModelStatus): Promise<void> {
return new Promise((resolve, reject) => {
this.statusWaiters.set(modelId, { target, resolve, reject });
});
}
/**
* Resolve and drop the awaiter when the model reaches its target status.
*/
private settleStatus(modelId: string, status: ServerModelStatus): void {
const waiter = this.statusWaiters.get(modelId);
if (waiter && waiter.target === status) {
this.statusWaiters.delete(modelId);
waiter.resolve();
}
}
/**
* Reject and drop the awaiter for a model.
*/
private rejectStatus(modelId: string, error: Error): void {
const waiter = this.statusWaiters.get(modelId);
if (waiter) {
this.statusWaiters.delete(modelId);
waiter.reject(error);
}
}
@@ -679,12 +870,18 @@ class ModelsStore {
this.modelLoadingStates.set(modelId, true);
this.error = null;
// the feed drives completion, so it must be live before the request
this.subscribeStatus();
const reachedLoaded = this.waitForStatus(modelId, ServerModelStatus.LOADED);
reachedLoaded.catch(() => {});
try {
await ModelsService.load(modelId);
await this.pollForModelStatus(modelId, ServerModelStatus.LOADED);
await this.updateModelModalities(modelId);
await reachedLoaded;
toast.success(`Model loaded: ${this.toDisplayName(modelId)}`);
} catch (error) {
this.rejectStatus(modelId, error instanceof Error ? error : new Error('load failed'));
this.error = error instanceof Error ? error.message : 'Failed to load model';
toast.error(`Failed to load model: ${this.toDisplayName(modelId)}`);
throw error;
@@ -700,11 +897,17 @@ class ModelsStore {
this.modelLoadingStates.set(modelId, true);
this.error = null;
this.subscribeStatus();
const reachedUnloaded = this.waitForStatus(modelId, ServerModelStatus.UNLOADED);
reachedUnloaded.catch(() => {});
try {
await ModelsService.unload(modelId);
await this.pollForModelStatus(modelId, ServerModelStatus.UNLOADED);
await reachedUnloaded;
toast.info(`Model unloaded: ${this.toDisplayName(modelId)}`);
} catch (error) {
this.rejectStatus(modelId, error instanceof Error ? error : new Error('unload failed'));
this.error = error instanceof Error ? error.message : 'Failed to unload model';
toast.error(`Failed to unload model: ${this.toDisplayName(modelId)}`);
throw error;
@@ -783,6 +986,9 @@ class ModelsStore {
}
clear(): void {
this.unsubscribeStatus();
this.statusWaiters.forEach((waiter) => waiter.reject(new Error('Models store cleared')));
this.statusWaiters.clear();
this.models = [];
this.routerModels = [];
this.loading = false;
+47 -1
View File
@@ -1,4 +1,10 @@
import type { ContentPartType, FileTypeAudio, ServerModelStatus, ServerRole } from '$lib/enums';
import type {
ContentPartType,
FileTypeAudio,
ServerModelStatus,
ServerModelsSseEventType,
ServerRole
} from '$lib/enums';
import type { ChatMessagePromptProgress, ChatRole } from './chat';
export type AudioInputFormat = FileTypeAudio.WAV | FileTypeAudio.MP3;
@@ -96,6 +102,46 @@ export interface ApiModelDataEntry {
meta?: Record<string, unknown> | null;
}
/**
* Load stage reported by the /models/sse feed, in load order.
*/
export type ApiModelLoadStage = 'text_model' | 'spec_model' | 'mmproj_model';
/**
* Load progress snapshot: the full ordered stage plan, the active stage,
* and its fractional value (0.0 -> 1.0).
*/
export interface ApiModelsSseProgress {
stages: ApiModelLoadStage[];
current: ApiModelLoadStage;
value: number;
}
/**
* Status payload carried by a /models/sse envelope.
* exit_code appears on unload.
*/
export interface ApiModelsSseData {
status: ServerModelStatus;
progress?: ApiModelsSseProgress;
exit_code?: number;
}
/**
* Event kind multiplexed on the /models/sse feed.
* Only the status_* events carry a status payload, models_reload signals a
* full list refresh, model_remove drops a row, download_* drive download UI.
*/
/**
* One /models/sse record. event discriminates the kind, model names the
* target instance, data carries the status payload when present.
*/
export interface ApiModelsSseEvent {
model: string;
event: ServerModelsSseEventType;
data: ApiModelsSseData;
}
export interface ApiModelDetails {
name: string;
model: string;
+10 -1
View File
@@ -11,6 +11,10 @@ export type {
ApiChatMessageData,
ApiModelStatus,
ApiModelDataEntry,
ApiModelLoadStage,
ApiModelsSseProgress,
ApiModelsSseData,
ApiModelsSseEvent,
ApiModelDetails,
ApiModelListResponse,
ApiLlamaCppServerProps,
@@ -70,7 +74,12 @@ export type {
} from './database';
// Model types
export type { ModelModalities, ModelOption, ModalityCapabilities } from './models';
export type {
ModelModalities,
ModelOption,
ModelLoadProgress,
ModalityCapabilities
} from './models';
// Settings types
export type {
+12 -1
View File
@@ -1,4 +1,4 @@
import type { ApiModelDataEntry, ApiModelDetails } from '$lib/types/api';
import type { ApiModelDataEntry, ApiModelDetails, ApiModelLoadStage } from '$lib/types/api';
export interface ModelModalities {
vision: boolean;
@@ -20,6 +20,17 @@ export interface ModelOption {
tags?: string[];
}
/**
* Ephemeral UI-only load progress for one model instance.
* Lives only while a load runs, driven by the /models/sse feed.
* stage is absent until the feed reports its first stage.
*/
export interface ModelLoadProgress {
stages: ApiModelLoadStage[];
current: ApiModelLoadStage;
value: number;
}
export interface ParsedModelId {
raw: string;
orgName: string | null;
+3
View File
@@ -44,6 +44,9 @@ export { buildProxiedUrl, buildProxiedHeaders } from './cors-proxy';
// URL utilities
export { extractRootDomain, sanitizeExternalUrl } from './url';
// Progress helpers
export { modelLoadFraction, modelLoadProgressText } from './progress';
// Conversation utilities
export { createMessageCountMap, getMessageCount } from './conversation-utils';
+43
View File
@@ -0,0 +1,43 @@
/**
* Model load progress helpers for the /models/sse surfaces
* (selector row and chat message).
*/
import { MODEL_LOAD_STAGE_LABELS, MODEL_LOAD_TAIL_SHARE } from '$lib/constants';
/**
* Human label for a model load stage.
*/
export function modelLoadStageLabel(stage: ApiModelLoadStage): string {
return MODEL_LOAD_STAGE_LABELS[stage];
}
/**
* Overall load fraction (0.0 -> 1.0) across the declared stage plan.
* text_model fills [0, 1 - tail], each later phase owns one tail slice.
*/
export function modelLoadFraction(progress: ModelLoadProgress | null): number {
if (!progress) return 0;
const { stages, current, value } = progress;
const tailCount = Math.max(stages.length - 1, 0);
const textCeiling = 1 - tailCount * MODEL_LOAD_TAIL_SHARE;
const idx = stages.indexOf(current);
if (idx <= 0) {
return value * textCeiling;
}
return textCeiling + (idx - 1 + value) * MODEL_LOAD_TAIL_SHARE;
}
/**
* Single line describing load progress: active stage label and overall percent.
* Returns null when there is no progress to show.
*/
export function modelLoadProgressText(progress: ModelLoadProgress | null): string | null {
if (!progress) return null;
const label = modelLoadStageLabel(progress.current);
return `${label} ${Math.round(modelLoadFraction(progress) * 100)}%`;
}
+14
View File
@@ -230,6 +230,20 @@
}
});
// Live model status and load progress via the /models/sse feed (router mode)
$effect(() => {
if (!browser) return;
if (!isRouterMode()) return;
untrack(() => {
modelsStore.subscribeStatus();
});
return () => {
modelsStore.unsubscribeStatus();
};
});
// Background MCP server health checks on app load
// Fetch enabled servers from settings and run health checks in background
$effect(() => {