xffl.distributed.distributed_state

Attributes

logger

Default xFFL logger

Classes

DistributedState

This dataclass tracks all the distributed environment parameters

Functions

create_device_mesh(→ torch.Tensor)

Creates a Tensor of distributed process ranks with the specified dimensions

Module Contents

xffl.distributed.distributed_state.logger: logging.Logger

Default xFFL logger

class xffl.distributed.distributed_state.DistributedState

This dataclass tracks all the distributed environment parameters

backend: torch.distributed.distributed_c10d.Backend | None = None

Communication backend

master_addr: str | None = None

Rendez-vous address

master_port: int | None = None

Rendez-vous port

rank: int | None = None

Global rank

world_size: int | None = None

Global world size

node_local_rank: int | None = None

Rank of the process inside the local computing node

node_local_size: int | None = None

World size of a computing node (number of processes per node)

node_rank: int | None = None

Rank of the computing node with respect to all the other ones

node_world_size: int | None = None

Global number of computing nodes involved in the training process

replica_local_rank: int | None = None

Rank of the process inside the local replica sharding group

replica_local_size: int | None = None

Group size of a replica sharding group

replica_rank: int | None = None

Rank of the replica sharding group with respect to all the others (when applicable, within its federated group)

replica_world_size: Tuple[int, Ellipsis] | None = None

Global number of replica sharding groups involved in the training process (when applicable, one value per federated group)

federated_local_rank: int | None = None

Rank of the process inside the local federated group

federated_local_size: Tuple[int, Ellipsis] | None = None

Group size of a federated group (or a tuple of group sizes if the federation is asymmetric)

federated_rank: int | None = None

Federated group rank with respect to all the other ones

federated_world_size: int | None = None

Global number of federated groups involved in the training process

fsdp_mesh: torch.distributed.device_mesh.DeviceMesh | None = None

FSDP device mesh

hsdp_mesh: torch.distributed.device_mesh.DeviceMesh | None = None

HSDP device mesh

is_sender: bool | None = None

True if the rank should communicate (All-Gather) across network cells, False otherwise

receive_from: int | None = None

The rank from which to receive the averaged parameters (Broadcast)

federated_group: Tuple[torch.distributed.ProcessGroup, Ellipsis] | None = None

Process group collecting the ranks that hold the same model shard across federated groups

replica_group: Tuple[torch.distributed.ProcessGroup, Ellipsis] | None = None

Process group collecting the ranks that hold the same model shard inside a federated group

federation: torch.distributed.ProcessGroup | None = None

Process group collecting all ranks participating in the same federated group

device_type: torch.device | None = None

Chosen deployment device

current_device: torch.device | int | None = None

Specific device currently in use by the process

init_device: torch.device | None = None

Chosen initialization device

meta_initialization: bool | None = None

True if meta initialization is enabled, False otherwise

streams: Tuple[torch.cuda.Stream, Ellipsis] | None = None

Pool of available CUDA streams

__str__()
set_global(backend: torch.distributed.distributed_c10d.Backend, device_type: torch.device, master_addr: str, master_port: int, rank: int, world_size: int) None

Set global process group information.

Parameters:
  • backend (Backend) – Communication backend to use

  • device_type (Literal["cpu", "cuda"]) – Type of device to use

  • master_addr (str) – Address of the master node for the rendez-vous

  • master_port (int) – Port of the master node for the rendez-vous

  • rank (int) – Global process rank

  • world_size (int) – Global world size
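
A minimal usage sketch (illustrative, not taken from the library's own documentation): the environment variable names follow the torchrun convention, and the NCCL backend and CUDA device type are assumptions.

  import os

  import torch
  import torch.distributed as dist

  from xffl.distributed.distributed_state import DistributedState

  # Record the global rendezvous information from torchrun-style variables
  # (MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE are torchrun conventions).
  state = DistributedState()
  state.set_global(
      backend=dist.Backend.NCCL,          # assumes CUDA-capable nodes
      device_type=torch.device("cuda"),
      master_addr=os.environ["MASTER_ADDR"],
      master_port=int(os.environ["MASTER_PORT"]),
      rank=int(os.environ["RANK"]),
      world_size=int(os.environ["WORLD_SIZE"]),
  )
  print(state)  # __str__() is provided by the class (see above)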

set_exec_device(current_device: torch.device | int, streams: int | None = None) None

Set the devices of the distributed process group.

Parameters:
  • current_device (torch.device | int) – Training device

  • streams (int) – Number of CUDA streams to instantiate, defaults to 4
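
An illustrative sketch (assumes a CUDA deployment and the torchrun LOCAL_RANK variable; the stream count is the documented default):

  import os

  import torch

  from xffl.distributed.distributed_state import DistributedState

  state = DistributedState()  # assume set_global(...) was already called

  # Bind this process to one GPU and request a pool of CUDA streams.
  state.set_exec_device(
      current_device=torch.device("cuda", int(os.environ["LOCAL_RANK"])),
      streams=4,  # number of CUDA streams to instantiate
  )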

set_init_device(init_device: torch.device | None, meta_initialization: bool = False) None

Set the initialization device of the distributed process group.

Parameters:
  • init_device (torch.device) – Initialization device

  • meta_initialization (bool) – If meta device initialization is required
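
An illustrative sketch choosing between meta-device and CPU initialization (the intent ascribed to meta initialization is an assumption):

  import torch

  from xffl.distributed.distributed_state import DistributedState

  state = DistributedState()

  # Meta initialization: parameters are created on the meta device, e.g. to
  # avoid materializing the full model on every rank (assumed use case).
  state.set_init_device(init_device=torch.device("meta"), meta_initialization=True)

  # Alternative: plain CPU initialization without meta tensors.
  # state.set_init_device(init_device=torch.device("cpu"))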

set_node(node_local_rank: int, node_local_size: int, node_rank: int, node_world_size: int) None

Set the process’ information relative to the local node.

Parameters:
  • node_local_rank (int) – Local compute node rank

  • node_local_size (int) – World size of the local compute node

  • node_rank (int) – Rank of the local compute node among all the available nodes in the training

  • node_world_size (int) – Number of compute nodes involved in the training process
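
An illustrative sketch deriving the node-level topology from torchrun-style variables (LOCAL_RANK, LOCAL_WORLD_SIZE, GROUP_RANK, WORLD_SIZE); the node world size is computed here only for illustration:

  import os

  from xffl.distributed.distributed_state import DistributedState

  state = DistributedState()

  world_size = int(os.environ["WORLD_SIZE"])
  local_world_size = int(os.environ["LOCAL_WORLD_SIZE"])
  state.set_node(
      node_local_rank=int(os.environ["LOCAL_RANK"]),
      node_local_size=local_world_size,
      node_rank=int(os.environ["GROUP_RANK"]),        # rank of this node
      node_world_size=world_size // local_world_size,
  )
  assert state.is_node_setup()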

is_node_setup() bool

Checks if the local compute node information is set up.

Returns:

True if the local compute node information is set up, False otherwise

Return type:

bool

_get_global_fsdp_mesh() torch.distributed.device_mesh.DeviceMesh | None

Returns a standard global FSDP device mesh. Do not call this method if global FSDP is not required.

Returns:

A global FSDP device mesh if the distributed PyTorch environment is initialized, None otherwise

Return type:

Optional[DeviceMesh]

set_fsdp(mesh: torch.distributed.device_mesh.DeviceMesh | None = None) None

Enable PyTorch’s FSDP functionality. If no mesh is specified, FSDP will be enabled on the global process group.

Parameters:

mesh (Optional[DeviceMesh]) – An FSDP device mesh, defaults to None
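
An illustrative sketch (assumes the distributed PyTorch environment is already initialized, since the default mesh spans the global process group):

  from xffl.distributed.distributed_state import DistributedState

  state = DistributedState()

  # No mesh given: FSDP is enabled on the global process group.
  state.set_fsdp()
  assert state.is_fsdp_setup()
  print(state.fsdp_mesh)  # the FSDP device mesh stored on the state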

is_fsdp_setup() bool

Checks if FSDP is set up.

Returns:

True if FSDP is set up, False otherwise

Return type:

bool

_set_global_hsdp_mesh() torch.distributed.device_mesh.DeviceMesh | None

Returns a global HSDP device mesh. Do not call this method if a global HSDP device mesh is not required.

Returns:

A global HSDP device mesh if the distributed PyTorch environment is initialized, None otherwise

Return type:

Optional[DeviceMesh]

set_hsdp(hsdp: int) None

Enable PyTorch’s HSDP functionality globally.

Parameters:

hsdp (int) – Size of an HSDP replica
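
An illustrative sketch (the numbers are made up): with 32 processes and hsdp=8, the model is sharded inside groups of 8 ranks and replicated 4 times.

  from xffl.distributed.distributed_state import DistributedState

  state = DistributedState()  # assumes the global/node information is set

  state.set_hsdp(hsdp=8)      # size of one HSDP replica (sharding group)
  assert state.is_hsdp_setup()
  print(state.hsdp_mesh)      # the HSDP device mesh stored on the state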

_partial_hsdp_setup(hsdp: int) None

Initialize PyTorch’s HSDP parameters without creating the device mesh.

Parameters:

hsdp (int) – Size of an HSDP replica

_partial_hsdp_setup_manual(replica_local_rank: int, replica_local_size: int, replica_rank: int, replica_world_size: Tuple[int, Ellipsis]) None

Partial setup of PyTorch’s HSDP functionality; to complete it, the HSDP device mesh must also be instantiated.

Parameters:
  • replica_local_rank (int) – Rank of the current process within its model replica

  • replica_local_size (int) – Local world size of a model replica

  • replica_rank (int) – Rank of the current model replica within its federated group

  • replica_world_size (Tuple[int,...]) – Number of replicas available for each federated group

is_hsdp_setup() bool

Checks if HSDP is set up. Does not check the HSDP device mesh.

Returns:

True if HSDP is set up, False otherwise

Return type:

bool

unset_hsdp() None

Unsets all HSDP related variables.

_set_rank_role() None
set_federated_scaling(federated_group_size: Tuple[int], hsdp: int | None = None) None
_get_communicating_processes(federated_rank: int) Tuple[int, Ellipsis]
_set_symmetric_federated_scaling(federated_group_size: Tuple[int]) None

Create the federated scaling process groups

Parameters:

federated_group_size (Tuple[int]) – Number of processes making up each federated group

_set_asymmetric_federated_scaling(federated_group_size: Tuple[int]) None

Create the federated scaling process groups

These process groups bring together all the ranks handling corresponding model shards. E.g., if a model is sharded among four processes and replicated across two process groups (i.e., device_mesh=[[0,1,2,3],[4,5,6,7]]), then the federated scaling process groups correspond to the groups of processes sharing the same local rank (i.e., [[0,4],[1,5],[2,6],[3,7]]); see the sketch after the parameter list.

Parameters:

federated_group_size (Tuple[int]) – Number of processes making up each federated group
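
A conceptual sketch of the grouping described above, using plain torch (this is not the library code):

  import torch

  # A model sharded over 4 ranks and replicated over 2 process groups.
  device_mesh = torch.arange(8).reshape(2, 4)   # [[0,1,2,3],[4,5,6,7]]

  # Ranks sharing the same local (shard) index belong to the same
  # federated scaling group.
  scaling_groups = [device_mesh[:, i].tolist() for i in range(device_mesh.shape[1])]
  print(scaling_groups)  # [[0, 4], [1, 5], [2, 6], [3, 7]]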

unset_federated_scaling() None

Unset Federated Scaling parameters

is_federated_scaling_setup() bool

Checks if Federated Scaling is set up.

Returns:

True if Federated Scaling is set up, False otherwise

Return type:

bool
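
An illustrative sketch of the Federated Scaling entry points listed above (set_federated_scaling has no docstring, so the argument interpretation, a tuple with one group size per federated group, is an assumption based on the federated_local_size field):

  from xffl.distributed.distributed_state import DistributedState

  state = DistributedState()  # assumes the global/node information is set

  # Hypothetical 16-process world split into two federated groups of 8 ranks,
  # each internally using HSDP replicas of size 4.
  state.set_federated_scaling(federated_group_size=(8, 8), hsdp=4)
  assert state.is_federated_scaling_setup()

  # Tear the federation-related parameters down when no longer needed.
  state.unset_federated_scaling()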

create_process_group(ranks: Tuple[int, Ellipsis] | torch.Tensor, group_desc: str | None) torch.distributed.ProcessGroup

Creates a new process group with the specified ranks

Only the ranks involved in the group should call this method

Parameters:
  • ranks (Tuple[int, ...]) – Ranks making up the group

  • group_desc (Optional[str]) – Description of the process group

Returns:

Process group handle

Return type:

ProcessGroup
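
An illustrative sketch (assumes torch.distributed is already initialized; the rank pair and description are arbitrary):

  import torch.distributed as dist

  from xffl.distributed.distributed_state import DistributedState

  state = DistributedState()

  # Per the note above, only the ranks belonging to the group call this method.
  if dist.get_rank() in (0, 4):
      group = state.create_process_group(ranks=(0, 4), group_desc="same-shard ranks")
      dist.barrier(group=group)  # example collective restricted to the new group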

xffl.distributed.distributed_state.create_device_mesh(mesh_shape: Tuple[int, Ellipsis]) torch.Tensor

Creates a Tensor of distributed process ranks with the specified dimensions

Parameters:

mesh_shape (Tuple[int, ...]) – Dimensions of the mesh

Returns:

Tensor of ranks

Return type:

torch.Tensor
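
An illustrative sketch (assumes an 8-process world; the exact rank ordering inside the returned Tensor is not specified by the docstring):

  from xffl.distributed.distributed_state import create_device_mesh

  mesh = create_device_mesh(mesh_shape=(2, 4))
  print(mesh.shape)  # torch.Size([2, 4])
  print(mesh)        # e.g. tensor([[0, 1, 2, 3], [4, 5, 6, 7]])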