vllm.distributed.utils
USE_SCHED_YIELD (module-attribute)
USE_SCHED_YIELD = (
version_info[:3] >= (3, 11, 1)
or version_info[:2] == (3, 10)
and version_info[2] >= 8
)
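The flag above selects how the module yields the CPU while polling. A minimal sketch of how the companion `sched_yield` helper can be built on it (the `time.sleep(0)` fallback is an assumption based on common practice, not taken from this page):

```python
import os
import time
from sys import version_info

# Mirrors the module attribute above: os.sched_yield() is only used on
# Python versions where it is considered safe.
USE_SCHED_YIELD = (
    version_info[:3] >= (3, 11, 1)
    or (version_info[:2] == (3, 10) and version_info[2] >= 8)
)

def sched_yield() -> None:
    """Yield the CPU to other runnable threads/processes while polling."""
    if USE_SCHED_YIELD:
        os.sched_yield()
    else:
        # Fallback: sleep(0) also relinquishes the current time slice.
        time.sleep(0)
```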
StatelessProcessGroup (dataclass)
A dataclass that holds a metadata store plus the rank and world_size of the group. Use it only to communicate metadata between processes. For data-plane communication, create NCCL-related objects.
Source code in vllm/distributed/utils.py
broadcast_recv_src_counter (class-attribute, instance-attribute)
entries (class-attribute, instance-attribute)
recv_src_counter (class-attribute, instance-attribute)
send_dst_counter (class-attribute, instance-attribute)
__init__
__init__(
rank: int,
world_size: int,
store: Store,
socket: Optional[socket],
data_expiration_seconds: int = 3600,
send_dst_counter: dict[int, int] = dict(),
recv_src_counter: dict[int, int] = dict(),
broadcast_send_counter: int = 0,
broadcast_recv_src_counter: dict[int, int] = dict(),
entries: deque[tuple[str, float]] = deque(),
) -> None
__post_init__
Source code in vllm/distributed/utils.py
all_gather_obj
All gather an object from all ranks.
Source code in vllm/distributed/utils.py
barrier
barrier(timeout: float = 30.0)
A robust barrier to synchronize all ranks.
Uses a multi-phase approach to ensure all processes reach the barrier before proceeding:
1. Each process signals that it has reached the barrier.
2. Each process signals that it has confirmed the arrival of all other ranks.
3. Rank 0 waits for all other ranks to signal their departure, ensuring that all ranks have departed the barrier first.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `timeout` | `float` | Maximum time in seconds to wait for each phase | `30.0` |
Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If coordination fails or times out |
Source code in vllm/distributed/utils.py
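The three phases can be illustrated with a toy re-implementation, using a plain dict plus a lock as a stand-in for the real TCP key-value store (all names here are hypothetical; the real method additionally enforces the per-phase timeout):

```python
import threading
import time

store: dict[str, bool] = {}   # stand-in for the TCP key-value store
store_lock = threading.Lock()

def _set_key(key: str) -> None:
    with store_lock:
        store[key] = True

def _wait_key(key: str) -> None:
    while True:
        with store_lock:
            if key in store:
                return
        time.sleep(0)  # the real code polls with a timeout

def toy_barrier(rank: int, world_size: int) -> None:
    # Phase 1: signal arrival at the barrier.
    _set_key(f"arrive/{rank}")
    for r in range(world_size):
        _wait_key(f"arrive/{r}")
    # Phase 2: signal that the arrival of all other ranks was confirmed.
    _set_key(f"confirm/{rank}")
    for r in range(world_size):
        _wait_key(f"confirm/{r}")
    # Phase 3: signal departure; rank 0 waits for everyone else to leave.
    _set_key(f"depart/{rank}")
    if rank == 0:
        for r in range(1, world_size):
            _wait_key(f"depart/{r}")
```

Phase 2 matters because a rank that merely saw its own arrival key could otherwise race ahead and reuse keys before slower ranks have observed them.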
broadcast_obj
Broadcast an object from a source rank to all other ranks. It does not clean up after all ranks have received the object, so use it sparingly, e.g., for initialization.
Source code in vllm/distributed/utils.py
create (staticmethod)
create(
host: str,
port: int,
rank: int,
world_size: int,
data_expiration_seconds: int = 3600,
store_timeout: int = 300,
) -> StatelessProcessGroup
A replacement for torch.distributed.init_process_group that does not pollute the global state.

If process A and process B call torch.distributed.init_process_group to form a group, and we then want to form another group with processes A, B, C, and D, that is not possible in PyTorch: processes A and B have already formed a group, and processes C and D cannot join it. This function is a workaround for that limitation.

torch.distributed.init_process_group is a global call, while this function is stateless. It returns a StatelessProcessGroup object that can be used for exchanging metadata. With this function, processes A and B can call StatelessProcessGroup.create to form a group, and then processes A, B, C, and D can call StatelessProcessGroup.create to form another group.
Source code in vllm/distributed/utils.py
expire_data
Expire data that is older than data_expiration_seconds seconds.
Source code in vllm/distributed/utils.py
recv_obj
Receive an object from a source rank.
send_obj
Send an object to a destination rank.
Source code in vllm/distributed/utils.py
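The per-destination and per-source counters listed among the class attributes above can be illustrated with a toy pair of send/recv methods over a shared dict (the class name and key layout are hypothetical; the real implementation pickles objects into the TCP store and yields the CPU while polling):

```python
import pickle
import time

class ToyGroup:
    """Toy stand-in for StatelessProcessGroup's send/recv bookkeeping."""

    def __init__(self, rank: int, store: dict):
        self.rank = rank
        self.store = store  # shared dict standing in for the TCP store
        self.send_dst_counter: dict[int, int] = {}
        self.recv_src_counter: dict[int, int] = {}

    def send_obj(self, obj, dst: int) -> None:
        n = self.send_dst_counter.get(dst, 0)
        # Counter-based keys give every message a fresh, unique key.
        self.store[f"{self.rank}->{dst}/{n}"] = pickle.dumps(obj)
        self.send_dst_counter[dst] = n + 1

    def recv_obj(self, src: int):
        n = self.recv_src_counter.get(src, 0)
        key = f"{src}->{self.rank}/{n}"
        while key not in self.store:
            time.sleep(0)  # the real code uses sched_yield-style polling
        self.recv_src_counter[src] = n + 1
        return pickle.loads(self.store[key])
```

Because both sides advance their counters in lockstep, messages between a given pair of ranks are delivered in order without any key reuse.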
divide
Ensure that the numerator is divisible by the denominator and return the quotient.
ensure_divisibility
Ensure that numerator is divisible by the denominator.
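A sketch of the two helpers as described (the error-message wording is an assumption):

```python
def ensure_divisibility(numerator: int, denominator: int) -> None:
    """Raise if numerator is not evenly divisible by denominator."""
    assert numerator % denominator == 0, (
        f"{numerator} is not divisible by {denominator}"
    )

def divide(numerator: int, denominator: int) -> int:
    """Check divisibility, then return the quotient."""
    ensure_divisibility(numerator, denominator)
    return numerator // denominator
```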
get_pp_indices
Try to evenly distribute layers across partitions.
If the number of layers is not divisible by the number of partitions, the remaining layers are evenly distributed across all but the last partition. The last partition is excluded because it often contains an additional norm layer and we are attempting to balance compute.
If pp_size > 2 and the number of remaining layers satisfies 0 < x <= pp_size - 2, then the remaining layers are evenly distributed across the middle partitions. The first and last partitions are excluded because they contain the input and output embeddings respectively, and we are attempting to reduce maximum memory consumption across partitions.
Source code in vllm/distributed/utils.py
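The distribution rules above can be sketched as follows (a simplified re-implementation; the real function also honors configuration overrides, which this sketch omits):

```python
def pp_partition_counts(num_layers: int, pp_size: int) -> list[int]:
    """Number of layers assigned to each pipeline partition."""
    base = num_layers // pp_size
    counts = [base] * pp_size
    rem = num_layers % pp_size
    if rem == 0:
        return counts
    if pp_size > 2 and rem <= pp_size - 2:
        # Spread the remainder over the middle partitions only: the first
        # and last hold the input and output embeddings.
        for i in range(1, rem + 1):
            counts[i] += 1
    else:
        # Spread the remainder over all but the last partition, which
        # often carries an additional norm layer.
        for i in range(rem):
            counts[i] += 1
    return counts

def get_pp_indices(num_layers: int, pp_rank: int,
                   pp_size: int) -> tuple[int, int]:
    """(start, end) layer indices owned by pp_rank."""
    counts = pp_partition_counts(num_layers, pp_size)
    start = sum(counts[:pp_rank])
    return start, start + counts[pp_rank]
```

For example, 13 layers over 4 partitions puts the extra layer on a middle partition, while 7 layers over 2 partitions puts it on the first.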
init_gloo_process_group
init_gloo_process_group(
backend: Backend,
prefix_store: PrefixStore,
group_rank: int,
group_size: int,
timeout: timedelta,
) -> ProcessGroup
Statelessly initialize a ProcessGroup with the gloo backend, in a way that is compatible across different torch versions.
Source code in vllm/distributed/utils.py
sched_yield
split_tensor_along_last_dim
split_tensor_along_last_dim(
tensor: Tensor,
num_partitions: int,
contiguous_split_chunks: bool = False,
) -> Sequence[Tensor]
Split a tensor along its last dimension.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `tensor` | `Tensor` | input tensor. | required |
| `num_partitions` | `int` | number of partitions to split the tensor | required |
| `contiguous_split_chunks` | `bool` | If True, make each chunk contiguous in memory. | `False` |
Returns:

| Type | Description |
|---|---|
| `Sequence[Tensor]` | A list of Tensors |
Source code in vllm/distributed/utils.py
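Behaviorally, the helper is close to torch.split along the last dimension; a sketch under that assumption (the function name here is made up to avoid clashing with the real one):

```python
import torch

def split_last_dim(tensor: torch.Tensor,
                   num_partitions: int,
                   contiguous_split_chunks: bool = False):
    """Sketch of split_tensor_along_last_dim's documented behavior."""
    last_dim = tensor.dim() - 1
    # Assumes the last dimension is divisible by num_partitions.
    chunk = tensor.size(last_dim) // num_partitions
    chunks = torch.split(tensor, chunk, dim=last_dim)
    if contiguous_split_chunks:
        # torch.split returns views; force owned, contiguous copies.
        return tuple(c.contiguous() for c in chunks)
    return chunks

x = torch.arange(12).reshape(2, 6)
parts = split_last_dim(x, 3)  # three views of shape (2, 2)
```

The `contiguous_split_chunks` flag matters when a chunk is later passed to an op that requires contiguous memory, since the default views share storage with the input.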
stateless_destroy_torch_distributed_process_group
Destroy ProcessGroup returned by stateless_init_torch_distributed_process_group().
Source code in vllm/distributed/utils.py
stateless_init_torch_distributed_process_group
stateless_init_torch_distributed_process_group(
host: str,
port: int,
rank: int,
world_size: int,
backend: str,
) -> ProcessGroup
A replacement for torch.distributed.init_process_group that does not pollute the global state. The created ProcessGroup object can be used for some operations such as allreduce, because it does not depend on the global rank. However, some operations such as broadcast cannot be used, because they depend on the global rank.

TODO: ask for help from the PyTorch team if we need the broadcast operation.
This function is useful when we are not sure about the total number of processes in the process group. For example, processes 1, 2, ..., 8 may want to communicate, while process 9 might be the same process as process 1 or a different one, and process 10 might be the same process as process 5 or a different one. In this case, how can we reliably form a communication channel between processes 9 and 10 without affecting the communication channel among processes 1, 2, ..., 8?

One possible solution is to figure out beforehand whether processes 9 and 10 are the same as processes 1 and 5, and then form a communication channel based on that information, adjusting the ranks, world_size, etc. However, obtaining that information is not always easy, and it would interfere with the main communication channel.

Our solution is to always form a communication channel among processes 1, 2, ..., 8, and then use this function to form another communication channel between processes 9 and 10. This way, regardless of whether processes 9 and 10 coincide with processes 1 and 5, the main communication channel is always formed among processes 1, 2, ..., 8, and the additional communication channel is formed between processes 9 and 10.