class CPUOffloadingSpec(OffloadingSpec):
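    """Offloading spec that spills KV-cache blocks from GPU to CPU memory.

    Scheduler-side, blocks are tracked by an LRU-evicting OffloadingManager
    backed by a CPUBackend; worker-side, a CpuGpuOffloadingHandler performs
    the actual GPU<->CPU block copies.
    """
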
    def __init__(self, vllm_config: VllmConfig):
        super().__init__(vllm_config)
        num_cpu_blocks = self.extra_config.get("num_cpu_blocks")
        if not num_cpu_blocks:
            raise ValueError(
                "num_cpu_blocks must be specified in kv_connector_extra_config"
            )
        self.num_cpu_blocks: int = num_cpu_blocks

        # scheduler-side
        self._manager: Optional[OffloadingManager] = None
        # worker-side
        self._handler: Optional[OffloadingHandler] = None

    def get_manager(self) -> OffloadingManager:
        """Lazily create and return the scheduler-side offloading manager."""
        if not self._manager:
            kv_events_config = self.vllm_config.kv_events_config
            enable_events = (
                kv_events_config is not None and kv_events_config.enable_kv_cache_events
            )
            self._manager = LRUOffloadingManager(
                CPUBackend(
                    block_size=self.offloaded_block_size, num_blocks=self.num_cpu_blocks
                ),
                enable_events=enable_events,
            )
        return self._manager

    def get_handlers(
        self, kv_caches: dict[str, torch.Tensor]
    ) -> Iterator[tuple[type[LoadStoreSpec], type[LoadStoreSpec], OffloadingHandler]]:
        """Lazily build the worker-side CPU<->GPU handler and yield it for both
        transfer directions (GPU->CPU store and CPU->GPU load)."""
        if not self._handler:
            if not current_platform.is_cuda():
                raise RuntimeError(
                    "CPU offloading is currently only supported on CUDA GPUs"
                )

            layer_names = list(kv_caches.keys())
            layers = get_layers_from_vllm_config(
                self.vllm_config, AttentionLayerBase, layer_names
            )
            attn_backends = {
                layer_name: layers[layer_name].get_attn_backend()
                for layer_name in layer_names
            }

            self._handler = CpuGpuOffloadingHandler(
                attn_backends=attn_backends,
                gpu_block_size=self.gpu_block_size,
                cpu_block_size=self.offloaded_block_size,
                num_cpu_blocks=self.num_cpu_blocks,
                gpu_caches=kv_caches,
            )

        assert self._handler is not None
        yield GPULoadStoreSpec, CPULoadStoreSpec, self._handler
        yield CPULoadStoreSpec, GPULoadStoreSpec, self._handler
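
# A minimal usage sketch (not part of this module), assuming CPU offloading is
# selected through the KV connector config. The connector name
# "OffloadingConnector", the kv_role value, and the import path for
# KVTransferConfig are assumptions; only the "num_cpu_blocks" entry in
# kv_connector_extra_config is required by CPUOffloadingSpec above.
#
#     from vllm import LLM
#     from vllm.config import KVTransferConfig
#
#     llm = LLM(
#         model="meta-llama/Llama-3.1-8B-Instruct",
#         kv_transfer_config=KVTransferConfig(
#             kv_connector="OffloadingConnector",
#             kv_role="kv_both",
#             kv_connector_extra_config={"num_cpu_blocks": 8192},
#         ),
#     )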