from __future__ import annotations import json import math from typing import Callable, Iterable, TYPE_CHECKING import numpy as np import torch if TYPE_CHECKING: from torch import Tensor from .base import ModelBase, TextModel, gguf, logger @ModelBase.register( "LLaMAForCausalLM", "LlamaForCausalLM", "MistralForCausalLM", "MixtralForCausalLM", "VLlama3ForCausalLM", "LlavaForConditionalGeneration", "VoxtralForConditionalGeneration", "LlamaForCausalLMEagle3", "Eagle3Speculator", "Eagle3DraftModel", "IQuestCoderForCausalLM", "LlamaModel") class LlamaModel(TextModel): model_arch = gguf.MODEL_ARCH.LLAMA undo_permute = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # fix for SmolVLM2, missing `num_attention_heads` in config.json if self.hf_arch == "VLlama3ForCausalLM": self.hparams["num_attention_heads"] = self.hparams.get("num_attention_heads", 32) # Mistral consolidated format has no config.json; origin_hf_arch is HF-only. if self.is_mistral_format: self.origin_hf_arch = None else: hparams = ModelBase.load_hparams(self.dir_model, is_mistral_format=False) self.origin_hf_arch = hparams.get('architectures', [None])[0] # Detect eagle3 draft checkpoint by hparams (some models don't use a distinct HF arch name) if "draft_vocab_size" in self.hparams and self.hparams["num_hidden_layers"] == 1: self.is_eagle3 = True self.model_arch = gguf.MODEL_ARCH.EAGLE3 logger.info("Detected EAGLE-3 draft model, switching to EAGLE3 architecture") # Re-initialize tensor_map with eagle3 architecture self.tensor_map = gguf.get_tensor_name_map(self.model_arch, self.block_count) # Update gguf_writer architecture self.gguf_writer.arch = gguf.MODEL_ARCH_NAMES[self.model_arch] self.gguf_writer.add_architecture() if self.target_model_dir is None: raise ValueError( "EAGLE-3 model requires --target-model-dir to be specified. " "Please provide the path to the target model directory to read config.json" ) # Read both eagle3 raw config and target model config with open(self.dir_model / "config.json", 'r', encoding='utf-8') as f: eagle3_raw_config = json.load(f) with open(self.target_model_dir / "config.json", 'r', encoding='utf-8') as f: target_config = json.load(f) if "text_config" in target_config: target_config = {**target_config, **target_config["text_config"]} self.target_vocab_size = target_config["vocab_size"] # target_layers: derived from target model layer count (low/mid/high) target_num_layers = target_config["num_hidden_layers"] target_layers = [2, target_num_layers // 2, target_num_layers - 3] logger.info(f"EAGLE-3: target_layers = {target_layers} (target model has {target_num_layers} layers)") self.gguf_writer.add_array(f"{self.gguf_writer.arch}.target_layers", target_layers) # target_hidden_size: prefer eagle3 config, fallback to target config if eagle3_raw_config.get("target_hidden_size") is not None: target_hidden_size = eagle3_raw_config["target_hidden_size"] src = "EAGLE-3 config" else: target_hidden_size = target_config["hidden_size"] src = "target model config" logger.info(f"EAGLE-3: target_hidden_size = {target_hidden_size} (from {src})") self.gguf_writer.add_uint32(f"{self.gguf_writer.arch}.target_hidden_size", target_hidden_size) # norm_before_residual (RedHat-style eagle3 specific) norm_before_residual = eagle3_raw_config.get("norm_before_residual", False) logger.info(f"EAGLE-3: norm_before_residual = {norm_before_residual}") self.gguf_writer.add_bool(f"{self.gguf_writer.arch}.norm_before_residual", norm_before_residual) def set_vocab(self): # eagle3: use tokenizer from target model if provided original_dir_model = None if getattr(self, 'is_eagle3', False): assert self.target_model_dir is not None logger.info(f"EAGLE-3: Using tokenizer from target model: {self.target_model_dir}") original_dir_model = self.dir_model self.dir_model = self.target_model_dir if self.origin_hf_arch == "GlmasrModel": return self._set_vocab_glmedge() if self.is_mistral_format: return self._set_vocab_mistral() path_tekken_json = self.dir_model / "tekken.json" path_tokenizer_json = self.dir_model / "tokenizer.json" if path_tekken_json.is_file() and not path_tokenizer_json.is_file(): self._set_vocab_mistral() tokenizer_config_file = self.dir_model / 'tokenizer_config.json' if tokenizer_config_file.is_file(): with open(tokenizer_config_file, "r", encoding="utf-8") as f: tokenizer_config_json = json.load(f) if (add_prefix_space := tokenizer_config_json.get("add_prefix_space")) is not None: self.gguf_writer.add_add_space_prefix(add_prefix_space) if tokenizer_config_json.get("tokenizer_class") == "HybridDNATokenizer": return self._set_vocab_hybriddna() try: self._set_vocab_sentencepiece() except FileNotFoundError: try: self._set_vocab_llama_hf() except (FileNotFoundError, TypeError): # Llama 3 self._set_vocab_gpt2() # Apply to CodeLlama only (and ignore for Llama 3 with a vocab size of 128256) if self.hparams.get("vocab_size", 32000) == 32016: special_vocab = gguf.SpecialVocab( self.dir_model, load_merges=False, special_token_types = ['prefix', 'suffix', 'middle', 'eot'] ) special_vocab._set_special_token("prefix", 32007) special_vocab._set_special_token("suffix", 32008) special_vocab._set_special_token("middle", 32009) special_vocab._set_special_token("eot", 32010) special_vocab.add_to_gguf(self.gguf_writer) # Apply to granite small models only if self.hparams.get("vocab_size", 32000) == 49152: self.gguf_writer.add_add_bos_token(False) # eagle3: Restore original dir_model if original_dir_model is not None: self.dir_model = original_dir_model def set_gguf_parameters(self): super().set_gguf_parameters() hparams = self.hparams if not self.is_mistral_format: self.gguf_writer.add_vocab_size(hparams["vocab_size"]) if (rope_dim := hparams.get("head_dim")) is None: rope_dim = hparams["hidden_size"] // hparams["num_attention_heads"] self.gguf_writer.add_rope_dimension_count(rope_dim) @staticmethod def permute(weights: Tensor, n_head: int, n_head_kv: int | None): if n_head_kv is not None and n_head != n_head_kv: n_head = n_head_kv return (weights.reshape(n_head, 2, weights.shape[0] // n_head // 2, *weights.shape[1:]) .swapaxes(1, 2) .reshape(weights.shape)) def _repack_nvfp4(self, name: str, weight: Tensor, scale: Tensor, scale2: Tensor, input_scale: Tensor): # Mirror the BF16 Q/K RoPE permutation site in modify_tensors; the NVFP4 path bypasses it. if self.undo_permute: n_head = self.find_hparam(["n_heads", "num_attention_heads"], optional=True) n_kv_head = self.find_hparam(["n_kv_heads", "num_key_value_heads"], optional=True) if n_head is not None: if name.endswith("q_proj.weight"): weight = LlamaModel.permute(weight, n_head, n_head) scale = LlamaModel.permute(scale, n_head, n_head) elif name.endswith("k_proj.weight"): weight = LlamaModel.permute(weight, n_head, n_kv_head) scale = LlamaModel.permute(scale, n_head, n_kv_head) super()._repack_nvfp4(name, weight, scale, scale2, input_scale) _experts: list[dict[str, Tensor]] | None = None @classmethod def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None: name, gen = item if "text_model." in name: name = name.replace("text_model.", "") # for SmolVLM return super().filter_tensors((name, gen)) def index_tensors(self, remote_hf_model_id: str | None = None) -> dict[str, Callable[[], Tensor]]: tensors = super().index_tensors(remote_hf_model_id) # Handle Eagle3Speculator nested config if "transformer_layer_config" in self.hparams: self.hparams = {**self.hparams, **self.hparams["transformer_layer_config"]} # eagle3 detection if "draft_vocab_size" in self.hparams and self.hparams["num_hidden_layers"] == 1: logger.info("EAGLE-3: renaming midlayer.* / layers.0.* to model.layers.0.*") new_tensors = {} for name, gen in tensors.items(): if name.startswith("midlayer."): new_name = "model.layers.0." + name[len("midlayer."):] new_tensors[new_name] = gen elif name.startswith("layers.0."): # Eagle3Speculator format new_name = "model." + name new_tensors[new_name] = gen else: new_tensors[name] = gen return new_tensors return tensors def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]: # eagle3: special tensors that bypass standard llama mapping if getattr(self, 'is_eagle3', False): if name == "fc.weight": yield (name, data_torch) return if name == "d2t": # store for manual int64 handling in prepare_tensors (avoid F32 conversion) if not hasattr(self, '_eagle3_int_tensors'): self._eagle3_int_tensors = {} self._eagle3_int_tensors[name] = data_torch return if name == "t2d": # not used at runtime, skip return if name.endswith(".hidden_norm.weight"): yield (self.format_tensor_name(gguf.MODEL_TENSOR.ATTN_NORM_2, bid), data_torch) return n_head = self.find_hparam(["n_heads", "num_attention_heads"]) n_kv_head = self.find_hparam(["n_kv_heads", "num_key_value_heads"]) if self.hf_arch == "LlamaModel": name = "model." + name if self.undo_permute: if name.endswith(("q_proj.weight", "q_proj.bias")): data_torch = LlamaModel.permute(data_torch, n_head, n_head) if name.endswith(("k_proj.weight", "k_proj.bias")): data_torch = LlamaModel.permute(data_torch, n_head, n_kv_head) # process the experts separately if name.find("block_sparse_moe.experts") != -1: n_experts = self.hparams["num_local_experts"] assert bid is not None if self._experts is None: self._experts = [{} for _ in range(self.block_count)] self._experts[bid][name] = data_torch if len(self._experts[bid]) >= n_experts * 3: # merge the experts into a single 3d tensor for wid in ["w1", "w2", "w3"]: datas: list[Tensor] = [] for xid in range(n_experts): ename = f"model.layers.{bid}.block_sparse_moe.experts.{xid}.{wid}.weight" datas.append(self._experts[bid][ename]) del self._experts[bid][ename] data_torch = torch.stack(datas, dim=0) merged_name = f"layers.{bid}.feed_forward.experts.{wid}.weight" yield from super().modify_tensors(data_torch, merged_name, bid) return else: return yield from super().modify_tensors(data_torch, name, bid) def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]: if rope_params := self.rope_parameters.get("full_attention", self.rope_parameters): if rope_params.get("rope_type", '').lower() == "llama3": base = rope_params.get("rope_theta", 10000.0) if (dim := self.hparams.get("head_dim")) is None: dim = self.hparams["hidden_size"] // self.hparams["num_attention_heads"] freqs = 1.0 / (base ** (torch.arange(0, dim, 2, dtype=torch.float32) / dim)) factor = rope_params.get("factor", 8.0) low_freq_factor = rope_params.get("low_freq_factor", 1.0) high_freq_factor = rope_params.get("high_freq_factor", 4.0) old_context_len = rope_params.get("original_max_position_embeddings", 8192) low_freq_wavelen = old_context_len / low_freq_factor high_freq_wavelen = old_context_len / high_freq_factor # assert low_freq_wavelen != high_freq_wavelen # Errors for Llama4 rope_factors = [] for freq in freqs: wavelen = 2 * math.pi / freq if wavelen < high_freq_wavelen: rope_factors.append(1) elif wavelen > low_freq_wavelen: rope_factors.append(factor) else: smooth = (old_context_len / wavelen - low_freq_factor) / (high_freq_factor - low_freq_factor) rope_factors.append(1 / ((1 - smooth) / factor + smooth)) yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FREQS), torch.tensor(rope_factors, dtype=torch.float32)) def prepare_tensors(self): # eagle3: collect d2t original dtype before parent converts tensors to F32 eagle3_original_dtypes = {} if getattr(self, 'is_eagle3', False): for name, data_torch in self.get_tensors(): if name == "d2t": eagle3_original_dtypes[name] = data_torch.dtype super().prepare_tensors() # eagle3: write d2t as absolute target token ids if getattr(self, 'is_eagle3', False) and hasattr(self, '_eagle3_int_tensors'): for name, data_torch in self._eagle3_int_tensors.items(): old_dtype = eagle3_original_dtypes.get(name, data_torch.dtype) data = data_torch.to(torch.int64).cpu().numpy() if name == "d2t": data = data.reshape(-1) data = data + np.arange(data.size, dtype=np.int64) if np.any((data < 0) | (data >= self.target_vocab_size)): raise ValueError(f"EAGLE-3 d2t target ids out of range for target vocab size {self.target_vocab_size}") if np.unique(data).size != data.size: raise ValueError("EAGLE-3 d2t contains duplicate target ids") data_qtype = gguf.GGMLQuantizationType.I64 shape_str = f"{{{', '.join(str(n) for n in reversed(data.shape))}}}" logger.info(f"{name + ',':<30} {old_dtype} --> {data_qtype.name}, shape = {shape_str}") self.gguf_writer.add_tensor(name, data, raw_dtype=data_qtype) if self._experts is not None: # flatten `list[dict[str, Tensor]]` into `list[str]` experts = [k for d in self._experts for k in d.keys()] if len(experts) > 0: raise ValueError(f"Unprocessed experts: {experts}") @ModelBase.register("ArceeForCausalLM") class ArceeModel(LlamaModel): model_arch = gguf.MODEL_ARCH.ARCEE def set_gguf_parameters(self): super().set_gguf_parameters() self._try_set_pooling_type() @ModelBase.register( "Llama4ForConditionalGeneration", "Llama4ForCausalLM", ) class Llama4Model(LlamaModel): model_arch = gguf.MODEL_ARCH.LLAMA4 undo_permute = False def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # IMPORTANT: the normal "intermediate_size" is renamed to "intermediate_size_mlp", we need to undo this self.hparams["intermediate_size_moe"] = self.hparams["intermediate_size"] self.hparams["intermediate_size"] = self.hparams["intermediate_size_mlp"] def set_vocab(self): self._set_vocab_gpt2() def set_gguf_parameters(self): super().set_gguf_parameters() self.gguf_writer.add_interleave_moe_layer_step(self.hparams["interleave_moe_layer_step"]) self.gguf_writer.add_expert_feed_forward_length(self.hparams["intermediate_size_moe"]) if "layer_types" in self.hparams: if all(lt == "full_attention" for lt in self.hparams["layer_types"]): # all layers are full attention (for MobileLLM), disable swa self.gguf_writer.add_sliding_window(0) def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None): # split the gate_up into gate and up if "gate_up_proj" in name: name_up = name.replace("gate_up_proj", "up_proj.weight") name_gate = name.replace("gate_up_proj", "gate_proj.weight") dim_half = data_torch.shape[-1] // 2 gate_proj_weight, up_proj_weight = data_torch.transpose(-1, -2).split(dim_half, dim=-2) yield from super().modify_tensors(gate_proj_weight, name_gate, bid) yield from super().modify_tensors(up_proj_weight, name_up, bid) return if name.endswith("down_proj"): name += ".weight" data_torch = data_torch.transpose(-1, -2) yield from super().modify_tensors(data_torch, name, bid) @ModelBase.register("LlamaBidirectionalModel") class LlamaEmbedNemotronModel(LlamaModel): model_arch = gguf.MODEL_ARCH.LLAMA_EMBED @ModelBase.register("SmolLM3ForCausalLM") class SmolLM3Model(LlamaModel): model_arch = gguf.MODEL_ARCH.SMOLLM3 @ModelBase.register("ApertusForCausalLM") class ApertusModel(LlamaModel): model_arch = gguf.MODEL_ARCH.APERTUS undo_permute = False _alpha_n = {} _alpha_p = {} _beta = {} _eps = {} def modify_tensors(self, data_torch, name, bid): # Handle xIELU activation parameters n_layers = self.hparams["num_hidden_layers"] if name.endswith(".act_fn.alpha_n"): self._alpha_n[bid] = data_torch.to("cpu").float().item() if (len(self._alpha_n) == n_layers): self.gguf_writer.add_xielu_alpha_n([self._alpha_n[k] for k in sorted(self._alpha_n)]) return if name.endswith(".act_fn.alpha_p"): self._alpha_p[bid] = data_torch.to("cpu").float().item() if (len(self._alpha_p) == n_layers): self.gguf_writer.add_xielu_alpha_p([self._alpha_p[k] for k in sorted(self._alpha_p)]) return if name.endswith(".act_fn.beta"): self._beta[bid] = data_torch.to("cpu").float().item() if (len(self._beta) == n_layers): self.gguf_writer.add_xielu_beta([self._beta[k] for k in sorted(self._beta)]) return if name.endswith(".act_fn.eps"): self._eps[bid] = data_torch.to("cpu").float().item() if (len(self._eps) == n_layers): self.gguf_writer.add_xielu_eps([self._eps[k] for k in sorted(self._eps)]) return yield from super().modify_tensors(data_torch, name, bid)