# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Test pause/resume with Data Parallel (DP) via HTTP API.

This example demonstrates coordinated pause/resume across multiple DP ranks.
The pause synchronizes across all DP engines via all-reduce.

Prerequisites: Start a vLLM server with data parallelism:

    $ VLLM_SERVER_DEV_MODE=1 vllm serve facebook/opt-125m \
        --enforce-eager \
        --data-parallel-size 4 \
        --tensor-parallel-size 1

Then run this script:

    $ python data_parallel_pause_resume.py

The test verifies pause works by:

1. Starting a streaming generation request
2. Pausing the server mid-generation
3. Sleeping for PAUSE_DURATION seconds
4. Resuming the server
5. Verifying there was a gap in token generation matching the pause duration
"""

import argparse
import threading
import time

import requests
from openai import OpenAI

# Defaults; both can be overridden from the command line.
BASE_URL = "http://localhost:8000"
MODEL_NAME = "facebook/opt-125m"
# How long (seconds) the server stays paused mid-generation.
PAUSE_DURATION = 3.0


def pause_generation(base_url: str, mode: str = "keep") -> None:
    """Pause generation via HTTP endpoint.

    Args:
        base_url: Root URL of the vLLM server (no trailing slash).
        mode: Pause mode forwarded as a query parameter; "keep" retains
            in-flight requests so they resume where they left off.

    Raises:
        requests.HTTPError: If the server responds with an error status.
    """
    url = f"{base_url}/pause"
    response = requests.post(url, params={"mode": mode}, timeout=60)
    response.raise_for_status()
    print("Server paused")


def resume_generation(base_url: str) -> None:
    """Resume generation via HTTP endpoint.

    Args:
        base_url: Root URL of the vLLM server (no trailing slash).

    Raises:
        requests.HTTPError: If the server responds with an error status.
    """
    url = f"{base_url}/resume"
    response = requests.post(url, timeout=60)
    response.raise_for_status()
    print("Server resumed")


def main():
    """Run the pause/resume test: stream tokens, pause mid-stream, verify gap."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--base-url", default=BASE_URL)
    parser.add_argument("--model", default=MODEL_NAME)
    args = parser.parse_args()

    client = OpenAI(
        base_url=f"{args.base_url}/v1",
        api_key="EMPTY",
    )

    # NOTE(review): reconstructed from a line-wrapped source; presumably a
    # single-line prompt — confirm against the original example.
    prompt = "Write a long story about a dragon. Once upon a time"
    # Monotonic timestamp of each streamed token; shared between threads.
    token_times: list[float] = []
    # Index (into token_times) of the first token expected AFTER the resume.
    pause_token_idx = 0
    # Set by the generator once enough tokens have streamed to safely pause.
    pause_triggered = threading.Event()

    def generator_thread():
        """Stream tokens and record timestamps."""
        stream = client.completions.create(
            model=args.model,
            prompt=prompt,
            max_tokens=50,
            stream=True,
        )
        for chunk in stream:
            if chunk.choices[0].text:
                token_times.append(time.monotonic())
                token_count = len(token_times)
                print(f"Token {token_count}: {chunk.choices[0].text!r}")
                # Signal controller after some tokens
                if token_count >= 5 and not pause_triggered.is_set():
                    pause_triggered.set()

    def controller_thread():
        """Pause and resume the server."""
        nonlocal pause_token_idx
        # Wait for some tokens
        pause_triggered.wait()
        print(f"\nPausing server (keep mode) at token {len(token_times)}...")
        pause_generation(args.base_url, mode="keep")
        # Everything recorded from here on arrives after the resume.
        pause_token_idx = len(token_times)
        print(f"Sleeping for {PAUSE_DURATION}s...")
        time.sleep(PAUSE_DURATION)
        print("Resuming server...")
        resume_generation(args.base_url)
        print("Resumed!\n")

    # Run both threads
    gen_thread = threading.Thread(target=generator_thread)
    ctrl_thread = threading.Thread(target=controller_thread)
    gen_thread.start()
    ctrl_thread.start()
    gen_thread.join()
    ctrl_thread.join()

    # Check gap at the pause point: the delta between the last token before
    # the pause and the first token after the resume should be ~PAUSE_DURATION.
    if pause_token_idx < len(token_times):
        pause_gap = token_times[pause_token_idx] - token_times[pause_token_idx - 1]
        print(
            f"\nGap after pause (token {pause_token_idx} -> "
            f"{pause_token_idx + 1}): {pause_gap:.3f}s"
        )
        # Allow 10% slack for scheduling jitter around the sleep.
        if pause_gap >= PAUSE_DURATION * 0.9:
            print("Test passed! Pause synchronized across DP ranks.")
        else:
            print(f"Test failed! Expected ~{PAUSE_DURATION}s gap, got {pause_gap:.3f}s")
    else:
        print("Test failed! No tokens were generated after resuming.")


if __name__ == "__main__":
    main()