Files
llama.cpp/conversion/deepseek.py
T
Aman Gupta 8c146a8366 DeepSeek V4 (#24162)
* convert: add dsv4 conversion

* add basic setup

* add llm_graph_input_dsv4

* add save-load state

* add sinkhorn eps - correction by @fairydreaming

* add rope fix

* cleanup dead code

* fix bugs

* support pro model: added by @fairydreaming

* remove redundant V cache

* Chat template

* remove debugging leftovers

* Add mechanism for inlining templates based on architecture

* s/deepseek-v4-flash/deepseek4/g

* s/deepseek-v4-flash/deepseek4/g continued

* enable graph reuse

* enable FA

* fix test llama archs

* rename

* compatibility with antirez ds4 GGUFs

* simplified set_gguf_parameters() by calling super class method, replaced moe.score_func with expert_gating_func.

* reserve worst-case kv-cache

* revert max split inputs

* address review comments

* add padding to enable FA

* pad only the final value of plan.n_kv to 256

* remove built-in cpp chat template

* cont: remove cpp built-in template

* rm outdated test

* replace ggml_view_3d() with ggml_reshape_3d()

Co-authored-by: Georgi Gerganov <ggerganov@gmail.com>

* only support n_seq=1 for now

* remove unused var

* cont: remove unused var

* use scale bias

* use correct ptr for can_reuse

* remove gen-chat-inline-templates.py

* simplify graph reuse

* cont: cleanup

* remove unused inputs

* enable partial checkpointing

* add correct shape for kq_mask + set llama_model_n_swa to 0 for dsv4

* precompute source_idx + add comment about dummy write

* support multi-seq

* remove restored_trim_pos

* use split_equal when possible

* fix indent

* address review comments

* use LLM_KV

* fix ci

---------

Co-authored-by: Piotr Wilkin <piotr.wilkin@syndatis.com>
Co-authored-by: Stanisław Szymczyk <sszymczy@gmail.com>
Co-authored-by: Xuan Son Nguyen <son@huggingface.co>
Co-authored-by: fairydreaming <166155368+fairydreaming@users.noreply.github.com>
Co-authored-by: Georgi Gerganov <ggerganov@gmail.com>
2026-06-29 16:58:51 +08:00

777 lines
35 KiB
Python

from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any, Callable, Iterable, TYPE_CHECKING
import numpy as np
import torch
if TYPE_CHECKING:
from torch import Tensor
from .base import LazyTorchTensor, MmprojModel, ModelBase, TextModel, gguf, logger
from .qwen import QwenModel
@ModelBase.register("DeepseekOCRForCausalLM", "UnlimitedOCRForCausalLM")
class DeepseekOCRVisionModel(MmprojModel):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.clip_projector_type = gguf.VisionProjectorType.DEEPSEEKOCR
def set_gguf_parameters(self):
super().set_gguf_parameters()
hparams = self.hparams
self.gguf_writer.add_clip_projector_type(self.clip_projector_type)
# default values below are taken from HF tranformers code
self.gguf_writer.add_vision_attention_layernorm_eps(hparams.get("layer_norm_eps", 1e-6))
self.gguf_writer.add_vision_use_gelu(True)
# calculate proj_scale_factor (used by tinygemma3 test model)
image_seq_length = self.preprocessor_config.get("image_seq_length", 256)
n_per_side = int(image_seq_length ** 0.5)
image_size = self.hparams["image_size"]
patch_size = self.hparams["patch_size"]
proj_scale_factor = (image_size // patch_size) // n_per_side
if proj_scale_factor > 0 and proj_scale_factor != 4:
# we only need to write this if it's not the default value
# in this case, we are converting a test model
self.gguf_writer.add_vision_projector_scale_factor(proj_scale_factor)
# @bluebread: there's no window_size in config but just add it here anyway
self.gguf_writer.add_vision_window_size(self.hparams.get("window_size", 14))
# SAM configuration
sam_hparams = hparams['sam']
self.gguf_writer.add_vision_sam_layers_count(sam_hparams['layers'])
self.gguf_writer.add_vision_sam_embedding_length(sam_hparams['width'])
self.gguf_writer.add_vision_sam_head_count(sam_hparams['heads'])
def get_vision_config(self) -> dict[str, Any]:
vision_config: dict[str, Any] | None = self.global_config.get("vision_config")
if not vision_config:
raise ValueError("DeepseekOCR model requires 'vision_config' in the model configuration, but it was not found")
vision_config['sam'] = vision_config['width']['sam_vit_b']
if vision_config['width'].get('clip-l-14-224') is not None:
vision_config.update(vision_config['width']['clip-l-14-224'])
if isinstance(vision_config['width'], int):
vision_config['hidden_size'] = vision_config['width']
if vision_config.get('heads') is not None:
vision_config['num_heads'] = vision_config['heads']
vision_config['intermediate_size'] = vision_config['heads'] * 4
return vision_config
def tensor_force_quant(self, name, new_name, bid, n_dims):
for nq_name in ('.embeddings.', 'pos_embed', '.rel_pos_h', '.rel_pos_w', '.neck.', '.net_'):
if nq_name in name:
return gguf.GGMLQuantizationType.F32
return super().tensor_force_quant(name, new_name, bid, n_dims)
def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
if name.endswith("view_seperator"):
data_torch = data_torch.unsqueeze(0)
yield from super().modify_tensors(data_torch, name, bid)
@classmethod
def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
name, gen = item
# Only process vision-related tensors, skip language model tensors
# Vision components: sam_model, vision_model, projector, image_newline, view_seperator
# Language model components to skip: lm_head, embed_tokens, layers, norm
if name.startswith(("lm_head.", "model.embed_tokens.", "model.layers.", "model.norm.")):
return None
if name.endswith("pos_embed") or name.endswith("rel_pos_h") or name.endswith("rel_pos_w"):
name += ".weight"
return super().filter_tensors((name, gen))
@ModelBase.register("DeepseekOCR2ForCausalLM")
class DeepseekOCR2VisionModel(DeepseekOCRVisionModel):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.clip_projector_type = gguf.VisionProjectorType.DEEPSEEKOCR2
def set_gguf_parameters(self):
# the vision tower's qwen2 encoder is built from fixed defaults,
# see build_qwen2_decoder_as_encoder() in deepencoderv2.py
if self.hparams.get("patch_size") is None:
self.hparams["patch_size"] = 16
if self.hparams.get("intermediate_size") is None:
self.hparams["intermediate_size"] = 4864
if self.hparams.get("num_attention_heads") is None:
self.hparams["num_attention_heads"] = 14
super().set_gguf_parameters()
# qwen2 encoder is GQA: 14 Q heads, 2 KV heads
self.gguf_writer.add_vision_head_count_kv(2)
def get_vision_config(self) -> dict[str, Any]:
vision_config = super().get_vision_config()
vision_config['hidden_size'] = vision_config['width']['qwen2-0-5b']['dim']
if vision_config.get('layers') is None:
vision_config['layers'] = 24
return vision_config
@ModelBase.register("DeepseekForCausalLM")
class DeepseekModel(TextModel):
model_arch = gguf.MODEL_ARCH.DEEPSEEK
def set_vocab(self):
try:
self._set_vocab_sentencepiece()
except FileNotFoundError:
self._set_vocab_gpt2()
def set_gguf_parameters(self):
super().set_gguf_parameters()
hparams = self.hparams
if (rope_dim := hparams.get("head_dim")) is None:
rope_dim = hparams["hidden_size"] // hparams["num_attention_heads"]
self.gguf_writer.add_rope_dimension_count(rope_dim)
self.gguf_writer.add_rope_scaling_type(gguf.RopeScalingType.NONE)
self.gguf_writer.add_leading_dense_block_count(hparams["first_k_dense_replace"])
self.gguf_writer.add_vocab_size(hparams["vocab_size"])
self.gguf_writer.add_expert_feed_forward_length(hparams["moe_intermediate_size"])
self.gguf_writer.add_expert_weights_scale(1.0)
self.gguf_writer.add_expert_count(hparams["n_routed_experts"])
self.gguf_writer.add_expert_shared_count(hparams["n_shared_experts"])
_experts: list[dict[str, Tensor]] | None = None
@staticmethod
def permute(weights: Tensor, n_head: int, n_head_kv: int | None):
if n_head_kv is not None and n_head != n_head_kv:
n_head = n_head_kv
return (weights.reshape(n_head, 2, weights.shape[0] // n_head // 2, *weights.shape[1:])
.swapaxes(1, 2)
.reshape(weights.shape))
def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
n_head = self.hparams["num_attention_heads"]
n_kv_head = self.hparams.get("num_key_value_heads")
if name.endswith(("q_proj.weight", "q_proj.bias")):
data_torch = DeepseekModel.permute(data_torch, n_head, n_head)
if name.endswith(("k_proj.weight", "k_proj.bias")):
data_torch = DeepseekModel.permute(data_torch, n_head, n_kv_head)
# process the experts separately
if name.find("mlp.experts") != -1:
n_experts = self.hparams["n_routed_experts"]
assert bid is not None
if self._experts is None:
self._experts = [{} for _ in range(self.block_count)]
self._experts[bid][name] = data_torch
if len(self._experts[bid]) >= n_experts * 3:
# merge the experts into a single 3d tensor
for w_name in ["down_proj", "gate_proj", "up_proj"]:
datas: list[Tensor] = []
for xid in range(n_experts):
ename = f"model.layers.{bid}.mlp.experts.{xid}.{w_name}.weight"
datas.append(self._experts[bid][ename])
del self._experts[bid][ename]
data_torch = torch.stack(datas, dim=0)
merged_name = f"model.layers.{bid}.mlp.experts.{w_name}.weight"
yield from super().modify_tensors(data_torch, merged_name, bid)
return
else:
return
yield from super().modify_tensors(data_torch, name, bid)
def prepare_tensors(self):
super().prepare_tensors()
if self._experts is not None:
# flatten `list[dict[str, Tensor]]` into `list[str]`
experts = [k for d in self._experts for k in d.keys()]
if len(experts) > 0:
raise ValueError(f"Unprocessed experts: {experts}")
@ModelBase.register(
"DeepseekV2ForCausalLM",
"DeepseekV3ForCausalLM",
"DeepseekOCRForCausalLM",
"UnlimitedOCRForCausalLM",
"KimiVLForConditionalGeneration",
"KimiK25ForConditionalGeneration",
"YoutuForCausalLM",
"YoutuVLForConditionalGeneration",
)
class DeepseekV2Model(TextModel):
model_arch = gguf.MODEL_ARCH.DEEPSEEK2
# TODO @ngxson : remove this when we support MTP for deepseek models
skip_mtp = True
merge_expert = True
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
hparams: dict = ModelBase.load_hparams(self.dir_model, is_mistral_format=False)
self.origin_hf_arch = hparams.get('architectures', [None])[0]
# special handling for Deepseek OCR
if self.origin_hf_arch in ("DeepseekOCRForCausalLM", "DeepseekOCR2ForCausalLM", "UnlimitedOCRForCausalLM"):
self.model_arch = gguf.MODEL_ARCH.DEEPSEEK2OCR
self.gguf_writer.arch = gguf.MODEL_ARCH_NAMES[self.model_arch]
self.gguf_writer.add_architecture()
# default jinja template
self.gguf_writer.add_chat_template("{% for m in messages %}{{m['content']}}{% endfor %}")
@classmethod
def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
name, _ = item
# DeepSeek-OCR vision encoder (SAM + DeepSeek-OCR-2 qwen2 tower)
if "sam_model" in name or "qwen2_model" in name:
return None
return super().filter_tensors(item)
def set_vocab(self):
try:
self._set_vocab_gpt2()
return
except Exception:
pass
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(self.dir_model, trust_remote_code=True)
tokpre = self.get_vocab_base_pre(tokenizer)
if tokpre == "kimi-k2":
# Build merges list using the approach similar to HunYuanMoE
merges = []
vocab = {}
mergeable_ranks = tokenizer.model._mergeable_ranks # ty: ignore[unresolved-attribute]
for token, rank in mergeable_ranks.items():
vocab[QwenModel.token_bytes_to_string(token)] = rank
if len(token) == 1:
continue
merged = QwenModel.bpe(mergeable_ranks, token, max_rank=rank)
if len(merged) == 2:
merges.append(' '.join(map(QwenModel.token_bytes_to_string, merged)))
# Build token list
vocab_size = self.hparams["vocab_size"]
special_tokens = tokenizer.special_tokens # ty: ignore[unresolved-attribute]
reverse_vocab = {id_ : encoded_tok for encoded_tok, id_ in {**vocab, **special_tokens}.items()}
tokens: list[str] = []
toktypes: list[int] = []
for i in range(vocab_size):
if i not in reverse_vocab:
tokens.append(f"[PAD{i}]")
toktypes.append(gguf.TokenType.UNUSED)
else:
token = reverse_vocab[i]
tokens.append(token)
if i in special_tokens.values():
toktypes.append(gguf.TokenType.CONTROL)
else:
toktypes.append(gguf.TokenType.NORMAL)
self.gguf_writer.add_tokenizer_model("gpt2")
self.gguf_writer.add_tokenizer_pre(tokpre)
self.gguf_writer.add_token_list(tokens)
self.gguf_writer.add_token_types(toktypes)
self.gguf_writer.add_token_merges(merges)
special_vocab = gguf.SpecialVocab(self.dir_model, load_merges=False)
special_vocab.add_to_gguf(self.gguf_writer)
else:
raise NotImplementedError(f"Deepseek pre-tokenizer {tokpre!r} is not supported yet!")
def set_gguf_parameters(self):
is_ocr = (self.model_arch == gguf.MODEL_ARCH.DEEPSEEK2OCR)
if is_ocr:
self.hparams['rope_theta'] = self.hparams.get('rope_theta', 10000.0)
else:
# note: deepseek2 using MLA converts into MQA (ie: GQA with 1 group)
self.hparams["num_key_value_heads"] = 1
self.hparams['rms_norm_eps'] = self.hparams.get('rms_norm_eps', 1e-6)
super().set_gguf_parameters()
hparams = self.hparams
# first_k_dense_replace: number of leading layers using dense FFN instead of MoE
# For non-MoE models (like Youtu), set to n_layer to use dense FFN for all layers
# For MoE models (like DeepSeek-V2), this is the number of leading non-MoE layers
has_moe = hparams.get("n_routed_experts") is not None
first_k_dense_replace = hparams.get("first_k_dense_replace")
if first_k_dense_replace is None:
# Default: if no MoE, all layers are dense; if MoE, none are dense
first_k_dense_replace = hparams["num_hidden_layers"] if not has_moe else 0
self.gguf_writer.add_leading_dense_block_count(first_k_dense_replace)
kv_lora_rank = hparams.get("kv_lora_rank", 512)
self.gguf_writer.add_vocab_size(hparams["vocab_size"])
if "q_lora_rank" in hparams and hparams["q_lora_rank"] is not None:
self.gguf_writer.add_q_lora_rank(hparams["q_lora_rank"])
# note: deepseek2 using MLA converts into MQA with larger heads, then decompresses to MHA
if not is_ocr:
self.gguf_writer.add_kv_lora_rank(kv_lora_rank)
self.gguf_writer.add_key_length(kv_lora_rank + hparams["qk_rope_head_dim"])
self.gguf_writer.add_value_length(kv_lora_rank)
self.gguf_writer.add_key_length_mla(hparams["qk_nope_head_dim"] + hparams["qk_rope_head_dim"])
self.gguf_writer.add_value_length_mla(hparams["v_head_dim"])
# MoE parameters (required by C++ code for DEEPSEEK2 arch)
# For non-MoE models like Youtu, use intermediate_size as expert_feed_forward_length
moe_intermediate_size = self.find_hparam(["moe_intermediate_size", "intermediate_size"], optional=False)
self.gguf_writer.add_expert_feed_forward_length(moe_intermediate_size)
if (n_routed_experts := hparams.get("n_routed_experts")) is not None:
self.gguf_writer.add_expert_count(n_routed_experts)
# expert_shared_count is required by C++ code, default to 0 for non-MoE models
n_shared_experts = hparams.get("n_shared_experts", 0)
self.gguf_writer.add_expert_shared_count(n_shared_experts)
# When not set, C++ code will use scale_w = false to skip the no-op scaling
if (routed_scaling_factor := hparams.get("routed_scaling_factor")) is not None:
self.gguf_writer.add_expert_weights_scale(routed_scaling_factor)
if (norm_topk_prob := hparams.get("norm_topk_prob")) is not None and norm_topk_prob:
self.gguf_writer.add_expert_weights_norm(norm_topk_prob)
self.gguf_writer.add_rope_dimension_count(hparams["qk_rope_head_dim"])
# Unlimited-OCR sliding window; written for metadata, the decoder ignores it (full MHA)
if is_ocr:
sliding_window = hparams.get("sliding_window_size") or hparams.get("sliding_window")
if sliding_window:
self.gguf_writer.add_sliding_window(sliding_window)
if (rope_mscale_all := self.rope_parameters.get("mscale_all_dim")) is not None:
# [TAG_DEEPSEEK2_YARN_LOG_MUL_FIX]
# note: for legacy reasons, this is not consistent with the other usages of self.gguf_writer.add_rope_scaling_yarn_log_mul
# ref https://github.com/ggml-org/llama.cpp/pull/17945
self.gguf_writer.add_rope_scaling_yarn_log_mul(0.1 * rope_mscale_all)
_experts: list[dict[str, Tensor]] | None = None
def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
# skip lm_head.weight if tie_word_embeddings is True
if self.hparams.get("tie_word_embeddings", False):
if name == "lm_head.weight" or name == "model.lm_head.weight":
logger.info("Skipping tied output layer 'lm_head.weight' (will use token_embd.weight)")
return
# skip Multi-Token Prediction (MTP) layers
if self.skip_mtp:
block_count = self.hparams["num_hidden_layers"]
match = re.match(r"model.layers.(\d+)", name)
if match and int(match.group(1)) >= block_count:
return
# process the experts separately
if self.merge_expert and name.find("mlp.experts") != -1:
n_experts = self.hparams["n_routed_experts"]
assert bid is not None
if self._experts is None:
self._experts = [{} for _ in range(self.block_count)]
self._experts[bid][name] = data_torch
if len(self._experts[bid]) >= n_experts * 3:
# merge the experts into a single 3d tensor
for w_name in ["down_proj", "gate_proj", "up_proj"]:
datas: list[Tensor] = []
for xid in range(n_experts):
ename = f"model.layers.{bid}.mlp.experts.{xid}.{w_name}.weight"
datas.append(self._experts[bid][ename])
del self._experts[bid][ename]
data_torch = torch.stack(datas, dim=0)
merged_name = f"model.layers.{bid}.mlp.experts.{w_name}.weight"
yield from super().modify_tensors(data_torch, merged_name, bid)
return
else:
return
# note: MLA with the absorption optimization, needs these two split and k_b_proj transposed
if name.endswith("kv_b_proj.weight"):
name_kb = name.replace("kv_b_proj", "k_b_proj")
name_vb = name.replace("kv_b_proj", "v_b_proj")
n_head_kv = self.hparams["num_key_value_heads"]
v_head_dim = self.hparams["v_head_dim"]
qk_nope_head_dim = self.hparams["qk_nope_head_dim"]
assert data_torch.shape[0] == n_head_kv * (v_head_dim + qk_nope_head_dim)
kv_b = data_torch.view(n_head_kv, v_head_dim + qk_nope_head_dim, data_torch.shape[-1])
k_b, v_b = torch.split(kv_b, [qk_nope_head_dim, v_head_dim], dim=1)
k_b = k_b.transpose(1, 2)
yield from super().modify_tensors(k_b, name_kb, bid)
yield from super().modify_tensors(v_b, name_vb, bid)
return
yield from super().modify_tensors(data_torch, name, bid)
def prepare_tensors(self):
super().prepare_tensors()
if self._experts is not None:
# flatten `list[dict[str, Tensor]]` into `list[str]`
experts = [k for d in self._experts for k in d.keys()]
if len(experts) > 0:
raise ValueError(f"Unprocessed experts: {experts}")
@ModelBase.register("DeepseekV32ForCausalLM")
class DeepseekV32Model(DeepseekV2Model):
model_arch = gguf.MODEL_ARCH.DEEPSEEK32
skip_mtp = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.block_count = self.hparams["num_hidden_layers"] + self.hparams.get("num_nextn_predict_layers", 0)
self.tensor_map = gguf.get_tensor_name_map(self.model_arch, self.block_count)
def set_vocab(self):
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(self.dir_model)
assert getattr(tokenizer, "add_bos_token", False), "Change value of add_bos_token to true in tokenizer_config.json file."
self._set_vocab_gpt2()
def set_gguf_parameters(self):
super().set_gguf_parameters()
# NextN/MTP prediction layers
if (num_nextn_predict_layers := self.hparams.get("num_nextn_predict_layers")) is not None:
self.gguf_writer.add_nextn_predict_layers(num_nextn_predict_layers)
# DSA indexer parameters
self.gguf_writer.add_indexer_head_count(self.hparams["index_n_heads"])
self.gguf_writer.add_indexer_key_length(self.hparams["index_head_dim"])
self.gguf_writer.add_indexer_top_k(self.hparams["index_topk"])
@ModelBase.register("DeepseekV4ForCausalLM")
class DeepseekV4Model(TextModel):
model_arch = gguf.MODEL_ARCH.DEEPSEEK4
_skipped_mtp_tensors = 0
def __init__(self, *args, **kwargs):
type(self)._skipped_mtp_tensors = 0
super().__init__(*args, **kwargs)
with open(self.dir_model / "config.json", "r", encoding="utf-8") as f:
raw_hparams = json.load(f)
for key, value in raw_hparams.items():
self.hparams.setdefault(key, value)
self.block_count = self.hparams["num_hidden_layers"]
self.tensor_map = gguf.get_tensor_name_map(self.model_arch, self.block_count)
self._dsv4_fp8_dequantized: set[str] = set()
self._dsv4_bf16_tensors: set[str] = set()
self._dsv4_f32_tensors: set[str] = set()
self._dsv4_mxfp4_generated = False
self._collect_source_dtypes()
if type(self)._skipped_mtp_tensors:
logger.info("Skipping %d DeepSeek-V4 MTP tensor(s) for conversion v0", type(self)._skipped_mtp_tensors)
# add a default chat template; if the model has a built-in template, it will be overridden later
template_path = Path(__file__).parent.parent / "models" / "templates" / "deepseek-ai-DeepSeek-V4.jinja"
if template_path.is_file():
with open(template_path, "r", encoding="utf-8") as f:
self.gguf_writer.add_chat_template(f.read())
@classmethod
def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
name, _ = item
if name.startswith("mtp."):
cls._skipped_mtp_tensors += 1
return None
return super().filter_tensors(item)
@staticmethod
def _float8_dtypes() -> tuple[torch.dtype, ...]:
return tuple(
dtype for dtype in (
getattr(torch, "float8_e4m3fn", None),
getattr(torch, "float8_e5m2", None),
) if dtype is not None
)
@staticmethod
def _e8m0_to_float(scale: Tensor) -> Tensor:
torch_float8_e8m0 = getattr(torch, "float8_e8m0fnu", None)
if torch_float8_e8m0 is not None and scale.dtype == torch_float8_e8m0:
return scale.float()
bits = scale.view(torch.uint8).float()
return torch.exp2(bits - 127.0)
def _collect_source_dtypes(self) -> None:
for name, gen in self.model_tensors.items():
dtype = gen().dtype
if dtype == torch.bfloat16:
self._dsv4_bf16_tensors.add(name)
elif dtype == torch.float32:
self._dsv4_f32_tensors.add(name)
def set_gguf_parameters(self):
super().set_gguf_parameters()
hparams = self.hparams
self.gguf_writer.add_rope_dimension_count(hparams["qk_rope_head_dim"])
self.gguf_writer.add_q_lora_rank(hparams["q_lora_rank"])
self.gguf_writer.add_sliding_window(hparams["sliding_window"])
self.gguf_writer.add_expert_feed_forward_length(hparams["moe_intermediate_size"])
self.gguf_writer.add_expert_shared_count(hparams["n_shared_experts"])
self.gguf_writer.add_expert_weights_scale(hparams["routed_scaling_factor"])
self.gguf_writer.add_expert_weights_norm(hparams["norm_topk_prob"])
self.gguf_writer.add_swiglu_clamp_exp([hparams["swiglu_limit"]] * self.block_count)
self.gguf_writer.add_swiglu_clamp_shexp([hparams["swiglu_limit"]] * self.block_count)
self.gguf_writer.add_indexer_head_count(hparams["index_n_heads"])
self.gguf_writer.add_indexer_key_length(hparams["index_head_dim"])
self.gguf_writer.add_indexer_top_k(hparams["index_topk"])
self.gguf_writer.add_attention_output_group_count(hparams["o_groups"])
self.gguf_writer.add_attention_output_lora_rank(hparams["o_lora_rank"])
self.gguf_writer.add_attention_compress_ratios(hparams["compress_ratios"])
self.gguf_writer.add_attention_compress_rope_freq_base(hparams["compress_rope_theta"])
self.gguf_writer.add_hyper_connection_count(hparams["hc_mult"])
self.gguf_writer.add_hyper_connection_sinkhorn_iterations(hparams["hc_sinkhorn_iters"])
self.gguf_writer.add_hyper_connection_epsilon(hparams["hc_eps"])
self.gguf_writer.add_hash_layer_count(hparams["num_hash_layers"])
def dequant_model(self):
fp8_dtypes = self._float8_dtypes()
tensors_to_remove: list[str] = []
def dequant_fp8_weight(weight: Tensor, scale: Tensor) -> Tensor:
out_features, in_features = weight.shape
scale_f = self._e8m0_to_float(scale)
scale_f = scale_f.repeat_interleave(128, 0)[:out_features]
scale_f = scale_f.repeat_interleave(128, 1)[:, :in_features]
return weight.float() * scale_f
for name in list(self.model_tensors.keys()):
if not name.endswith(".scale"):
continue
weight_name = name.removesuffix(".scale") + ".weight"
if weight_name not in self.model_tensors:
continue
weight = self.model_tensors[weight_name]
scale = self.model_tensors[name]
if weight().dtype not in fp8_dtypes:
continue
self.model_tensors[weight_name] = lambda w=weight, s=scale: dequant_fp8_weight(w(), s())
self._dsv4_fp8_dequantized.add(weight_name)
tensors_to_remove.append(name)
for name in tensors_to_remove:
del self.model_tensors[name]
@staticmethod
def _pack_mxfp4_blocks(weight: Tensor, scale: Tensor) -> np.ndarray:
packed = weight.contiguous().view(torch.uint8)
scale_u8 = scale.contiguous().view(torch.uint8)
out_features, packed_cols = packed.shape
logical_cols = packed_cols * 2
if logical_cols % 32 != 0:
raise ValueError(f"MXFP4 source row has {logical_cols} values, expected a multiple of 32")
n_blocks = logical_cols // 32
if tuple(scale_u8.shape) != (out_features, n_blocks):
raise ValueError(f"MXFP4 scale shape {tuple(scale_u8.shape)} does not match {(out_features, n_blocks)}")
src = packed.reshape(out_features, n_blocks, 16)
low = src & 0x0F
high = (src >> 4) & 0x0F
# The safetensors bytes store adjacent values as low/high nibbles.
# ggml MXFP4 blocks store values 0..15 in low nibbles and 16..31 in high nibbles.
vals = torch.stack((low, high), dim=-1).reshape(out_features, n_blocks, 32)
qs = vals[:, :, :16] | (vals[:, :, 16:] << 4)
raw = torch.cat((scale_u8.unsqueeze(-1), qs.to(torch.uint8)), dim=-1)
return raw.reshape(out_features, n_blocks * 17).cpu().numpy()
def _write_mxfp4_expert_tensor(self, bid: int, proj: str, tensor_key: gguf.MODEL_TENSOR) -> list[str]:
n_experts = self.hparams["n_routed_experts"]
data: np.ndarray | None = None
consumed: list[str] = []
for eid in range(n_experts):
weight_name = f"layers.{bid}.ffn.experts.{eid}.{proj}.weight"
scale_name = f"layers.{bid}.ffn.experts.{eid}.{proj}.scale"
if weight_name not in self.model_tensors or scale_name not in self.model_tensors:
raise KeyError(f"Missing routed expert tensors for {weight_name}")
weight = LazyTorchTensor.to_eager(self.model_tensors[weight_name]())
scale = LazyTorchTensor.to_eager(self.model_tensors[scale_name]())
packed = self._pack_mxfp4_blocks(weight, scale)
if data is None:
data = np.empty((n_experts, *packed.shape), dtype=packed.dtype)
data[eid] = packed
consumed.extend((weight_name, scale_name))
assert data is not None
new_name = self.format_tensor_name(tensor_key, bid)
shape = gguf.quant_shape_from_byte_shape(data.shape, gguf.GGMLQuantizationType.MXFP4)
logger.info(f"{new_name}: repacked routed experts to MXFP4, shape = {{{', '.join(str(n) for n in reversed(shape))}}}")
self.gguf_writer.add_tensor(new_name, data, raw_dtype=gguf.GGMLQuantizationType.MXFP4)
return consumed
def _write_hash_routing_tensors(self) -> list[str]:
consumed: list[str] = []
for bid in range(self.hparams["num_hash_layers"]):
name = f"layers.{bid}.ffn.gate.tid2eid"
if name not in self.model_tensors:
raise KeyError(f"Missing hash routing tensor {name}")
data_torch = LazyTorchTensor.to_eager(self.model_tensors[name]())
data = data_torch.to(torch.int32).cpu().numpy()
new_name = self.format_tensor_name(gguf.MODEL_TENSOR.FFN_GATE_TID2EID, bid, ".weight")
logger.info(f"{new_name}: converted hash routing table to I32, shape = {{{', '.join(str(n) for n in reversed(data.shape))}}}")
self.gguf_writer.add_tensor(new_name, data)
consumed.append(name)
return consumed
def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
if self._dsv4_mxfp4_generated:
return ()
consumed: list[str] = self._write_hash_routing_tensors()
for bid in range(self.block_count):
consumed.extend(self._write_mxfp4_expert_tensor(bid, "w1", gguf.MODEL_TENSOR.FFN_GATE_EXP))
consumed.extend(self._write_mxfp4_expert_tensor(bid, "w2", gguf.MODEL_TENSOR.FFN_DOWN_EXP))
consumed.extend(self._write_mxfp4_expert_tensor(bid, "w3", gguf.MODEL_TENSOR.FFN_UP_EXP))
for name in consumed:
del self.model_tensors[name]
self._dsv4_mxfp4_generated = True
return ()
def _format_dsv4_tensor_name(self, key: gguf.MODEL_TENSOR, bid: int | None, suffix: str = ".weight") -> str:
return self.format_tensor_name(key, bid, suffix)
def _map_dsv4_tensor_name(self, name: str, bid: int | None) -> tuple[gguf.MODEL_TENSOR, str]:
root_map: dict[str, tuple[gguf.MODEL_TENSOR, str]] = {
"embed.weight": (gguf.MODEL_TENSOR.TOKEN_EMBD, ".weight"),
"norm.weight": (gguf.MODEL_TENSOR.OUTPUT_NORM, ".weight"),
"head.weight": (gguf.MODEL_TENSOR.OUTPUT, ".weight"),
"hc_head_fn": (gguf.MODEL_TENSOR.HC_HEAD_FN, ".weight"),
"hc_head_base": (gguf.MODEL_TENSOR.HC_HEAD_BASE, ".weight"),
"hc_head_scale": (gguf.MODEL_TENSOR.HC_HEAD_SCALE, ".weight"),
}
if name in root_map:
return root_map[name]
match = re.match(r"layers\.(\d+)\.(.+)$", name)
if match is None:
raise ValueError(f"Unsupported DeepSeek-V4 tensor {name!r}")
layer = int(match.group(1))
if bid != layer:
raise ValueError(f"Tensor {name!r} parsed bid {bid} but layer name has {layer}")
layer_map: dict[str, tuple[gguf.MODEL_TENSOR, str]] = {
"hc_attn_fn": (gguf.MODEL_TENSOR.HC_ATTN_FN, ".weight"),
"hc_attn_base": (gguf.MODEL_TENSOR.HC_ATTN_BASE, ".weight"),
"hc_attn_scale": (gguf.MODEL_TENSOR.HC_ATTN_SCALE, ".weight"),
"hc_ffn_fn": (gguf.MODEL_TENSOR.HC_FFN_FN, ".weight"),
"hc_ffn_base": (gguf.MODEL_TENSOR.HC_FFN_BASE, ".weight"),
"hc_ffn_scale": (gguf.MODEL_TENSOR.HC_FFN_SCALE, ".weight"),
"attn.attn_sink": (gguf.MODEL_TENSOR.ATTN_SINKS, ".weight"),
"attn.wq_a.weight": (gguf.MODEL_TENSOR.ATTN_Q_A, ".weight"),
"attn.wq_b.weight": (gguf.MODEL_TENSOR.ATTN_Q_B, ".weight"),
"attn.q_norm.weight": (gguf.MODEL_TENSOR.ATTN_Q_A_NORM, ".weight"),
"attn.wkv.weight": (gguf.MODEL_TENSOR.ATTN_KV, ".weight"),
"attn.kv_norm.weight": (gguf.MODEL_TENSOR.ATTN_KV_NORM, ".weight"),
"attn.wo_a.weight": (gguf.MODEL_TENSOR.ATTN_OUT_A, ".weight"),
"attn.wo_b.weight": (gguf.MODEL_TENSOR.ATTN_OUT_B, ".weight"),
"attn.compressor.ape": (gguf.MODEL_TENSOR.ATTN_COMPRESSOR_APE, ".weight"),
"attn.compressor.wkv.weight": (gguf.MODEL_TENSOR.ATTN_COMPRESSOR_WKV, ".weight"),
"attn.compressor.wgate.weight": (gguf.MODEL_TENSOR.ATTN_COMPRESSOR_WGATE, ".weight"),
"attn.compressor.norm.weight": (gguf.MODEL_TENSOR.ATTN_COMPRESSOR_NORM, ".weight"),
"attn.indexer.wq_b.weight": (gguf.MODEL_TENSOR.INDEXER_ATTN_Q_B, ".weight"),
"attn.indexer.weights_proj.weight": (gguf.MODEL_TENSOR.INDEXER_PROJ, ".weight"),
"attn.indexer.compressor.ape": (gguf.MODEL_TENSOR.INDEXER_COMPRESSOR_APE, ".weight"),
"attn.indexer.compressor.wkv.weight": (gguf.MODEL_TENSOR.INDEXER_COMPRESSOR_WKV, ".weight"),
"attn.indexer.compressor.wgate.weight": (gguf.MODEL_TENSOR.INDEXER_COMPRESSOR_WGATE, ".weight"),
"attn.indexer.compressor.norm.weight": (gguf.MODEL_TENSOR.INDEXER_COMPRESSOR_NORM, ".weight"),
"attn_norm.weight": (gguf.MODEL_TENSOR.ATTN_NORM, ".weight"),
"ffn_norm.weight": (gguf.MODEL_TENSOR.FFN_NORM, ".weight"),
"ffn.gate.weight": (gguf.MODEL_TENSOR.FFN_GATE_INP, ".weight"),
"ffn.gate.bias": (gguf.MODEL_TENSOR.FFN_EXP_PROBS_B, ".bias"),
"ffn.gate.tid2eid": (gguf.MODEL_TENSOR.FFN_GATE_TID2EID, ".weight"),
"ffn.shared_experts.w1.weight": (gguf.MODEL_TENSOR.FFN_GATE_SHEXP, ".weight"),
"ffn.shared_experts.w2.weight": (gguf.MODEL_TENSOR.FFN_DOWN_SHEXP, ".weight"),
"ffn.shared_experts.w3.weight": (gguf.MODEL_TENSOR.FFN_UP_SHEXP, ".weight"),
}
tensor_name = match.group(2)
if tensor_name in layer_map:
return layer_map[tensor_name]
if re.match(r"ffn\.experts\.\d+\.w[123]\.(weight|scale)$", tensor_name):
return gguf.MODEL_TENSOR.FFN_GATE_EXP, ".weight"
raise ValueError(f"Unsupported DeepSeek-V4 tensor {name!r}")
def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
if re.match(r"layers\.\d+\.ffn\.experts\.\d+\.w[123]\.(weight|scale)$", name):
return []
tensor_key, suffix = self._map_dsv4_tensor_name(name, bid)
if tensor_key == gguf.MODEL_TENSOR.FFN_GATE_TID2EID:
return []
return [(self._format_dsv4_tensor_name(tensor_key, bid, suffix), data_torch)]
def tensor_force_quant(self, name: str, new_name: str, bid: int | None, n_dims: int) -> gguf.GGMLQuantizationType | bool:
del new_name, bid # unused
if name in self._dsv4_fp8_dequantized and n_dims >= 2:
return gguf.GGMLQuantizationType.Q8_0
if name in self._dsv4_f32_tensors:
return gguf.GGMLQuantizationType.F32
if name in self._dsv4_bf16_tensors and n_dims >= 2:
return gguf.GGMLQuantizationType.BF16
return False
def prepare_tensors(self):
super().prepare_tensors()
self._is_mxfp4 = True
self.ftype = gguf.LlamaFileType.MOSTLY_MXFP4_MOE