Compare commits

..

10 Commits

Author SHA1 Message Date
Georgi Gerganov 72f80499ee server : headers cleanup 2025-11-24 12:50:50 +02:00
Xuan Son Nguyen 625010d42d move enum stop_type to server-task 2025-11-18 17:28:21 +01:00
Xuan Son Nguyen ca993bad51 rm redundant includes 2025-11-18 15:01:17 +01:00
Xuan Son Nguyen 3b7946034c add server-queue 2025-11-18 14:24:46 +01:00
Xuan Son Nguyen e1a756e934 add server-task, server-common 2025-11-18 14:15:14 +01:00
Chenguang Li bc4064cfea CANN: fix acl_tensor_ptr usage in ASCEND_310P ROPE (#17347)
* cann: fix acl_tensor_ptr usage in ASCEND_310P ROPE implementation

Fix compilation errors in the ASCEND_310P-specific ROPE operation code
by adding .get() calls when passing acl_tensor_ptr smart pointers to
functions expecting raw aclTensor* pointers.

This fixes the code that was missed in the previous refactoring commit
(8981848) which changed ggml_cann_create_tensor() return type from
aclTensor* to acl_tensor_ptr.

* cann: format code
2025-11-18 16:41:52 +08:00
o7si 97cb3fd5ae fix: resolve undefined variable 'svr' compilation error (#17348) 2025-11-18 10:10:47 +02:00
jiahao su ffa277a54c CANN: Add openEuler-cann in build and release (#17192)
Update openEuler version

Remove variable ASCEND_SOC_TYPE

Modify the chip type

Fix case in zip filename

Change "device" to "chip_type"

Modify the value of chip_type
2025-11-18 16:08:55 +08:00
Jeff Bolz da95bf2a85 vulkan: support noncontig i32 copy (#17328) 2025-11-18 07:41:24 +01:00
Xuan-Son Nguyen 0de8878c96 server: split HTTP into its own interface (#17216)
* server: split HTTP into its own interface

* move server-http and httplib to its own file

* add the remaining endpoints

* fix exception/error handling

* renaming

* missing header

* fix missing windows header

* fix error responses from http layer

* fix slot save/restore handler

* fix case where only one stream chunk is returned

* add NOMINMAX

* do not call sink.write on empty data

* use safe_json_to_str for SSE

* clean up

* add some comments

* improve usage of next()

* bring back the "server is listening on" message

* more generic handler

* add req.headers

* move the chat template print to init()

* add req.path

* cont : minor

---------

Co-authored-by: Georgi Gerganov <ggerganov@gmail.com>
2025-11-17 22:05:44 +01:00
15 changed files with 4467 additions and 3725 deletions
+5 -6
View File
@@ -3,7 +3,8 @@
# ==============================================================================
# Define the CANN base image for easier version updates later
ARG CANN_BASE_IMAGE=quay.io/ascend/cann:8.1.rc1-910b-openeuler22.03-py3.10
ARG CHIP_TYPE=910b
ARG CANN_BASE_IMAGE=quay.io/ascend/cann:8.3.rc1.alpha001-${CHIP_TYPE}-openeuler22.03-py3.11
# ==============================================================================
# BUILD STAGE
@@ -11,9 +12,6 @@ ARG CANN_BASE_IMAGE=quay.io/ascend/cann:8.1.rc1-910b-openeuler22.03-py3.10
# ==============================================================================
FROM ${CANN_BASE_IMAGE} AS build
# Define the Ascend chip model for compilation. Default is Ascend910B3
ARG ASCEND_SOC_TYPE=Ascend910B3
# -- Install build dependencies --
RUN yum install -y gcc g++ cmake make git libcurl-devel python3 python3-pip && \
yum clean all && \
@@ -36,13 +34,14 @@ ENV LD_LIBRARY_PATH=${ASCEND_TOOLKIT_HOME}/runtime/lib64/stub:$LD_LIBRARY_PATH
# For brevity, only core variables are listed here. You can paste the original ENV list here.
# -- Build llama.cpp --
# Use the passed ASCEND_SOC_TYPE argument and add general build options
# Use the passed CHIP_TYPE argument and add general build options
ARG CHIP_TYPE
RUN source /usr/local/Ascend/ascend-toolkit/set_env.sh --force \
&& \
cmake -B build \
-DGGML_CANN=ON \
-DCMAKE_BUILD_TYPE=Release \
-DSOC_TYPE=${ASCEND_SOC_TYPE} \
-DSOC_TYPE=ascend${CHIP_TYPE} \
. && \
cmake --build build --config Release -j$(nproc)
+4 -4
View File
@@ -1391,9 +1391,9 @@ jobs:
matrix:
arch: [x86, aarch64]
cann:
- '8.1.RC1.alpha001-910b-openeuler22.03-py3.10'
device:
- 'ascend910b3'
- '8.3.rc1.alpha001-910b-openeuler22.03-py3.11'
chip_type:
- '910b'
build:
- 'Release'
runs-on: ${{ matrix.arch == 'aarch64' && 'ubuntu-24.04-arm' || 'ubuntu-24.04' }}
@@ -1414,7 +1414,7 @@ jobs:
cmake -S . -B build \
-DCMAKE_BUILD_TYPE=${{ matrix.build }} \
-DGGML_CANN=on \
-DSOC_TYPE=${{ matrix.device }}
-DSOC_TYPE=ascend${{ matrix.chip_type }}
cmake --build build -j $(nproc)
# TODO: simplify the following workflows using a matrix
+47
View File
@@ -693,6 +693,52 @@ jobs:
path: llama-${{ steps.tag.outputs.name }}-xcframework.zip
name: llama-${{ steps.tag.outputs.name }}-xcframework
openEuler-cann:
strategy:
matrix:
arch: [x86, aarch64]
chip_type: ['910b', '310p']
build:
- 'Release'
runs-on: ${{ matrix.arch == 'aarch64' && 'ubuntu-24.04-arm' || 'ubuntu-24.04' }}
container: ascendai/cann:${{ matrix.chip_type == '910b' && '8.3.rc1.alpha001-910b-openeuler22.03-py3.11' || '8.3.rc1.alpha001-310p-openeuler22.03-py3.11' }}
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Dependencies
run: |
yum update -y
yum install -y git gcc gcc-c++ make cmake libcurl-devel
git config --global --add safe.directory "$GITHUB_WORKSPACE"
- name: Build
run: |
export LD_LIBRARY_PATH=${ASCEND_TOOLKIT_HOME}/lib64:${ASCEND_TOOLKIT_HOME}/$(uname -m)-linux/devlib/:${LD_LIBRARY_PATH}
cmake -S . -B build \
-DCMAKE_BUILD_TYPE=${{ matrix.build }} \
-DGGML_CANN=on \
-DSOC_TYPE=ascend${{ matrix.chip_type }}
cmake --build build -j $(nproc)
- name: Determine tag name
id: tag
uses: ./.github/actions/get-tag-name
- name: Pack artifacts
run: |
cp LICENSE ./build/bin/
zip -r llama-${{ steps.tag.outputs.name }}-bin-${{ matrix.chip_type }}-openEuler-${{ matrix.arch }}.zip ./build/bin/*
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
path: llama-${{ steps.tag.outputs.name }}-bin-${{ matrix.chip_type }}-openEuler-${{ matrix.arch }}.zip
name: llama-${{ steps.tag.outputs.name }}-bin-${{ matrix.chip_type }}-openEuler-${{ matrix.arch }}
release:
if: ${{ ( github.event_name == 'push' && github.ref == 'refs/heads/master' ) || github.event.inputs.create_release == 'true' }}
@@ -714,6 +760,7 @@ jobs:
- macOS-arm64
- macOS-x64
- ios-xcode-build
- openEuler-cann
steps:
- name: Clone
+13 -12
View File
@@ -2544,7 +2544,7 @@ void ggml_cann_rope(ggml_backend_cann_context & ctx, ggml_tensor * dst) {
int64_t shifts[] = { 1 };
int64_t dims[] = { 3 };
aclnn_roll(ctx, acl_input_tensor, acl_input_roll_tensor, shifts, dims);
aclnn_roll(ctx, acl_input_tensor.get(), acl_input_roll_tensor.get(), shifts, dims);
// init [-1, 1, -1, 1, ...]
minus_one_scale_buffer = minus_one_scale_allocator.get();
@@ -2564,7 +2564,7 @@ void ggml_cann_rope(ggml_backend_cann_context & ctx, ggml_tensor * dst) {
}
int64_t index_num = src0->ne[0];
float value = -1;
aclnn_index_fill_tensor(ctx, acl_minus_one_tensor, dim, index, index_num, value);
aclnn_index_fill_tensor(ctx, acl_minus_one_tensor.get(), dim, index, index_num, value);
} else {
// roll input: [q0,q1,q2,...] ->
// [q_half,q_half+1,...,q_end,q0,q1,...q_half-1]
@@ -2576,7 +2576,7 @@ void ggml_cann_rope(ggml_backend_cann_context & ctx, ggml_tensor * dst) {
int64_t shifts[] = { src0->ne[0] / 2 };
int64_t dims[] = { 3 };
aclnn_roll(ctx, acl_input_tensor, acl_input_roll_tensor, shifts, dims);
aclnn_roll(ctx, acl_input_tensor.get(), acl_input_roll_tensor.get(), shifts, dims);
// init [-1, -1, -1, 1, 11...]
minus_one_scale_buffer = minus_one_scale_allocator.get();
@@ -2599,7 +2599,7 @@ void ggml_cann_rope(ggml_backend_cann_context & ctx, ggml_tensor * dst) {
first_half_ne, first_half_nb, GGML_MAX_DIMS);
bool inplace = true;
float scale = -1;
aclnn_muls(ctx, acl_first_half_tensor, scale, nullptr, inplace);
aclnn_muls(ctx, acl_first_half_tensor.get(), scale, nullptr, inplace);
}
// TODO: n_dims < ne0
@@ -2620,14 +2620,15 @@ void ggml_cann_rope(ggml_backend_cann_context & ctx, ggml_tensor * dst) {
ggml_cann_create_tensor(input_roll_buffer, ggml_cann_type_mapping(src0->type), ggml_type_size(src0->type),
src0->ne, input_nb, GGML_MAX_DIMS);
aclnn_mul(ctx, acl_input_roll_reshape_tensor, acl_minus_one_tensor, acl_input_roll_mul_scale_tensor);
aclnn_mul(ctx, acl_input_roll_reshape_tensor.get(), acl_minus_one_tensor.get(),
acl_input_roll_mul_scale_tensor.get());
// output
void * output_fp32_buffer;
if (src0->type == GGML_TYPE_F32) {
aclnn_mul(ctx, acl_src, acl_cos_reshape_tensor);
aclnn_mul(ctx, acl_input_roll_mul_scale_tensor, acl_sin_reshape_tensor);
aclnn_add(ctx, acl_src, acl_input_roll_mul_scale_tensor, acl_dst);
aclnn_mul(ctx, acl_src.get(), acl_cos_reshape_tensor.get());
aclnn_mul(ctx, acl_input_roll_mul_scale_tensor.get(), acl_sin_reshape_tensor.get());
aclnn_add(ctx, acl_src.get(), acl_input_roll_mul_scale_tensor.get(), acl_dst.get());
// TODO: ne0 != n_dims in mode2
} else if (src0->type == GGML_TYPE_F16) {
size_t input_fp32_nb[GGML_MAX_DIMS];
@@ -2648,10 +2649,10 @@ void ggml_cann_rope(ggml_backend_cann_context & ctx, ggml_tensor * dst) {
output_fp32_buffer = fp32_allocator.get();
acl_tensor_ptr output_fp32_tensor = ggml_cann_create_tensor(output_fp32_buffer, ACL_FLOAT, sizeof(float),
dst->ne, input_fp32_nb, GGML_MAX_DIMS);
aclnn_mul(ctx, acl_src, acl_cos_reshape_tensor, input_fp32_tensor1);
aclnn_mul(ctx, acl_input_roll_mul_scale_tensor, acl_sin_reshape_tensor, input_fp32_tensor2);
aclnn_add(ctx, input_fp32_tensor1, input_fp32_tensor2, output_fp32_tensor);
aclnn_cast(ctx, output_fp32_tensor, acl_dst, ACL_FLOAT16);
aclnn_mul(ctx, acl_src.get(), acl_cos_reshape_tensor.get(), input_fp32_tensor1.get());
aclnn_mul(ctx, acl_input_roll_mul_scale_tensor.get(), acl_sin_reshape_tensor.get(), input_fp32_tensor2.get());
aclnn_add(ctx, input_fp32_tensor1.get(), input_fp32_tensor2.get(), output_fp32_tensor.get());
aclnn_cast(ctx, output_fp32_tensor.get(), acl_dst.get(), ACL_FLOAT16);
}
return;
#endif
+3 -2
View File
@@ -13644,10 +13644,11 @@ static bool ggml_backend_vk_device_supports_op(ggml_backend_dev_t dev, const ggm
}
// We can handle copying from a type to the same type if it's
// contiguous (memcpy). We use f16 or f32 shaders to do the copy,
// either not quantized or is quantized and contiguous.
// We use f16 or f32 shaders to do the copy,
// so the type/block size must be a multiple of 4.
if (src0_type == src1_type &&
ggml_is_contiguous(op->src[0]) && ggml_is_contiguous(op) &&
(!ggml_is_quantized(src0_type) || (ggml_is_contiguous(op->src[0]) && ggml_is_contiguous(op))) &&
(ggml_type_size(src0_type) % 2) == 0) {
return true;
}
+8 -1
View File
@@ -13,7 +13,14 @@ endif()
set(TARGET_SRCS
server.cpp
utils.hpp
server-http.cpp
server-http.h
server-task.cpp
server-task.h
server-queue.cpp
server-queue.h
server-common.cpp
server-common.h
)
set(PUBLIC_ASSETS
index.html.gz
File diff suppressed because it is too large Load Diff
+349
View File
@@ -0,0 +1,349 @@
#pragma once
#include "common.h"
#include "log.h"
#include "llama.h"
#include "chat.h"
#include "mtmd.h"
#define JSON_ASSERT GGML_ASSERT
#include <nlohmann/json.hpp>
#include <string>
#include <vector>
#include <cinttypes>
#define DEFAULT_OAICOMPAT_MODEL "gpt-3.5-turbo"
const static std::string build_info("b" + std::to_string(LLAMA_BUILD_NUMBER) + "-" + LLAMA_COMMIT);
using json = nlohmann::ordered_json;
#define SLT_INF(slot, fmt, ...) LOG_INF("slot %12.*s: id %2d | task %d | " fmt, 12, __func__, (slot).id, ((slot).task ? (slot).task->id : -1), __VA_ARGS__)
#define SLT_WRN(slot, fmt, ...) LOG_WRN("slot %12.*s: id %2d | task %d | " fmt, 12, __func__, (slot).id, ((slot).task ? (slot).task->id : -1), __VA_ARGS__)
#define SLT_ERR(slot, fmt, ...) LOG_ERR("slot %12.*s: id %2d | task %d | " fmt, 12, __func__, (slot).id, ((slot).task ? (slot).task->id : -1), __VA_ARGS__)
#define SLT_DBG(slot, fmt, ...) LOG_DBG("slot %12.*s: id %2d | task %d | " fmt, 12, __func__, (slot).id, ((slot).task ? (slot).task->id : -1), __VA_ARGS__)
#define SRV_INF(fmt, ...) LOG_INF("srv %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define SRV_WRN(fmt, ...) LOG_WRN("srv %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define SRV_ERR(fmt, ...) LOG_ERR("srv %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define SRV_DBG(fmt, ...) LOG_DBG("srv %12.*s: " fmt, 12, __func__, __VA_ARGS__)
using raw_buffer = std::vector<uint8_t>;
template <typename T>
static T json_value(const json & body, const std::string & key, const T & default_value) {
// Fallback null to default value
if (body.contains(key) && !body.at(key).is_null()) {
try {
return body.at(key);
} catch (NLOHMANN_JSON_NAMESPACE::detail::type_error const & err) {
LOG_WRN("Wrong type supplied for parameter '%s'. Expected '%s', using default value: %s\n", key.c_str(), json(default_value).type_name(), err.what());
return default_value;
}
} else {
return default_value;
}
}
// https://community.openai.com/t/openai-chat-list-of-error-codes-and-types/357791/11
enum error_type {
ERROR_TYPE_INVALID_REQUEST,
ERROR_TYPE_AUTHENTICATION,
ERROR_TYPE_SERVER,
ERROR_TYPE_NOT_FOUND,
ERROR_TYPE_PERMISSION,
ERROR_TYPE_UNAVAILABLE, // custom error
ERROR_TYPE_NOT_SUPPORTED, // custom error
ERROR_TYPE_EXCEED_CONTEXT_SIZE, // custom error
};
// thin wrapper around common_grammar_trigger with (de)serialization functions
struct server_grammar_trigger {
common_grammar_trigger value;
server_grammar_trigger() = default;
server_grammar_trigger(const common_grammar_trigger & value) : value(value) {}
server_grammar_trigger(const json & in) {
value.type = (common_grammar_trigger_type) in.at("type").get<int>();
value.value = in.at("value").get<std::string>();
if (value.type == COMMON_GRAMMAR_TRIGGER_TYPE_TOKEN) {
value.token = (llama_token) in.at("token").get<int>();
}
}
json to_json() const {
json out {
{"type", (int) value.type},
{"value", value.value},
};
if (value.type == COMMON_GRAMMAR_TRIGGER_TYPE_TOKEN) {
out["token"] = (int) value.token;
}
return out;
}
};
json format_error_response(const std::string & message, const enum error_type type);
//
// random string / id
//
std::string random_string();
std::string gen_chatcmplid();
std::string gen_tool_call_id();
//
// lora utils
//
// check whether the given lora set has only aloras activated (empty => false)
bool lora_all_alora(const std::vector<common_adapter_lora_info> & loras);
// if the two sets of loras are different, they require a cache clear unless the
// change is only from aloras to aloras.
bool lora_should_clear_cache(
const std::vector<common_adapter_lora_info> & current,
const std::vector<common_adapter_lora_info> & next);
std::vector<common_adapter_lora_info> parse_lora_request(
const std::vector<common_adapter_lora_info> & lora_base,
const json & data);
bool are_lora_equal(
const std::vector<common_adapter_lora_info> & l1,
const std::vector<common_adapter_lora_info> & l2);
// get the ids of all enabled loras
std::vector<size_t> lora_get_enabled_ids(const std::vector<common_adapter_lora_info> & loras);
//
// server_tokens
//
/**
* server_tokens is a helper to manage the input tokens and image for the server.
* it is made this way to simplify the logic of KV cache management.
*/
struct server_tokens {
bool has_mtmd = false;
private: // disallow accessing these members directly, risking out-of-sync
// map a **start** index in tokens to the image chunk
// note: the order need to be in-sync with tokens
std::map<size_t, mtmd::input_chunk_ptr> map_idx_to_media;
// list of tokens
// if the token is LLAMA_TOKEN_NULL, it indicates that this position is occupied by media chunk
// otherwise, it is a normal text token
// note: a non-text chunk can occupy multiple tokens (aka memory cells) in the token list
// note(2): for M-RoPE, an image can occupy different number of pos; do not assume 1-to-1 mapping tokens <-> pos
llama_tokens tokens;
// for ex. with input of 5 text tokens and 2 images (each image occupies 3 tokens and 2 pos):
// [0] [1] [2] [3] [4] [img0] [img0] [img0] [img1] [img1] [img1]
// idx 0 1 2 3 4 5 6 7 8 9 10
// pos 0 1 2 3 4 5 5 5 7 7 7
// map_idx_to_media will contain: {5, img0}, {8, img1}
public:
server_tokens() = default;
~server_tokens() = default;
// Prevent copying
// TODO: server_tokens should be copyable - remove this:
server_tokens(const server_tokens&) = delete;
server_tokens& operator=(const server_tokens&) = delete;
// Allow moving (usually implicitly generated if members are movable)
server_tokens(server_tokens&&) = default;
server_tokens& operator=(server_tokens&&) = default;
// Allow accessing elements using [] operator
llama_token operator[](size_t index) { return tokens[index]; }
const llama_token& operator[](size_t index) const { return tokens[index]; }
server_tokens(mtmd::input_chunks & mtmd_chunks, bool has_mtmd);
server_tokens(const llama_tokens & tokens, bool has_mtmd);
// for debugging
std::string str() const;
llama_pos pos_next() const;
const mtmd::input_chunk_ptr & find_chunk(size_t idx) const;
void push_back(llama_token tok);
// will create a copy of the chunk if it contains non-text data
void push_back(const mtmd_input_chunk * chunk);
// appends server tokens, updates the media map. copies media chunks.
void push_back(server_tokens & tokens);
// for compatibility with context shift and prompt truncation
void insert(const llama_tokens & inp_tokens);
// for compatibility with speculative decoding, ctx shift, slot save/load
const llama_tokens & get_text_tokens() const;
// for compatibility with speculative decoding
void set_token(llama_pos pos, llama_token id);
size_t size() const { return tokens.size(); }
bool empty() const { return tokens.empty(); }
void clear() {
map_idx_to_media.clear();
tokens.clear();
}
void keep_first(size_t n);
std::string detokenize(const llama_context * ctx, bool special) const;
size_t get_common_prefix(const server_tokens & b) const;
// make sure all text tokens are within the vocab range
bool validate(const struct llama_context * ctx) const;
// encode and decode the image chunk
int32_t process_chunk(
llama_context * ctx,
mtmd_context * mctx,
size_t idx,
llama_pos pos,
int32_t seq_id,
size_t & n_tokens_out) const;
};
//
// tokenizer and input processing utils
//
bool json_is_array_of_numbers(const json & data);
// is array having BOTH numbers & strings?
bool json_is_array_of_mixed_numbers_strings(const json & data);
// does array have any individual integers/tokens?
bool json_is_array_and_contains_numbers(const json & data);
// get value by path(key1 / key2)
json json_get_nested_values(const std::vector<std::string> & paths, const json & js);
/**
* this handles 2 cases:
* - only string, example: "string"
* - mixed string and tokens, example: [12, 34, "string", 56, 78]
*/
llama_tokens tokenize_mixed(const llama_vocab * vocab, const json & json_prompt, bool add_special, bool parse_special);
// return the last index of character that can form a valid string
// if the last character is potentially cut in half, return the index before the cut
// if validate_utf8(text) == text.size(), then the whole text is valid utf8
size_t validate_utf8(const std::string& text);
// process mtmd prompt, return the server_tokens containing both text tokens and media chunks
server_tokens process_mtmd_prompt(mtmd_context * mctx, std::string prompt, std::vector<raw_buffer> files);
/**
* break the input "prompt" object into multiple prompt if needed, then tokenize them
* this supports these cases:
* - "prompt": "string"
* - "prompt": [12, 34, 56]
* - "prompt": [12, 34, "string", 56, 78]
* - "prompt": { "prompt_string": "string", "multimodal_data": [ "base64" ] }
* and multiple prompts (multi-tasks):
* - "prompt": ["string1", "string2"]
* - "prompt": ["string1", [12, 34, 56]]
* - "prompt": [[12, 34, 56], [78, 90, 12]]
* - "prompt": [[12, 34, "string", 56, 78], [12, 34, 56], { "prompt_string": "string", "multimodal_data": [ "base64" ]}]
*/
std::vector<server_tokens> tokenize_input_prompts(
const llama_vocab * vocab,
mtmd_context * mctx,
const json & json_prompt,
bool add_special,
bool parse_special);
//
// OAI utils
//
// used by /completions endpoint
json oaicompat_completion_params_parse(const json & body);
struct oaicompat_parser_options {
bool use_jinja;
bool prefill_assistant;
common_reasoning_format reasoning_format;
std::map<std::string,std::string> chat_template_kwargs;
common_chat_templates * tmpls;
bool allow_image;
bool allow_audio;
bool enable_thinking = true;
};
// used by /chat/completions endpoint
json oaicompat_chat_params_parse(
json & body, /* openai api json semantics */
const oaicompat_parser_options & opt,
std::vector<raw_buffer> & out_files);
// TODO: move it to server-task.cpp
json format_embeddings_response_oaicompat(const json & request, const json & embeddings, bool use_base64 = false);
// TODO: move it to server-task.cpp
json format_response_rerank(
const json & request,
const json & ranks,
bool is_tei_format,
std::vector<std::string> & texts,
int top_n);
//
// other utils
//
std::vector<llama_token_data> get_token_probabilities(llama_context * ctx, int idx);
std::string safe_json_to_str(const json & data);
std::string tokens_to_str(llama_context * ctx, const llama_tokens & tokens);
// format incomplete utf-8 multibyte character for output
std::string tokens_to_output_formatted_string(const llama_context * ctx, const llama_token token);
// format server-sent event (SSE), return the formatted string to send
// note: if data is a json array, it will be sent as multiple events, one per item
std::string format_sse(const json & data);
bool is_valid_utf8(const std::string & str);
//
// formatting output responses
// TODO: move these to server-task.cpp
//
llama_tokens format_prompt_infill(
const llama_vocab * vocab,
const json & input_prefix,
const json & input_suffix,
const json & input_extra,
const int n_batch,
const int n_predict,
const int n_ctx,
const bool spm_infill,
const llama_tokens & tokens_prompt);
// format rerank task: [BOS]query[EOS][SEP]doc[EOS].
server_tokens format_prompt_rerank(
const struct llama_model * model,
const struct llama_vocab * vocab,
mtmd_context * mctx,
const std::string & query,
const std::string & doc);
+387
View File
@@ -0,0 +1,387 @@
#include "common.h"
#include "server-http.h"
#include "server-common.h"
#include <cpp-httplib/httplib.h>
#include <functional>
#include <string>
#include <thread>
// auto generated files (see README.md for details)
#include "index.html.gz.hpp"
#include "loading.html.hpp"
//
// HTTP implementation using cpp-httplib
//
class server_http_context::Impl {
public:
std::unique_ptr<httplib::Server> srv;
};
server_http_context::server_http_context()
: pimpl(std::make_unique<server_http_context::Impl>())
{}
server_http_context::~server_http_context() = default;
static void log_server_request(const httplib::Request & req, const httplib::Response & res) {
// skip GH copilot requests when using default port
if (req.path == "/v1/health") {
return;
}
// reminder: this function is not covered by httplib's exception handler; if someone does more complicated stuff, think about wrapping it in try-catch
SRV_INF("request: %s %s %s %d\n", req.method.c_str(), req.path.c_str(), req.remote_addr.c_str(), res.status);
SRV_DBG("request: %s\n", req.body.c_str());
SRV_DBG("response: %s\n", res.body.c_str());
}
bool server_http_context::init(const common_params & params) {
path_prefix = params.api_prefix;
port = params.port;
hostname = params.hostname;
auto & srv = pimpl->srv;
#ifdef CPPHTTPLIB_OPENSSL_SUPPORT
if (params.ssl_file_key != "" && params.ssl_file_cert != "") {
LOG_INF("Running with SSL: key = %s, cert = %s\n", params.ssl_file_key.c_str(), params.ssl_file_cert.c_str());
srv.reset(
new httplib::SSLServer(params.ssl_file_cert.c_str(), params.ssl_file_key.c_str())
);
} else {
LOG_INF("Running without SSL\n");
srv.reset(new httplib::Server());
}
#else
if (params.ssl_file_key != "" && params.ssl_file_cert != "") {
LOG_ERR("Server is built without SSL support\n");
return false;
}
srv.reset(new httplib::Server());
#endif
srv->set_default_headers({{"Server", "llama.cpp"}});
srv->set_logger(log_server_request);
srv->set_exception_handler([](const httplib::Request &, httplib::Response & res, const std::exception_ptr & ep) {
// this is fail-safe; exceptions should already handled by `ex_wrapper`
std::string message;
try {
std::rethrow_exception(ep);
} catch (const std::exception & e) {
message = e.what();
} catch (...) {
message = "Unknown Exception";
}
res.status = 500;
res.set_content(message, "text/plain");
LOG_ERR("got exception: %s\n", message.c_str());
});
srv->set_error_handler([](const httplib::Request &, httplib::Response & res) {
if (res.status == 404) {
res.set_content(
safe_json_to_str(json {
{"error", {
{"message", "File Not Found"},
{"type", "not_found_error"},
{"code", 404}
}}
}),
"application/json; charset=utf-8"
);
}
// for other error codes, we skip processing here because it's already done by res->error()
});
// set timeouts and change hostname and port
srv->set_read_timeout (params.timeout_read);
srv->set_write_timeout(params.timeout_write);
if (params.api_keys.size() == 1) {
auto key = params.api_keys[0];
std::string substr = key.substr(std::max((int)(key.length() - 4), 0));
LOG_INF("%s: api_keys: ****%s\n", __func__, substr.c_str());
} else if (params.api_keys.size() > 1) {
LOG_INF("%s: api_keys: %zu keys loaded\n", __func__, params.api_keys.size());
}
//
// Middlewares
//
auto middleware_validate_api_key = [api_keys = params.api_keys](const httplib::Request & req, httplib::Response & res) {
static const std::unordered_set<std::string> public_endpoints = {
"/health",
"/v1/health",
"/models",
"/v1/models",
"/api/tags"
};
// If API key is not set, skip validation
if (api_keys.empty()) {
return true;
}
// If path is public or is static file, skip validation
if (public_endpoints.find(req.path) != public_endpoints.end() || req.path == "/") {
return true;
}
// Check for API key in the header
auto auth_header = req.get_header_value("Authorization");
std::string prefix = "Bearer ";
if (auth_header.substr(0, prefix.size()) == prefix) {
std::string received_api_key = auth_header.substr(prefix.size());
if (std::find(api_keys.begin(), api_keys.end(), received_api_key) != api_keys.end()) {
return true; // API key is valid
}
}
// API key is invalid or not provided
res.status = 401;
res.set_content(
safe_json_to_str(json {
{"error", {
{"message", "Invalid API Key"},
{"type", "authentication_error"},
{"code", 401}
}}
}),
"application/json; charset=utf-8"
);
LOG_WRN("Unauthorized: Invalid API Key\n");
return false;
};
auto middleware_server_state = [this](const httplib::Request & req, httplib::Response & res) {
bool ready = is_ready.load();
if (!ready) {
auto tmp = string_split<std::string>(req.path, '.');
if (req.path == "/" || tmp.back() == "html") {
res.set_content(reinterpret_cast<const char*>(loading_html), loading_html_len, "text/html; charset=utf-8");
res.status = 503;
} else if (req.path == "/models" || req.path == "/v1/models" || req.path == "/api/tags") {
// allow the models endpoint to be accessed during loading
return true;
} else {
res.status = 503;
res.set_content(
safe_json_to_str(json {
{"error", {
{"message", "Loading model"},
{"type", "unavailable_error"},
{"code", 503}
}}
}),
"application/json; charset=utf-8"
);
}
return false;
}
return true;
};
// register server middlewares
srv->set_pre_routing_handler([middleware_validate_api_key, middleware_server_state](const httplib::Request & req, httplib::Response & res) {
res.set_header("Access-Control-Allow-Origin", req.get_header_value("Origin"));
// If this is OPTIONS request, skip validation because browsers don't include Authorization header
if (req.method == "OPTIONS") {
res.set_header("Access-Control-Allow-Credentials", "true");
res.set_header("Access-Control-Allow-Methods", "GET, POST");
res.set_header("Access-Control-Allow-Headers", "*");
res.set_content("", "text/html"); // blank response, no data
return httplib::Server::HandlerResponse::Handled; // skip further processing
}
if (!middleware_server_state(req, res)) {
return httplib::Server::HandlerResponse::Handled;
}
if (!middleware_validate_api_key(req, res)) {
return httplib::Server::HandlerResponse::Handled;
}
return httplib::Server::HandlerResponse::Unhandled;
});
int n_threads_http = params.n_threads_http;
if (n_threads_http < 1) {
// +2 threads for monitoring endpoints
n_threads_http = std::max(params.n_parallel + 2, (int32_t) std::thread::hardware_concurrency() - 1);
}
LOG_INF("%s: using %d threads for HTTP server\n", __func__, n_threads_http);
srv->new_task_queue = [n_threads_http] { return new httplib::ThreadPool(n_threads_http); };
//
// Web UI setup
//
if (!params.webui) {
LOG_INF("Web UI is disabled\n");
} else {
// register static assets routes
if (!params.public_path.empty()) {
// Set the base directory for serving static files
bool is_found = srv->set_mount_point(params.api_prefix + "/", params.public_path);
if (!is_found) {
LOG_ERR("%s: static assets path not found: %s\n", __func__, params.public_path.c_str());
return 1;
}
} else {
// using embedded static index.html
srv->Get(params.api_prefix + "/", [](const httplib::Request & req, httplib::Response & res) {
if (req.get_header_value("Accept-Encoding").find("gzip") == std::string::npos) {
res.set_content("Error: gzip is not supported by this browser", "text/plain");
} else {
res.set_header("Content-Encoding", "gzip");
// COEP and COOP headers, required by pyodide (python interpreter)
res.set_header("Cross-Origin-Embedder-Policy", "require-corp");
res.set_header("Cross-Origin-Opener-Policy", "same-origin");
res.set_content(reinterpret_cast<const char*>(index_html_gz), index_html_gz_len, "text/html; charset=utf-8");
}
return false;
});
}
}
return true;
}
bool server_http_context::start() {
// Bind and listen
auto & srv = pimpl->srv;
bool was_bound = false;
bool is_sock = false;
if (string_ends_with(std::string(hostname), ".sock")) {
is_sock = true;
LOG_INF("%s: setting address family to AF_UNIX\n", __func__);
srv->set_address_family(AF_UNIX);
// bind_to_port requires a second arg, any value other than 0 should
// simply get ignored
was_bound = srv->bind_to_port(hostname, 8080);
} else {
LOG_INF("%s: binding port with default address family\n", __func__);
// bind HTTP listen port
if (port == 0) {
int bound_port = srv->bind_to_any_port(hostname);
was_bound = (bound_port >= 0);
if (was_bound) {
port = bound_port;
}
} else {
was_bound = srv->bind_to_port(hostname, port);
}
}
if (!was_bound) {
LOG_ERR("%s: couldn't bind HTTP server socket, hostname: %s, port: %d\n", __func__, hostname.c_str(), port);
return false;
}
// run the HTTP server in a thread
thread = std::thread([this]() { pimpl->srv->listen_after_bind(); });
srv->wait_until_ready();
listening_address = is_sock ? string_format("unix://%s", hostname.c_str())
: string_format("http://%s:%d", hostname.c_str(), port);
return true;
}
void server_http_context::stop() const {
if (pimpl->srv) {
pimpl->srv->stop();
}
}
static void set_headers(httplib::Response & res, const std::map<std::string, std::string> & headers) {
for (const auto & [key, value] : headers) {
res.set_header(key, value);
}
}
static std::map<std::string, std::string> get_params(const httplib::Request & req) {
std::map<std::string, std::string> params;
for (const auto & [key, value] : req.params) {
params[key] = value;
}
for (const auto & [key, value] : req.path_params) {
params[key] = value;
}
return params;
}
static std::map<std::string, std::string> get_headers(const httplib::Request & req) {
std::map<std::string, std::string> headers;
for (const auto & [key, value] : req.headers) {
headers[key] = value;
}
return headers;
}
static void process_handler_response(server_http_res_ptr & response, httplib::Response & res) {
if (response->is_stream()) {
res.status = response->status;
set_headers(res, response->headers);
std::string content_type = response->content_type;
// convert to shared_ptr as both chunked_content_provider() and on_complete() need to use it
std::shared_ptr<server_http_res> r_ptr = std::move(response);
const auto chunked_content_provider = [response = r_ptr](size_t, httplib::DataSink & sink) -> bool {
std::string chunk;
bool has_next = response->next(chunk);
if (!chunk.empty()) {
// TODO: maybe handle sink.write unsuccessful? for now, we rely on is_connection_closed()
sink.write(chunk.data(), chunk.size());
SRV_DBG("http: streamed chunk: %s\n", chunk.c_str());
}
if (!has_next) {
sink.done();
SRV_DBG("%s", "http: stream ended\n");
}
return has_next;
};
const auto on_complete = [response = r_ptr](bool) mutable {
response.reset(); // trigger the destruction of the response object
};
res.set_chunked_content_provider(content_type, chunked_content_provider, on_complete);
} else {
res.status = response->status;
set_headers(res, response->headers);
res.set_content(response->data, response->content_type);
}
}
void server_http_context::get(const std::string & path, const server_http_context::handler_t & handler) const {
pimpl->srv->Get(path_prefix + path, [handler](const httplib::Request & req, httplib::Response & res) {
server_http_res_ptr response = handler(server_http_req{
get_params(req),
get_headers(req),
req.path,
req.body,
req.is_connection_closed
});
process_handler_response(response, res);
});
}
void server_http_context::post(const std::string & path, const server_http_context::handler_t & handler) const {
pimpl->srv->Post(path_prefix + path, [handler](const httplib::Request & req, httplib::Response & res) {
server_http_res_ptr response = handler(server_http_req{
get_params(req),
get_headers(req),
req.path,
req.body,
req.is_connection_closed
});
process_handler_response(response, res);
});
}
+78
View File
@@ -0,0 +1,78 @@
#pragma once
#include <atomic>
#include <functional>
#include <map>
#include <string>
#include <thread>
struct common_params;
// generator-like API for HTTP response generation
// this object response with one of the 2 modes:
// 1) normal response: `data` contains the full response body
// 2) streaming response: each call to next(output) generates the next chunk
// when next(output) returns false, no more data after the current chunk
// note: some chunks can be empty, in which case no data is sent for that chunk
struct server_http_res {
std::string content_type = "application/json; charset=utf-8";
int status = 200;
std::string data;
std::map<std::string, std::string> headers;
// TODO: move this to a virtual function once we have proper polymorphism support
std::function<bool(std::string &)> next = nullptr;
bool is_stream() const {
return next != nullptr;
}
virtual ~server_http_res() = default;
};
// unique pointer, used by set_chunked_content_provider
// httplib requires the stream provider to be stored in heap
using server_http_res_ptr = std::unique_ptr<server_http_res>;
struct server_http_req {
std::map<std::string, std::string> params; // path_params + query_params
std::map<std::string, std::string> headers; // reserved for future use
std::string path; // reserved for future use
std::string body;
const std::function<bool()> & should_stop;
std::string get_param(const std::string & key, const std::string & def = "") const {
auto it = params.find(key);
if (it != params.end()) {
return it->second;
}
return def;
}
};
struct server_http_context {
class Impl;
std::unique_ptr<Impl> pimpl;
std::thread thread; // server thread
std::atomic<bool> is_ready = false;
std::string path_prefix;
std::string hostname;
int port;
server_http_context();
~server_http_context();
bool init(const common_params & params);
bool start();
void stop() const;
// note: the handler should never throw exceptions
using handler_t = std::function<server_http_res_ptr(const server_http_req & req)>;
void get(const std::string & path, const handler_t & handler) const;
void post(const std::string & path, const handler_t & handler) const;
// for debugging
std::string listening_address;
};
+268
View File
@@ -0,0 +1,268 @@
#include "server-task.h"
#include "server-queue.h"
#include "log.h"
#include <chrono>
#define QUE_INF(fmt, ...) LOG_INF("que %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define QUE_WRN(fmt, ...) LOG_WRN("que %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define QUE_ERR(fmt, ...) LOG_ERR("que %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define QUE_DBG(fmt, ...) LOG_DBG("que %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define RES_INF(fmt, ...) LOG_INF("res %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define RES_WRN(fmt, ...) LOG_WRN("res %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define RES_ERR(fmt, ...) LOG_ERR("res %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define RES_DBG(fmt, ...) LOG_DBG("res %12.*s: " fmt, 12, __func__, __VA_ARGS__)
//
// server_queue
//
int server_queue::post(server_task && task, bool front) {
std::unique_lock<std::mutex> lock(mutex_tasks);
GGML_ASSERT(task.id != -1);
// if this is cancel task make sure to clean up pending tasks
if (task.type == SERVER_TASK_TYPE_CANCEL) {
cleanup_pending_task(task.id_target);
}
const int task_id = task.id;
QUE_DBG("new task, id = %d, front = %d\n", task_id, front);
if (front) {
queue_tasks.push_front(std::move(task));
} else {
queue_tasks.push_back(std::move(task));
}
condition_tasks.notify_one();
return task_id;
}
int server_queue::post(std::vector<server_task> && tasks, bool front) {
std::unique_lock<std::mutex> lock(mutex_tasks);
for (auto & task : tasks) {
if (task.id == -1) {
task.id = id++;
}
// if this is cancel task make sure to clean up pending tasks
if (task.type == SERVER_TASK_TYPE_CANCEL) {
cleanup_pending_task(task.id_target);
}
QUE_DBG("new task, id = %d/%d, front = %d\n", task.id, (int) tasks.size(), front);
if (front) {
queue_tasks.push_front(std::move(task));
} else {
queue_tasks.push_back(std::move(task));
}
}
condition_tasks.notify_one();
return 0;
}
void server_queue::defer(server_task && task) {
std::unique_lock<std::mutex> lock(mutex_tasks);
QUE_DBG("defer task, id = %d\n", task.id);
queue_tasks_deferred.push_back(std::move(task));
condition_tasks.notify_one();
}
int server_queue::get_new_id() {
std::unique_lock<std::mutex> lock(mutex_tasks);
int new_id = id++;
return new_id;
}
void server_queue::on_new_task(std::function<void(server_task &&)> callback) {
callback_new_task = std::move(callback);
}
void server_queue::on_update_slots(std::function<void(void)> callback) {
callback_update_slots = std::move(callback);
}
void server_queue::pop_deferred_task() {
std::unique_lock<std::mutex> lock(mutex_tasks);
if (!queue_tasks_deferred.empty()) {
queue_tasks.emplace_front(std::move(queue_tasks_deferred.front()));
queue_tasks_deferred.pop_front();
}
condition_tasks.notify_one();
}
void server_queue::terminate() {
std::unique_lock<std::mutex> lock(mutex_tasks);
running = false;
condition_tasks.notify_all();
}
void server_queue::start_loop() {
running = true;
while (true) {
QUE_DBG("%s", "processing new tasks\n");
while (true) {
std::unique_lock<std::mutex> lock(mutex_tasks);
if (!running) {
QUE_DBG("%s", "terminate\n");
return;
}
if (queue_tasks.empty()) {
lock.unlock();
break;
}
server_task task = std::move(queue_tasks.front());
queue_tasks.pop_front();
lock.unlock();
QUE_DBG("processing task, id = %d\n", task.id);
callback_new_task(std::move(task));
}
// all tasks in the current loop is processed, slots data is now ready
QUE_DBG("%s", "update slots\n");
callback_update_slots();
QUE_DBG("%s", "waiting for new tasks\n");
{
std::unique_lock<std::mutex> lock(mutex_tasks);
if (!running) {
QUE_DBG("%s", "terminate\n");
return;
}
if (queue_tasks.empty()) {
condition_tasks.wait(lock, [&]{
return (!queue_tasks.empty() || !running);
});
}
}
}
}
void server_queue::cleanup_pending_task(int id_target) {
// no need lock because this is called exclusively by post()
auto rm_func = [id_target](const server_task & task) {
return task.id == id_target;
};
queue_tasks.erase(
std::remove_if(queue_tasks.begin(), queue_tasks.end(), rm_func),
queue_tasks.end());
queue_tasks_deferred.erase(
std::remove_if(queue_tasks_deferred.begin(), queue_tasks_deferred.end(), rm_func),
queue_tasks_deferred.end());
}
//
// server_response
//
void server_response::add_waiting_task_id(int id_task) {
RES_DBG("add task %d to waiting list. current waiting = %d (before add)\n", id_task, (int) waiting_task_ids.size());
std::unique_lock<std::mutex> lock(mutex_results);
waiting_task_ids.insert(id_task);
}
void server_response::add_waiting_tasks(const std::vector<server_task> & tasks) {
std::unique_lock<std::mutex> lock(mutex_results);
for (const auto & task : tasks) {
RES_DBG("add task %d to waiting list. current waiting = %d (before add)\n", task.id, (int) waiting_task_ids.size());
waiting_task_ids.insert(task.id);
}
}
void server_response::remove_waiting_task_id(int id_task) {
RES_DBG("remove task %d from waiting list. current waiting = %d (before remove)\n", id_task, (int) waiting_task_ids.size());
std::unique_lock<std::mutex> lock(mutex_results);
waiting_task_ids.erase(id_task);
// make sure to clean up all pending results
queue_results.erase(
std::remove_if(queue_results.begin(), queue_results.end(), [id_task](const server_task_result_ptr & res) {
return res->id == id_task;
}),
queue_results.end());
}
void server_response::remove_waiting_task_ids(const std::unordered_set<int> & id_tasks) {
std::unique_lock<std::mutex> lock(mutex_results);
for (const auto & id_task : id_tasks) {
RES_DBG("remove task %d from waiting list. current waiting = %d (before remove)\n", id_task, (int) waiting_task_ids.size());
waiting_task_ids.erase(id_task);
}
}
server_task_result_ptr server_response::recv(const std::unordered_set<int> & id_tasks) {
while (true) {
std::unique_lock<std::mutex> lock(mutex_results);
condition_results.wait(lock, [&]{
if (!running) {
RES_DBG("%s : queue result stop\n", __func__);
std::terminate(); // we cannot return here since the caller is HTTP code
}
return !queue_results.empty();
});
for (size_t i = 0; i < queue_results.size(); i++) {
if (id_tasks.find(queue_results[i]->id) != id_tasks.end()) {
server_task_result_ptr res = std::move(queue_results[i]);
queue_results.erase(queue_results.begin() + i);
return res;
}
}
}
// should never reach here
}
server_task_result_ptr server_response::recv_with_timeout(const std::unordered_set<int> & id_tasks, int timeout) {
while (true) {
std::unique_lock<std::mutex> lock(mutex_results);
for (int i = 0; i < (int) queue_results.size(); i++) {
if (id_tasks.find(queue_results[i]->id) != id_tasks.end()) {
server_task_result_ptr res = std::move(queue_results[i]);
queue_results.erase(queue_results.begin() + i);
return res;
}
}
std::cv_status cr_res = condition_results.wait_for(lock, std::chrono::seconds(timeout));
if (!running) {
RES_DBG("%s : queue result stop\n", __func__);
std::terminate(); // we cannot return here since the caller is HTTP code
}
if (cr_res == std::cv_status::timeout) {
return nullptr;
}
}
// should never reach here
}
server_task_result_ptr server_response::recv(int id_task) {
std::unordered_set<int> id_tasks = {id_task};
return recv(id_tasks);
}
void server_response::send(server_task_result_ptr && result) {
RES_DBG("sending result for task id = %d\n", result->id);
std::unique_lock<std::mutex> lock(mutex_results);
for (const auto & id_task : waiting_task_ids) {
if (result->id == id_task) {
RES_DBG("task id = %d pushed to result queue\n", result->id);
queue_results.emplace_back(std::move(result));
condition_results.notify_all();
return;
}
}
}
void server_response::terminate() {
running = false;
condition_results.notify_all();
}
+110
View File
@@ -0,0 +1,110 @@
#pragma once
#include "server-task.h"
#include <condition_variable>
#include <deque>
#include <mutex>
#include <unordered_set>
struct server_queue {
private:
int id = 0;
bool running;
// queues
std::deque<server_task> queue_tasks;
std::deque<server_task> queue_tasks_deferred;
std::mutex mutex_tasks;
std::condition_variable condition_tasks;
// callback functions
std::function<void(server_task &&)> callback_new_task;
std::function<void(void)> callback_update_slots;
public:
// Add a new task to the end of the queue
int post(server_task && task, bool front = false);
// multi-task version of post()
int post(std::vector<server_task> && tasks, bool front = false);
// Add a new task, but defer until one slot is available
void defer(server_task && task);
// Get the next id for creating a new task
int get_new_id();
// Register function to process a new task
void on_new_task(std::function<void(server_task &&)> callback);
// Register the function to be called when all slots data is ready to be processed
void on_update_slots(std::function<void(void)> callback);
// Call when the state of one slot is changed, it will move one task from deferred to main queue
void pop_deferred_task();
// end the start_loop routine
void terminate();
/**
* Main loop consists of these steps:
* - Wait until a new task arrives
* - Process the task (i.e. maybe copy data into slot)
* - Check if multitask is finished
* - Update all slots
*/
void start_loop();
// for metrics
size_t queue_tasks_deferred_size() {
std::unique_lock<std::mutex> lock(mutex_tasks);
return queue_tasks_deferred.size();
}
private:
void cleanup_pending_task(int id_target);
};
struct server_response {
private:
bool running = true;
// for keeping track of all tasks waiting for the result
std::unordered_set<int> waiting_task_ids;
// the main result queue (using ptr for polymorphism)
std::vector<server_task_result_ptr> queue_results;
std::mutex mutex_results;
std::condition_variable condition_results;
public:
// add the id_task to the list of tasks waiting for response
void add_waiting_task_id(int id_task);
void add_waiting_tasks(const std::vector<server_task> & tasks);
// when the request is finished, we can remove task associated with it
void remove_waiting_task_id(int id_task);
// remove multiple tasks from waiting list
void remove_waiting_task_ids(const std::unordered_set<int> & id_tasks);
// This function blocks the thread until there is a response for one of the id_tasks
server_task_result_ptr recv(const std::unordered_set<int> & id_tasks);
// same as recv(), but have timeout in seconds
// if timeout is reached, nullptr is returned
server_task_result_ptr recv_with_timeout(const std::unordered_set<int> & id_tasks, int timeout);
// single-task version of recv()
server_task_result_ptr recv(int id_task);
// Send a new result to a waiting id_task
void send(server_task_result_ptr && result);
// terminate the waiting loop
void terminate();
};
File diff suppressed because it is too large Load Diff
+453
View File
@@ -0,0 +1,453 @@
#pragma once
#include "common.h"
#include "llama.h"
#include <string>
#include <unordered_set>
#include <list>
// TODO: prevent including the whole server-common.h as we only use server_tokens
#include "server-common.h"
using json = nlohmann::ordered_json;
enum server_task_type {
SERVER_TASK_TYPE_COMPLETION,
SERVER_TASK_TYPE_EMBEDDING,
SERVER_TASK_TYPE_RERANK,
SERVER_TASK_TYPE_INFILL,
SERVER_TASK_TYPE_CANCEL,
SERVER_TASK_TYPE_NEXT_RESPONSE,
SERVER_TASK_TYPE_METRICS,
SERVER_TASK_TYPE_SLOT_SAVE,
SERVER_TASK_TYPE_SLOT_RESTORE,
SERVER_TASK_TYPE_SLOT_ERASE,
SERVER_TASK_TYPE_SET_LORA,
};
// TODO: change this to more generic "response_format" to replace the "format_response_*" in server-common
enum oaicompat_type {
OAICOMPAT_TYPE_NONE,
OAICOMPAT_TYPE_CHAT,
OAICOMPAT_TYPE_COMPLETION,
OAICOMPAT_TYPE_EMBEDDING,
};
enum stop_type {
STOP_TYPE_NONE,
STOP_TYPE_EOS,
STOP_TYPE_WORD,
STOP_TYPE_LIMIT,
};
struct task_params {
bool stream = true;
bool include_usage = false;
bool cache_prompt = true; // remember the prompt to avoid reprocessing all prompt
bool return_tokens = false;
bool return_progress = false;
int32_t n_keep = 0; // number of tokens to keep from initial prompt
int32_t n_discard = 0; // number of tokens after n_keep that may be discarded when shifting context, 0 defaults to half
int32_t n_predict = -1; // new tokens to predict
int32_t n_indent = 0; // minimum line indentation for the generated text in number of whitespace characters
int64_t t_max_prompt_ms = -1; // TODO: implement
int64_t t_max_predict_ms = -1; // if positive, limit the generation phase to this time limit
std::vector<common_adapter_lora_info> lora;
std::vector<std::string> antiprompt;
std::vector<std::string> response_fields;
bool timings_per_token = false;
bool post_sampling_probs = false;
struct common_params_sampling sampling;
struct common_params_speculative speculative;
// OAI-compat fields
bool verbose = false;
oaicompat_type oaicompat = OAICOMPAT_TYPE_NONE;
std::string oaicompat_model;
std::string oaicompat_cmpl_id;
common_chat_syntax oaicompat_chat_syntax;
// Embeddings
int32_t embd_normalize = 2; // (-1=none, 0=max absolute int16, 1=taxicab, 2=Euclidean/L2, >2=p-norm)
json format_logit_bias(const std::vector<llama_logit_bias> & logit_bias) const;
json to_json(bool only_metrics = false) const;
};
struct server_task {
int id = -1; // to be filled by server_queue
int index = -1; // used when there are multiple prompts (batch request)
// used by SERVER_TASK_TYPE_CANCEL
int id_target = -1;
int id_slot = -1;
// used by SERVER_TASK_TYPE_INFERENCE
task_params params;
server_tokens tokens;
server_task_type type;
// used by SERVER_TASK_TYPE_SLOT_SAVE, SERVER_TASK_TYPE_SLOT_RESTORE, SERVER_TASK_TYPE_SLOT_ERASE
struct slot_action {
int slot_id;
std::string filename;
std::string filepath;
};
slot_action slot_action;
// used by SERVER_TASK_TYPE_METRICS
bool metrics_reset_bucket = false;
// used by SERVER_TASK_TYPE_SET_LORA
std::vector<common_adapter_lora_info> set_lora;
server_task() = default;
server_task(server_task_type type) : type(type) {}
int32_t n_tokens() const {
return tokens.size();
}
static task_params params_from_json_cmpl(
const llama_context * ctx,
const common_params & params_base,
const json & data);
// utility function
static std::unordered_set<int> get_list_id(const std::vector<server_task> & tasks) {
std::unordered_set<int> ids(tasks.size());
for (size_t i = 0; i < tasks.size(); i++) {
ids.insert(tasks[i].id);
}
return ids;
}
};
struct result_timings {
int32_t cache_n = -1;
int32_t prompt_n = -1;
double prompt_ms;
double prompt_per_token_ms;
double prompt_per_second;
int32_t predicted_n = -1;
double predicted_ms;
double predicted_per_token_ms;
double predicted_per_second;
// Optional speculative metrics - only included when > 0
int32_t draft_n = 0;
int32_t draft_n_accepted = 0;
json to_json() const;
};
struct result_prompt_progress {
int32_t total = 0;
int32_t cache = 0;
int32_t processed = 0;
int64_t time_ms = 0;
json to_json() const;
};
struct server_task_result {
int id = -1;
int id_slot = -1;
virtual bool is_error() {
// only used by server_task_result_error
return false;
}
virtual bool is_stop() {
// only used by server_task_result_cmpl_*
return true;
}
virtual int get_index() {
return -1;
}
virtual json to_json() = 0;
virtual ~server_task_result() = default;
};
// using shared_ptr for polymorphism of server_task_result
using server_task_result_ptr = std::unique_ptr<server_task_result>;
struct completion_token_output {
llama_token tok;
float prob;
std::string text_to_send;
struct prob_info {
llama_token tok;
std::string txt;
float prob;
};
std::vector<prob_info> probs;
json to_json(bool post_sampling_probs) const;
static json probs_vector_to_json(const std::vector<completion_token_output> & probs, bool post_sampling_probs);
static float logarithm(float x);
static std::vector<unsigned char> str_to_bytes(const std::string & str);
};
struct server_task_result_cmpl_final : server_task_result {
int index = 0;
std::string content;
llama_tokens tokens;
bool stream;
bool include_usage;
result_timings timings;
std::string prompt;
bool truncated;
int32_t n_decoded;
int32_t n_prompt_tokens;
int32_t n_tokens_cached;
bool has_new_line;
std::string stopping_word;
stop_type stop = STOP_TYPE_NONE;
bool post_sampling_probs;
std::vector<completion_token_output> probs_output;
std::vector<std::string> response_fields;
task_params generation_params;
// OAI-compat fields
bool verbose = false;
oaicompat_type oaicompat = OAICOMPAT_TYPE_NONE;
std::string oaicompat_model;
std::string oaicompat_cmpl_id;
common_chat_msg oaicompat_msg;
std::vector<common_chat_msg_diff> oaicompat_msg_diffs;
virtual int get_index() override {
return index;
}
virtual bool is_stop() override {
return true; // in stream mode, final responses are considered stop
}
virtual json to_json() override;
json to_json_non_oaicompat();
json to_json_oaicompat();
json to_json_oaicompat_chat();
json to_json_oaicompat_chat_stream();
};
struct server_task_result_cmpl_partial : server_task_result {
int index = 0;
std::string content;
llama_tokens tokens;
int32_t n_decoded;
int32_t n_prompt_tokens;
bool post_sampling_probs;
bool is_progress = false;
completion_token_output prob_output;
result_timings timings;
result_prompt_progress progress;
// OAI-compat fields
bool verbose = false;
oaicompat_type oaicompat = OAICOMPAT_TYPE_NONE;
std::string oaicompat_model;
std::string oaicompat_cmpl_id;
std::vector<common_chat_msg_diff> oaicompat_msg_diffs;
virtual int get_index() override {
return index;
}
virtual bool is_stop() override {
return false; // in stream mode, partial responses are not considered stop
}
virtual json to_json() override;
json to_json_non_oaicompat();
json to_json_oaicompat();
json to_json_oaicompat_chat();
};
struct server_task_result_embd : server_task_result {
int index = 0;
std::vector<std::vector<float>> embedding;
int32_t n_tokens;
// OAI-compat fields
oaicompat_type oaicompat = OAICOMPAT_TYPE_NONE;
virtual int get_index() override {
return index;
}
virtual json to_json() override;
json to_json_non_oaicompat();
json to_json_oaicompat();
};
struct server_task_result_rerank : server_task_result {
int index = 0;
float score = -1e6;
int32_t n_tokens;
virtual int get_index() override {
return index;
}
virtual json to_json() override;
};
struct server_task_result_error : server_task_result {
int index = 0;
error_type err_type = ERROR_TYPE_SERVER;
std::string err_msg;
// for ERROR_TYPE_EXCEED_CONTEXT_SIZE
int32_t n_prompt_tokens = 0;
int32_t n_ctx = 0;
virtual bool is_error() override {
return true;
}
virtual json to_json() override;
};
struct server_task_result_metrics : server_task_result {
int n_idle_slots;
int n_processing_slots;
int n_tasks_deferred;
int64_t t_start;
// TODO: somehow reuse server_metrics in the future, instead of duplicating the fields
uint64_t n_prompt_tokens_processed_total = 0;
uint64_t t_prompt_processing_total = 0;
uint64_t n_tokens_predicted_total = 0;
uint64_t t_tokens_generation_total = 0;
uint64_t n_tokens_max = 0;
uint64_t n_prompt_tokens_processed = 0;
uint64_t t_prompt_processing = 0;
uint64_t n_tokens_predicted = 0;
uint64_t t_tokens_generation = 0;
uint64_t n_decode_total = 0;
uint64_t n_busy_slots_total = 0;
// while we can also use std::vector<server_slot> this requires copying the slot object which can be quite messy
// therefore, we use json to temporarily store the slot.to_json() result
json slots_data = json::array();
virtual json to_json() override;
};
struct server_task_result_slot_save_load : server_task_result {
std::string filename;
bool is_save; // true = save, false = load
size_t n_tokens;
size_t n_bytes;
double t_ms;
virtual json to_json() override;
};
struct server_task_result_slot_erase : server_task_result {
size_t n_erased;
virtual json to_json() override;
};
struct server_task_result_apply_lora : server_task_result {
virtual json to_json() override;
};
struct server_prompt_checkpoint {
llama_pos pos_min;
llama_pos pos_max;
std::vector<uint8_t> data;
size_t size() const {
return data.size();
}
};
struct server_prompt {
server_tokens tokens;
std::vector<uint8_t> data;
std::list<server_prompt_checkpoint> checkpoints;
size_t size() const {
size_t res = data.size();
for (const auto & checkpoint : checkpoints) {
res += checkpoint.size();
}
return res;
}
int n_tokens() const {
return tokens.size();
}
};
struct server_prompt_cache {
server_prompt_cache(int32_t limit_size_mib, size_t limit_tokens) {
this->limit_size = 1024ull*1024ull*(limit_size_mib < 0 ? 0 : limit_size_mib);
this->limit_tokens = limit_tokens;
}
std::list<server_prompt> states;
// in bytes, 0 = no limit
size_t limit_size = 0;
// in tokens, 0 = no limit
size_t limit_tokens = 0;
size_t size() const;
size_t n_tokens() const;
server_prompt * alloc(const server_prompt & prompt, size_t state_size);
bool load(server_prompt & prompt, const server_tokens & tokens_new, llama_context * ctx, int32_t id_slot);
void update();
};
+770 -2767
View File
File diff suppressed because it is too large Load Diff