xffl.distributed.distributed_state¶
Attributes¶
- logger: Default xFFL logger
Classes¶
- DistributedState: This dataclass traces all the distributed environment parameters
Functions¶
- create_device_mesh: Creates a Tensor of distributed process ranks with the specified dimensions
Module Contents¶
- xffl.distributed.distributed_state.logger: logging.Logger¶
Default xFFL logger
- class xffl.distributed.distributed_state.DistributedState¶
This dataclass traces all the distributed environment parameters
- backend: torch.distributed.distributed_c10d.Backend | None = None¶
Communication backend
- master_addr: str | None = None¶
Rendez-vous address
- master_port: int | None = None¶
Rendez-vous port
- rank: int | None = None¶
Global rank
- world_size: int | None = None¶
Global world size
- node_local_rank: int | None = None¶
Rank of the process inside the local computing node
- node_local_size: int | None = None¶
World size of the local computing node
- node_rank: int | None = None¶
Rank of the computing node with respect to all the other ones
- node_world_size: int | None = None¶
Global number of computing nodes involved in the training process
- replica_local_rank: int | None = None¶
Rank of the process inside the local replica sharding group
- replica_local_size: int | None = None¶
Group size of a replica sharding group
- replica_rank: int | None = None¶
Rank of the replica group with respect to all the other ones (if applicable, within its federated group)
- replica_world_size: Tuple[int, Ellipsis] | None = None¶
Global number of replica sharding groups involved in the training process (if applicable, one value per federated group)
- federated_local_rank: int | None = None¶
Rank of the process inside the local federated group
- federated_local_size: Tuple[int, Ellipsis] | None = None¶
Group size of a federated group (or a tuple of group sizes if the federation is asymmetric)
- federated_rank: int | None = None¶
Federated group rank with respect to all the other ones
- federated_world_size: int | None = None¶
Global number of federated groups involved in the training process
- fsdp_mesh: torch.distributed.device_mesh.DeviceMesh | None = None¶
FSDP device mesh
- hsdp_mesh: torch.distributed.device_mesh.DeviceMesh | None = None¶
HSDP device mesh
- is_sender: bool | None = None¶
True if the rank should communicate (All-Gather) across network cells, False otherwise
- receive_from: int | None = None¶
The rank from which to receive the averaged parameters (Broadcast)
- federated_group: Tuple[torch.distributed.ProcessGroup, Ellipsis] | None = None¶
Process group collecting ranks holding the same model’s shard across federated groups
- replica_group: Tuple[torch.distributed.ProcessGroup, Ellipsis] | None = None¶
Process group collecting ranks holding the same model’s shard inside federated groups
- federation: torch.distributed.ProcessGroup | None = None¶
Process group collecting all ranks participating in the same federated group
- device_type: torch.device | None = None¶
Chosen deployment device
- current_device: torch.device | int | None = None¶
Specific device currently in use by the process
- init_device: torch.device | None = None¶
Chosen initialization device
- meta_initialization: bool | None = None¶
True if meta initialization is enabled, False otherwise
- streams: Tuple[torch.cuda.Stream, Ellipsis] | None = None¶
Pool of available CUDA streams
- __str__()¶
- set_global(backend: torch.distributed.distributed_c10d.Backend, device_type: torch.device, master_addr: str, master_port: int, rank: int, world_size: int) None ¶
Set global process group information.
- Parameters:
backend (Backend) – Communication backend to use
device_type (Literal["cpu", "cuda"]) – Type of device to use
master_addr (str) – Address of the master node for the rendez-vous
master_port (int) – Port of the master node for the rendez-vous
rank (int) – Global process rank
world_size (int) – Global world size
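A minimal usage sketch, assuming a torchrun-style launcher that exports RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT; the environment variable names and the backend choice are assumptions, not mandated by xFFL:

```python
import os

import torch
import torch.distributed as dist

from xffl.distributed.distributed_state import DistributedState

# Hypothetical torchrun launch: RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT
# are exported by the launcher.
state = DistributedState()
state.set_global(
    backend=dist.Backend.NCCL,                   # communication backend
    device_type=torch.device("cuda"),            # deployment device type
    master_addr=os.environ["MASTER_ADDR"],       # rendezvous address
    master_port=int(os.environ["MASTER_PORT"]),  # rendezvous port
    rank=int(os.environ["RANK"]),                # global rank
    world_size=int(os.environ["WORLD_SIZE"]),    # global world size
)
```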
- set_exec_device(current_device: torch.device | int, streams: int | None = None) None ¶
Set the devices of the distributed process group.
- Parameters:
current_device (torch.device | int) – Training device
streams (int) – Number of CUDA streams to instantiate, defaults to 4
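Continuing the sketch above, the execution device can then be bound per process; the device index and stream count are illustrative:

```python
# Bind this process to one GPU, indexed by its node-local rank if available,
# and request a small pool of CUDA streams (count chosen for illustration).
device_index = state.node_local_rank if state.node_local_rank is not None else 0
state.set_exec_device(current_device=torch.device("cuda", device_index), streams=4)
```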
- set_init_device(init_device: torch.device | None, meta_initialization: bool = False) None ¶
Set the initialization device of the distributed process group.
- Parameters:
init_device (torch.device) – Initialization device
meta_initialization (bool) – If meta device initialization is required
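A hedged sketch of choosing a meta initialization device, which defers parameter materialization (useful for models too large to instantiate on a single device):

```python
# Initialize the model on the meta device: no memory is allocated up front,
# parameters are expected to be materialized later (e.g., while sharding).
state.set_init_device(init_device=torch.device("meta"), meta_initialization=True)
```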
- set_node(node_local_rank: int, node_local_size: int, node_rank: int, node_world_size: int) None ¶
Set the process’ information relative to the local node.
- Parameters:
node_local_rank (int) – Local compute node rank
node_local_size (int) – World size of the local compute node
node_rank (int) – Rank of the local compute node among all the available nodes in the training
node_world_size (int) – Number of compute nodes involved in the training process
- is_node_setup() bool ¶
Checks if the local compute node information is set up.
- Returns:
True if the local compute node information is set up, False otherwise
- Return type:
bool
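The node-local topology is typically derived from the launcher; a sketch continuing the example above and assuming torchrun-style LOCAL_RANK, LOCAL_WORLD_SIZE and GROUP_RANK variables (these names are an assumption, not part of xFFL):

```python
import os

local_rank = int(os.environ["LOCAL_RANK"])              # rank inside the node
local_world_size = int(os.environ["LOCAL_WORLD_SIZE"])  # processes per node
world_size = int(os.environ["WORLD_SIZE"])              # global world size

state.set_node(
    node_local_rank=local_rank,
    node_local_size=local_world_size,
    node_rank=int(os.environ["GROUP_RANK"]),            # rank of this node
    node_world_size=world_size // local_world_size,     # number of nodes
)
assert state.is_node_setup()
```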
- _get_global_fsdp_mesh() torch.distributed.device_mesh.DeviceMesh | None ¶
Returns a standard global FSDP device mesh. Do not call this method if global FSDP is not required.
- Returns:
A global FSDP device mesh if the distributed PyTorch environment is initialized, None otherwise
- Return type:
Optional[DeviceMesh]
- set_fsdp(mesh: torch.distributed.device_mesh.DeviceMesh | None = None) None ¶
Enable PyTorch’s FSDP functionality. If no mesh is specified, FSDP is enabled on the global process group.
- Parameters:
mesh (Optional[DeviceMesh]) – An FSDP device mesh, defaults to None
- is_fsdp_setup() bool ¶
Checks if FSDP is set up.
- Returns:
True if FSDP is set up, False otherwise
- Return type:
bool
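Continuing the sketch, enabling FSDP on the global process group is then a single call:

```python
# No mesh given: FSDP is enabled on the global process group
state.set_fsdp()
if state.is_fsdp_setup():
    print(state.fsdp_mesh)  # the global FSDP device mesh
```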
- _set_global_hsdp_mesh() torch.distributed.device_mesh.DeviceMesh | None ¶
Returns a standard global HSDP device mesh. Do not call this method if global HSDP is not required.
- Returns:
A global HSDP device mesh if the distributed PyTorch environment is initialized, None otherwise
- Return type:
Optional[DeviceMesh]
- set_hsdp(hsdp: int) None ¶
Enable PyTorch’s HSDP functionality globally.
- Parameters:
hsdp (int) – Size of an HSDP replica
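A sketch of enabling global HSDP; the replica size is illustrative and is expected to divide the world size (e.g., 8 processes with replicas of size 4 yield 2 model replicas):

```python
# Each model replica is sharded across 4 processes; with a world size of 8
# this produces 2 replicas kept in sync through HSDP.
state.set_hsdp(hsdp=4)
assert state.is_hsdp_setup()
```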
- _partial_hsdp_setup(hsdp: int) None ¶
Initialize PyTorch’s HSDP parameters without creating the device mesh.
- Parameters:
hsdp (int) – Size of an HSDP replica
- _partial_hsdp_setup_manual(replica_local_rank: int, replica_local_size: int, replica_rank: int, replica_world_size: Tuple[int, Ellipsis]) None ¶
Partially sets up PyTorch’s HSDP functionality; to complete the setup, the HSDP device mesh must also be instantiated.
- Parameters:
replica_local_rank (int) – Rank of the current process within its model replica
replica_local_size (int) – Local world size of a model replica
replica_rank (int) – Rank of the current model replica within its federated group
replica_world_size (Tuple[int,...]) – Number of replicas available for each federated group
- is_hsdp_setup() bool ¶
Checks if HSDP is set up. Does not check the HSDP device mesh.
- Returns:
True if HSDP is set up, False otherwise
- Return type:
bool
- unset_hsdp() None ¶
Unsets all HSDP-related variables.
- _set_rank_role() None ¶
- set_federated_scaling(federated_group_size: Tuple[int], hsdp: int | None = None) None ¶
- _get_communicating_processes(federated_rank: int) Tuple[int, Ellipsis] ¶
- _set_symmetric_federated_scaling(federated_group_size: Tuple[int]) None ¶
Create the federated scaling process groups.
- Parameters:
federated_group_size (Tuple[int]) – Number of processes making up one federated group
- _set_asymmetric_federated_scaling(federated_group_size: Tuple[int]) None ¶
Create the federated scaling process groups.
These process groups bring together all the ranks handling corresponding model shards. For example, if a model is sharded among four processes and replicated across two process groups (i.e., device_mesh=[[0,1,2,3],[4,5,6,7]]), then the federated scaling process groups correspond to the groups of processes having the same local rank (i.e., [[0,4],[1,5],[2,6],[3,7]]), as shown in the sketch below.
- Parameters:
federated_group_size (Tuple[int]) – Number of processes making up one federated group
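The grouping described above can be reproduced with plain tensor manipulation; a purely illustrative sketch, independent of xFFL's internals:

```python
import torch

# Model sharded among 4 processes and replicated across 2 process groups
device_mesh = torch.tensor([[0, 1, 2, 3], [4, 5, 6, 7]])

# Federated scaling groups collect the processes having the same local rank,
# i.e. the columns of the mesh: [[0, 4], [1, 5], [2, 6], [3, 7]]
federated_scaling_groups = device_mesh.t().tolist()
print(federated_scaling_groups)
```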
- unset_federated_scaling() None ¶
Unsets the Federated Scaling parameters.
- is_federated_scaling_setup() bool ¶
Checks if Federated Scaling is set up.
- Returns:
True if Federated Scaling is set up, False otherwise
- Return type:
bool
- create_process_group(ranks: Tuple[int, Ellipsis] | torch.Tensor, group_desc: str | None) torch.distributed.ProcessGroup ¶
Creates a new process group with the specified ranks.
Only the ranks belonging to the specified group should call this method.
- Parameters:
ranks (Tuple[int, ...]) – Ranks making up the group
group_desc (Optional[str]) – Description of the process group
- Returns:
Process group handle
- Return type:
ProcessGroup
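A usage sketch, assuming the distributed environment is already initialized and the calling rank belongs to the listed ranks (the ranks and description are illustrative):

```python
# Process group gathering the four ranks of the first model replica
replica_pg = state.create_process_group(ranks=(0, 1, 2, 3), group_desc="replica_0")
```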
- xffl.distributed.distributed_state.create_device_mesh(mesh_shape: Tuple[int, Ellipsis]) torch.Tensor ¶
Creates a Tensor of distributed process ranks with the specified dimensions
- Parameters:
mesh_shape (Tuple[int, ...]) – Dimensions of the mesh
- Returns:
Tensor of ranks
- Return type:
torch.Tensor
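Based on the description above, the returned tensor presumably enumerates the global ranks reshaped to the requested dimensions; a hedged sketch of the expected result:

```python
from xffl.distributed.distributed_state import create_device_mesh

# 2 replica groups x 4 shards: ranks 0..7 arranged as a 2x4 mesh (assumed layout)
mesh = create_device_mesh(mesh_shape=(2, 4))
print(mesh)  # expected: tensor([[0, 1, 2, 3],
             #                   [4, 5, 6, 7]])
```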