# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Example ZeroMQ subscriber that prints KV cache event batches.

The script subscribes to the ``kv-events`` topic on ``tcp://localhost:5557``
and, when it notices a gap in the sequence numbers attached to the published
messages, asks the replay endpoint on ``tcp://localhost:5558`` for the batches
it missed.
"""

from typing import Any

import msgspec
import zmq
from msgspec.msgpack import Decoder

from vllm.v1.core.kv_cache_utils import ExternalBlockHash


#
# Types copied from vllm.distributed.kv_events
#
class EventBatch(msgspec.Struct, array_like=True, omit_defaults=True, gc=False):
    """Generic batch of events tagged with ``ts``.

    ``ts`` is presumably the publisher-side timestamp of the batch -- confirm
    against vllm.distributed.kv_events.
    """

    ts: float
    events: list[Any]


class KVCacheEvent(
    msgspec.Struct, array_like=True, omit_defaults=True, gc=False, tag=True
):
    """Base class for all KV cache-related events"""


class BlockStored(KVCacheEvent):
    """KV cache event carrying stored-block hashes and their metadata."""

    block_hashes: list[ExternalBlockHash]
    parent_block_hash: ExternalBlockHash | None
    token_ids: list[int]
    block_size: int
    lora_id: int | None
    medium: str | None


class BlockRemoved(KVCacheEvent):
    """KV cache event carrying the hashes of removed blocks."""

    block_hashes: list[ExternalBlockHash]
    medium: str | None


class AllBlocksCleared(KVCacheEvent):
    """KV cache event with no payload fields."""


class KVEventBatch(EventBatch):
    """Event batch whose events are restricted to the KV cache event types."""

    events: list[BlockStored | BlockRemoved | AllBlocksCleared]


def process_event(event_batch: KVEventBatch) -> None:
    """Print the batch's ``ts`` followed by one line per contained event.

    Args:
        event_batch: Decoded batch; only ``ts`` and ``events`` are read.
    """
    print(f"Received event batch at {event_batch.ts}:")
    for event in event_batch.events:
        print(f" - {event}")


def main() -> None:
    """Subscribe to KV cache events and print them until interrupted.

    Every live message is expected to have three frames; the first one is
    ignored here (presumably the topic), the second is the big-endian
    sequence number and the third is the msgpack-encoded ``KVEventBatch``.

    When the sequence number of a live message is more than one ahead of the
    last one seen, the missing range is requested from the replay socket and
    processed before the live message itself.

    The loop ends on ``KeyboardInterrupt``. Any other exception raised while
    handling a message is printed and the loop continues. Both sockets and
    the ZeroMQ context are released on exit.
    """
    decoder = Decoder(type=KVEventBatch)
    last_seq = -1

    context = zmq.Context()
    sub = context.socket(zmq.SUB)
    replay = context.socket(zmq.REQ)

    try:
        # Set up the main subscription socket
        sub.connect("tcp://localhost:5557")
        topic = "kv-events"
        sub.setsockopt_string(zmq.SUBSCRIBE, topic)

        # Initialize replay socket
        replay.connect("tcp://localhost:5558")
        poller = zmq.Poller()
        poller.register(replay, zmq.POLLIN)

        print("Listening for KV cache events on topic:", topic)

        while True:
            try:
                if sub.poll(50):
                    _, seq_bytes, payload = sub.recv_multipart()
                    seq = int.from_bytes(seq_bytes, "big")

                    if last_seq >= 0 and seq > last_seq + 1:
                        missed = seq - last_seq - 1
                        print(
                            f"Missed {missed} messages "
                            f"(last: {last_seq}, current: {seq})"
                        )

                        # Ask for everything after the last batch we saw; the
                        # starting sequence number goes out as 8 big-endian
                        # bytes.
                        replay.send((last_seq + 1).to_bytes(8, "big"))

                        while poller.poll(timeout=200):
                            seq_bytes, replay_payload = replay.recv_multipart()
                            if not replay_payload:
                                # End of replay marker is sent as an empty
                                # frame for the payload
                                break

                            replay_seq = int.from_bytes(seq_bytes, "big")

                            if replay_seq > last_seq:
                                event_batch = decoder.decode(replay_payload)
                                process_event(event_batch)
                                last_seq = replay_seq
                                # Everything up to the live message has been
                                # replayed; the live one is handled below.
                                if replay_seq >= seq - 1:
                                    break

                    # Remember the live sequence number. Without this,
                    # last_seq would stay at -1 forever and the gap check
                    # above could never trigger. It is recorded on receipt
                    # (before decoding) because a batch that arrived but
                    # fails to decode is not a gap that replay could repair.
                    last_seq = seq

                    event_batch = decoder.decode(payload)
                    process_event(event_batch)

                # ... do other periodic work or check for shutdown ...

            except KeyboardInterrupt:
                print("Interrupted")
                break
            except Exception as e:
                print("Error decoding message:", e)
    finally:
        # linger=0 drops any unsent replay request so that terminating the
        # context cannot block on an unreachable replay endpoint.
        sub.close(linger=0)
        replay.close(linger=0)
        context.term()


if __name__ == "__main__":
    main()