
r"""Zero Redundancy Optimizer."""
import collections
import copy
import enum
import inspect
import io
import logging
from itertools import chain
from typing import Any, Callable, Dict, List, Optional, Set, Type, Union

import torch
import torch.distributed as dist
from torch.distributed.algorithms.join import Join, Joinable, JoinHook
from torch.distributed.optim.utils import functional_optim_map
from torch.optim import Optimizer


__all__ = ["ZeroRedundancyOptimizer"]


logger = logging.getLogger(__name__)


def _recursive_copy_to_device(
    value: Any,
    non_blocking: bool,
    device: torch.device,
) -> Any:
    r"""
    Recursively searches lists, tuples, dicts and copies tensors to device if possible.

    Non-tensor values are passed as-is in the result.

    .. note:: These are all copies, so if there are two objects that reference
    the same object, then after this call, there will be two different objects
    referenced on the device.
    """
    if isinstance(value, torch.Tensor):
        return value.to(device, non_blocking=non_blocking)

    if isinstance(value, (list, tuple)):
        values = [
            _recursive_copy_to_device(val, non_blocking=non_blocking, device=device)
            for val in value
        ]
        return values if isinstance(value, list) else tuple(values)

    if isinstance(value, collections.abc.Mapping):
        return {
            key: _recursive_copy_to_device(
                val, non_blocking=non_blocking, device=device
            )
            for key, val in value.items()
        }

    return value


def _is_trainable(param: torch.Tensor) -> bool:
    r"""Return if a parameter is trainable, where trainability is equivalent to requiring a gradient."""
    return param.requires_grad


def _broadcast_object(
    obj: Any,
    src_rank: int,
    group: object = dist.group.WORLD,
    device: torch.device = torch.device("cpu"),
) -> Any:
    r"""
    Broadcasts an object to the given group.

    It will be sending the object if called from the source rank and receiving
    the object otherwise.
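
    Example::

        >>> # xdoctest: +SKIP
        >>> # Minimal usage sketch: assumes the default process group is already
        >>> # initialized and that this snippet runs on every rank.
        >>> settings = {"lr": 0.01} if dist.get_rank() == 0 else None
        >>> settings = _broadcast_object(settings, src_rank=0)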

    Arguments:
        obj: object to broadcast; only used if called on the source rank.
        src_rank (int): source rank.
        group (``ProcessGroup``, optional): group used for the broadcast
            (default: ``dist.group.WORLD``).
        device (``torch.device``, optional): device to send from or receive
            to (default: ``torch.device("cpu")``).

    Returns:
        The broadcasted object.
    """
    if dist.get_rank() == src_rank:
        # Send the object
        buffer = io.BytesIO()
        torch.save(obj, buffer)
        data = bytearray(buffer.getbuffer())
        length_tensor = torch.LongTensor([len(data)]).to(device)
        data_send_tensor = torch.ByteTensor(data).to(device)
        dist.broadcast(length_tensor, src=src_rank, group=group, async_op=False)
        dist.broadcast(data_send_tensor, src=src_rank, group=group, async_op=False)
    else:
        # Receive the object
        length_tensor = torch.LongTensor([0]).to(device)
        dist.broadcast(length_tensor, src=src_rank, group=group, async_op=False)
        data_recv_tensor = torch.empty(
            [int(length_tensor.item())], dtype=torch.uint8, device=device
        )
        dist.broadcast(data_recv_tensor, src=src_rank, group=group, async_op=False)
        buffer = io.BytesIO(data_recv_tensor.cpu().numpy())
        obj = torch.load(buffer, map_location=device, weights_only=False)
    return obj


class _ZeROJoinHook(JoinHook):
    def __init__(self, zero):
        assert isinstance(zero, ZeroRedundancyOptimizer), (
            "ZeRO join hook requires passing in a ZeroRedundancyOptimizer "
            "instance as the state"
        )
        self.zero = zero
        super().__init__()

    def main_hook(self):
        r"""
        Perform an optimizer step.

        This step updates the joined process's shard of
        the parameters and broadcasts those parameters.
        """
        self.zero.step()


class _DDPBucketAssignment:
    r"""
    Represent a :class:`DistributedDataParallel` bucket assignment.

    This means that a (possibly non-strict) subset of the parameters corresponding to
    a DDP bucket assigned to a rank to update.

    Attributes:
        bucket_index (int): index of the bucket determined by the DDP gradient
            bucket all-reduce order.
        parameters (List[torch.Tensor]): model parameters in the bucket
            assigned to this rank.
        offset (int): offset into the :class:`GradBucket` 's :meth:`parameters`
            giving the index of the first element in the passed-in
            ``parameters``; this equivalently indexes into the
            :class:`GradBucket` 's :meth:`gradients`.
        device (torch.device): device on which the parameters are stored.
        tensor (torch.Tensor): flattened tensor giving the data of the
            parameter subset assigned to the rank.
    """

    def __init__(
        self,
        bucket_index: int,
        parameters: List[torch.Tensor],
        offset: int,
    ):
        self.bucket_index = bucket_index
        self.parameters = parameters
        self.offset = offset
        if len(self.parameters) == 0:
            raise ValueError("Empty bucket assignment")
        # DDP guarantees that all parameters corresponding to a bucket are on
        # the same device
        self.device: torch.device = self.parameters[0].device
        # Flattened tensor of the assigned parameter data; filled in lazily
        self.tensor: Optional[torch.Tensor] = None


class _OverlapStatus(enum.IntEnum):
    r"""
    Define possible statuses that :class:`ZeroRedundancyOptimizer` can be in when overlapping with :class:`DistributedDataParallel`.

    Attributes:
        ``UNINITIALIZED``: The ZeRO instance is effectively uninitialized and
            is waiting for DDP to finalize its bucketing.
        ``DDP_HAS_REBUILT_BUCKETS``: DDP has rebuilt its buckets, meaning that
            its bucketing is finalized. The ZeRO instance can now collect the
            necessary information about the DDP bucketing.
        ``INITIALIZED``: The ZeRO instance is fully initialized and can now
            optimize parameters.
    """

    UNINITIALIZED = 0
    DDP_HAS_REBUILT_BUCKETS = 1
    INITIALIZED = 2


class _OverlapInfo:
    r"""
  
    Information needed by :class:`ZeroRedundancyOptimizer` to overlap with :class:`DistributedDataParallel`.

    Arguments:
        world_size (int): world size of the process group being used.

    Attributes:
        shard_buckets (bool): if ``True``, then the assignment of each
            :class:`DistributedDataParallel` bucket is partitioned across
            possibly multiple :class:`ZeroRedundancyOptimizer` instances (i.e.
            across possibly multiple ranks) to approximate uniformity following
            a threshold given by the total parameter size divided by the world
            size; if ``False``, then each bucket is wholly assigned to a single
            :class:`ZeroRedundancyOptimizer` instance (i.e. to a single rank);
            this should be set to the value passed into the hook constructor.
        status (_OverlapStatus): current status; see :class:`_OverlapStatus`
            for more information.
        params_per_bucket (List[List[torch.Tensor]]): ``params_per_bucket[i]``
            gives the model parameters in the ``i``th bucket.
        params_per_rank (List[List[torch.Tensor]]): ``params_per_rank[i]``
            gives the model parameters assigned to the ``i``th rank, where the
            parameters are grouped by increasing bucket indices.
        offsets (Dict[int, int]): maps from bucket index to the offset in
            ``self.params_per_rank[rank]`` giving the index of the first
            parameter in that bucket, where ``rank`` is this process's own
            rank; the keys of this :class:`dict` are the bucket indices
            assigned to this rank.
        num_bucket_assignments (int): total number of bucket assignments across
            all ranks; this is equal to the number of
            :class:`DistributedDataParallel` gradient buckets if
            ``shard_buckets=False`` and possibly greater otherwise.
        total_size (int, optional): total size of all buckets (i.e. sum of
            ``param.numel()`` for all ``param`` across all buckets) if
            ``shard_buckets=True``; otherwise, ``None``.
        broadcast_handles (List[Work]): :class:`list` of async work handles for
            the parameter broadcasts.
        bucket_index_to_future (Dict[int, torch.futures.Future]):
            :class:`dict` mapping bucket index to the corresponding all-reduce
            future.
        bucket_index_to_bucket (Dict[int, dist.GradBucket]): :class:`dict`
            mapping bucket index to the corresponding bucket.
        bucket_indices_seen (List[int]): :class:`list` of the bucket indices
            seen on this iteration.
    Nc                     t         j                  | _        d| _        g | _        t        |      D cg c]  }g  c}| _        i | _        g | _        d| _	        d | _
        g | _        g | _        i | _        i | _        y c c}w )NFr   )rk   rn   statusshard_bucketsparams_per_bucketrangeparams_per_rankoffsetsassigned_ranks_per_bucketnum_bucket_assignments
total_sizebroadcast_handlesbucket_indices_seenbucket_index_to_futurebucket_index_to_bucket)rT   
world_size_s      r&   rS   z_OverlapInfo.__init__   s    &4&B&B#( <>FKJFW9X"9X')9;&+,#)- -/.0 GI#BD# :Ys   	A;c                    t        | j                        | j                  k(  sJ dt        j                                 | j                  D cg c]  }|j                          }}| j                  j                          yc c}w )a  
        Wait for all parameter broadcasts.

        This function should be called once all broadcasts have been scheduled,
        meaning ``self.broadcast_handles`` is filled. This clears ``self.broadcast_handles``
        in preparation for the next iteration.
        """
        assert len(self.broadcast_handles) == self.num_bucket_assignments, (
            f"Missing at least one broadcast handle on rank {dist.get_rank()}"
        )
        _ = [x.wait() for x in self.broadcast_handles]
        self.broadcast_handles.clear()

    def clear_per_iter_info(self) -> None:
        r"""
        Clear the data structures that are modified per-iteration.

        This function should be called at the end of an iteration.
        """
        self.bucket_indices_seen.clear()
        self.bucket_index_to_future.clear()
        self.bucket_index_to_bucket.clear()


class ZeroRedundancyOptimizer(Optimizer, Joinable):
    r"""
    Wrap an arbitrary :class:`optim.Optimizer <torch.optim.Optimizer>` and shards its states across ranks in the group.

    The sharding is done as described by ZeRO_.

    The local optimizer instance in each rank is only
    responsible for updating approximately ``1 / world_size`` parameters and
    hence only needs to keep ``1 / world_size`` optimizer states. After
    parameters are updated locally, each rank will broadcast its parameters to
    all other peers to keep all model replicas in the same state.
    ``ZeroRedundancyOptimizer`` can be used in conjunction with
    :class:`torch.nn.parallel.DistributedDataParallel` to reduce per-rank peak
    memory consumption.

    ``ZeroRedundancyOptimizer`` uses a sorted-greedy algorithm to pack a number
    of parameters at each rank. Each parameter belongs to a single rank and is
    not divided among ranks. The partition is arbitrary and might not match the
    parameter registration or usage order.

    Arguments:
        params (``Iterable``): an ``Iterable`` of :class:`torch.Tensor` s
            or :class:`dict` s giving all parameters, which will be sharded
            across ranks.

    Keyword Args:
        optimizer_class (:class:`torch.nn.Optimizer`): the class of the local
            optimizer.
        process_group (``ProcessGroup``, optional): ``torch.distributed``
            ``ProcessGroup`` (default: ``dist.group.WORLD`` initialized by
            :meth:`torch.distributed.init_process_group`).
        parameters_as_bucket_view (bool, optional): if ``True``, parameters are
            packed into buckets to speed up communication, and ``param.data``
            fields point to bucket views at different offsets; if ``False``,
            each individual parameter is communicated separately, and each
            ``params.data`` stays intact (default: ``False``).
        overlap_with_ddp (bool, optional): if ``True``, :meth:`step` is
            overlapped with :class:`DistributedDataParallel` 's gradient
            synchronization; this requires (1) either a functional optimizer
            for the ``optimizer_class`` argument or one with a functional
            equivalent and (2) registering a DDP communication hook
            constructed from one of the functions in ``ddp_zero_hook.py``;
            parameters are packed into buckets matching those in
            :class:`DistributedDataParallel`, meaning that the
            ``parameters_as_bucket_view`` argument is ignored.
            If ``False``, :meth:`step` runs disjointly after the backward pass
            (per normal).
            (default: ``False``)
        **defaults: any trailing arguments, which are forwarded to the local
            optimizer.

    Example::

        >>> # xdoctest: +SKIP
        >>> import torch.nn as nn
        >>> from torch.distributed.optim import ZeroRedundancyOptimizer
        >>> from torch.nn.parallel import DistributedDataParallel as DDP
        >>> model = nn.Sequential(*[nn.Linear(2000, 2000).to(rank) for _ in range(20)])
        >>> ddp = DDP(model, device_ids=[rank])
        >>> opt = ZeroRedundancyOptimizer(
        >>>     ddp.parameters(),
        >>>     optimizer_class=torch.optim.Adam,
        >>>     lr=0.01
        >>> )
        >>> ddp(inputs).sum().backward()
        >>> opt.step()

    .. warning::
        Currently, ``ZeroRedundancyOptimizer`` requires that all of the
        passed-in parameters are the same dense type.

    .. warning::
        If you pass ``overlap_with_ddp=True``, be wary of the following: Given
        the way that overlapping :class:`DistributedDataParallel` with
        :class:`ZeroRedundancyOptimizer` is currently implemented, the first
        two or three training iterations do not perform parameter updates in
        the optimizer step, depending on if ``static_graph=False`` or
        ``static_graph=True``, respectively. This is because it needs
        information about the gradient bucketing strategy used by
        :class:`DistributedDataParallel`, which is not finalized until the
        second forward pass if ``static_graph=False`` or until the third
        forward pass if ``static_graph=True``. To adjust for this, one option
        is to prepend dummy inputs.
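
    Example of the ``overlap_with_ddp=True`` flow (a minimal sketch, assuming the
    hook helpers live under ``torch.distributed.algorithms.ddp_comm_hooks`` as
    shown and that ``rank`` and ``inputs`` are defined as in the example above)::

        >>> # xdoctest: +SKIP
        >>> from torch.distributed.algorithms.ddp_comm_hooks.ddp_zero_hook import (
        >>>     hook_with_zero_step,
        >>> )
        >>> from torch.distributed.algorithms.ddp_comm_hooks.default_hooks import (
        >>>     allreduce_hook,
        >>> )
        >>> model = nn.Sequential(*[nn.Linear(2000, 2000).to(rank) for _ in range(20)])
        >>> ddp = DDP(model, device_ids=[rank])
        >>> zero = ZeroRedundancyOptimizer(
        >>>     ddp.parameters(),
        >>>     optimizer_class=torch.optim.Adam,
        >>>     overlap_with_ddp=True,
        >>>     lr=0.01,
        >>> )
        >>> ddp.register_comm_hook(None, hook_with_zero_step(allreduce_hook, ddp, zero))
        >>> # The first iterations only collect bucketing information, so no
        >>> # parameter updates happen yet.
        >>> for _ in range(3):
        >>>     ddp(inputs).sum().backward()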

    .. warning:: ZeroRedundancyOptimizer is experimental and subject to change.

    .. _ZeRO: https://arxiv.org/abs/1910.02054

    """

    def __init__(
        self,
        params,
        optimizer_class: Type[Optimizer],
        process_group: Optional[Any] = None,
        parameters_as_bucket_view: bool = False,
        overlap_with_ddp: bool = False,
        **defaults: Any,
    ):
        r"""Init."""
        # Perform type and assumption checks on the input parameters
        params = self._verify_and_init_params(params)
        self._verify_same_dense_param_type()

        # NOTE: The parent constructor uses `add_param_group()`, which is
        # partially overloaded here, so the `initialized` flag dissociates the
        # parent's behavior from the child's
        self.initialized = False

        Optimizer.__init__(self, params, defaults)
        Joinable.__init__(self)
        # Now, all parameters are held in both `self._all_params` and
        # `self.param_groups`

        # Internal data structures (`_cache` indicates lazily evaluated)
        self._param_to_rank_cache: Dict[torch.Tensor, int] = {}
        self._param_to_index_cache: Dict[torch.Tensor, int] = {}
        self._partition_parameters_cache: List[List[Dict]] = []
        self._index_to_param_cache: List[torch.Tensor] = []
        self._device_to_params_per_rank_cache: Dict[
            torch.device, List[List[torch.Tensor]]
        ] = {}
        self._bucket_assignments_per_rank_cache: List[
            Dict[int, _DDPBucketAssignment]
        ] = []
        self._is_trainable_mask = self._get_is_trainable_mask()

        # Default device for collective communication and buckets
        self._default_device = self._all_params[0].device

        self.process_group = (
            process_group if process_group is not None else dist.group.WORLD
        )
        self.world_size: int = dist.get_world_size(self.process_group)
        self.rank: int = dist.get_rank(self.process_group)
        self.global_rank: int = dist.distributed_c10d.get_global_rank(
            self.process_group, self.rank
        )

        self._overlap_with_ddp: bool = overlap_with_ddp
        self._optim_defaults = defaults
        self._optim_constructor = self._get_optimizer_constructor(optimizer_class)

        # If `overlap_with_ddp=True`, local optimizer initialization is delayed
        # until run time, after the DDP bucketing information is available
        if not overlap_with_ddp:
            self._init_local_optimizer()
        else:
            self._overlap_info: _OverlapInfo = _OverlapInfo(self.world_size)
            if parameters_as_bucket_view:
                logger.warning(
                    "`parameters_as_bucket_view=True` will be ignored since "
                    "`overlap_with_ddp=True`; instead, a different bucketing "
                    "strategy will be used"
                )

        # `self._buckets` is used if `parameters_as_bucket_view=True`, in which
        # case parameter data is flattened into contiguous bucket tensors
        self.parameters_as_bucket_view = parameters_as_bucket_view
        self._buckets: List[List[torch.Tensor]] = []
        self._build_param_buckets()

        # Optional consolidated optimizer state, only populated if this rank
        # is the target in `consolidate_state_dict()`
        self._all_state_dicts: List[Dict[str, Any]] = []

        self.initialized = True

    def _clear_cache(self) -> None:
        r"""Clear the cached data structures giving partition information."""
        self._partition_parameters_cache.clear()
        self._param_to_rank_cache.clear()
        self._index_to_param_cache.clear()
        self._param_to_index_cache.clear()
        self._device_to_params_per_rank_cache.clear()
        self._bucket_assignments_per_rank_cache.clear()

    def add_param_group(self, param_group: Dict[str, Any]) -> None:
        r"""
        Add a parameter group to the :class:`Optimizer` 's ``param_groups``.

        This can be useful when fine tuning a pre-trained network, as frozen
        layers can be made trainable and added to the :class:`Optimizer` as
        training progresses.

        Arguments:
            param_group (dict): specifies the parameters to be optimized and
                group-specific optimization options.
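
        Example::

            >>> # xdoctest: +SKIP
            >>> # Minimal sketch: ``opt`` is an existing ``ZeroRedundancyOptimizer``
            >>> # and ``new_params`` holds newly unfrozen parameters; this must be
            >>> # called on every rank.
            >>> opt.add_param_group({"params": new_params, "lr": 1e-4})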

        .. warning:: This method handles updating the shards on all partitions
            but needs to be called on all ranks. Calling this on a subset of
            the ranks will cause the training to hang because communication
            primitives are called depending on the managed parameters and
            expect all the ranks to participate on the same set of parameters.
        """
        if self.initialized and self._overlap_with_ddp:
            raise RuntimeError(
                "ZeroRedundancyOptimizer with `overlap_with_ddp=True` only "
                "supports a single parameter group"
            )

        super().add_param_group(param_group)
        # NOTE: The rest of the method assumes that the call to the parent's
        # `add_param_group()` appends the new parameter group and preserves
        # the previous parameter-group ordering

        if self.initialized:
            # Force a re-partitioning of the parameters
            self._clear_cache()
            param_groups = self._partition_parameters()[self.rank]
            # NOTE: All parameters in the old parameter groups stay assigned to
            # the same ranks so that the local optimizers do not need to be
            # reinitialized

            # Add the parameters assigned to this rank from the new parameter
            # group to the local optimizer, if any
            if len(param_groups) == len(self.optim.param_groups) + 1:
                self.optim.add_param_group(param_groups[-1])

            # Update the bucketing strategy accordingly
            if self.parameters_as_bucket_view:
                self._build_param_buckets()

    def consolidate_state_dict(self, to: int = 0) -> None:
        r"""
        Consolidate a list of ``state_dict`` s (one per rank) on the target rank.

        Arguments:
            to (int): the rank that receives the optimizer states (default: 0).

        Raises:
            RuntimeError: if ``overlap_with_ddp=True`` and this method is
                called before this :class:`ZeroRedundancyOptimizer` instance
                has been fully initialized, which happens once
                :class:`DistributedDataParallel` gradient buckets have been
                rebuilt.
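
        Example::

            >>> # xdoctest: +SKIP
            >>> # Hedged checkpointing sketch: run on all ranks, save on the
            >>> # target rank only.
            >>> opt.consolidate_state_dict(to=0)
            >>> if dist.get_rank() == 0:
            >>>     torch.save(opt.state_dict(), "zero_optimizer.pt")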

        .. warning:: This needs to be called on all ranks.
        """
        self._check_overlap_initialized()

        # Sync the exposed `param_groups` attributes to the local optimizer in
        # case they have been updated
        self._sync_param_groups(self.param_groups, self.optim.param_groups)

        # Pull the sharded state from all ranks and store them in rank order
        empty_messenger = torch.tensor(
            [0], dtype=torch.uint8, device=self._default_device
        )

        # NOTE: We wastefully use `broadcast()` (e.g. instead of `gather()`)
        # due to compatibility issues with NCCL backends; a possible follow-up
        # is to move all sharded state management to RPC RRef
        self._all_state_dicts = []
        for rank in range(self.world_size):
            global_rank = dist.distributed_c10d.get_global_rank(
                self.process_group, rank
            )
            if self.rank == to:
                # Consolidate all local `state_dict`s on this rank, storing on
                # CPU to save GPU memory
                if rank == self.rank:
                    # Directly append own optimizer state
                    self._all_state_dicts.append(
                        _recursive_copy_to_device(
                            self.optim.state_dict(),
                            non_blocking=True,
                            device=torch.device("cpu"),
                        )
                    )
                else:
                    # Receive the optimizer state from the source rank
                    local_state_dict = _broadcast_object(
                        empty_messenger,
                        src_rank=global_rank,
                        group=self.process_group,
                        device=self._default_device,
                    )
                    self._all_state_dicts.append(
                        _recursive_copy_to_device(
                            local_state_dict,
                            non_blocking=True,
                            device=torch.device("cpu"),
                        )
                    )
            else:
                if rank == self.rank:
                    # Send the optimizer state to the target rank
                    _ = _broadcast_object(
                        self.optim.state_dict(),
                        src_rank=self.global_rank,
                        group=self.process_group,
                        device=self._default_device,
                    )
                elif rank != to:
                    # Discard the received object; `broadcast()` is used for
                    # compatibility reasons
                    _ = _broadcast_object(
                        empty_messenger,
                        src_rank=global_rank,
                        group=self.process_group,
                        device=self._default_device,
                    )

    def _verify_params_per_rank(
        self,
        params_per_rank: List[List[torch.Tensor]],
    ) -> None:
        r"""
        Verify ``params_per_rank`` for :meth:`_partition_parameters`.

        The verification is done by checking that ``params_per_rank`` has length equal
        to the world size and that it does not contain any parameters not passed into the
        :class:`ZeroRedundancyOptimizer` constructor.

        The parameters in ``params_per_rank`` being a strict subset of those
        passed into the constructor is valid since some parameters may be
        frozen.

        Raises:
            ValueError: if ``params_per_rank`` does not have length equal to
                the world size or if it contains a parameter that was not
                passed into the :class:`ZeroRedundancyOptimizer` constructor.
        z:`params_per_rank` must have length equal to the world sizezmPassing a new parameter in `params_per_rank` that was not passed into the ZeroRedundancyOptimizer constructorN)r?   r   rf   setr   )rT   rx   all_params_setr   r'   s        r&   _verify_params_per_rankz/ZeroRedundancyOptimizer._verify_params_per_rankJ  so    ( 4??2L  T--.% 	F .$& 	r+   c                     t        |      D ]=  \  }}t        j                  |      }||d<   | j                  |   j                  |       ? y)aR  
        Partition the parameter group ``param_group`` according to ``params_per_rank``.

        The partition will modify the ``self._partition_parameters_cache``. This method should
        only be used as a subroutine for :meth:`_partition_parameters`.

        Arguments:
            param_group (dict[str, Any]): a parameter group as normally defined
                in an optimizer state.
            params_per_rank (list[list[torch.Tensor]]): a :class:`list` of
                length world size containing :class:`list` s of parameters to
                assign to each rank.
        r   N)	enumeratecopyr   r   )rT   r   rx   r   r   rank_param_groups         r&   _partition_param_groupz.ZeroRedundancyOptimizer._partition_param_groupl  sR      &o6 	LLD&#yy5)/X&,,T299:JK	Lr+   c                 h   |t        | j                        dk(  rt        | j                        D cg c]  }g  c}| _        dg| j                  z  }| j                  D ]  }t        | j                        D cg c]  }g  }}t        |d   d d      }|D ]B  }| j                  |      }||   j                  |       ||xx   |j                         z  cc<   D | j                  ||        | j                  S t        | j                        dk(  sJ d       t        | j                        dk7  rt        d      | j                  |       t        | j                        D cg c]  }g  c}| _        | j                  d   }| j                  ||       | j                  S c c}w c c}w c c}w )	aw  
        Partitions parameters across distributed data parallel ranks.

        Arguments:
            params_per_rank (list[list[torch.Tensor]], optional): a
                :class:`list` of length world size containing :class:`list` s
                of parameters to assign to each rank; this provides a way to
                specify a partition manually.
                If ``None``, the parameters are partitioned according to an
                internal algorithm.
                (default: ``None``)

        Returns:
            A :class:`list` where each element of the list contains the
            ``param_groups`` for a rank (which itself is a :class:`list` of
            :class:`dict`); element 0 corresponds to rank 0, etc.; each rank
            stores the ``param_groups`` for all ranks for the collective
            communication in :meth:`step`.

        Raises:
            ValueError: see :meth:`_validate_params_per_rank`.
            RuntimeError: if ``params_per_rank`` is not ``None`` and this
                :class:`ZeroRedundancyOptimizer` instance is using more than
                one parameter group.
        r   r   c                 "    | j                         S Nnumel)ts    r&   <lambda>z?ZeroRedundancyOptimizer._partition_parameters.<locals>.<lambda>  s    QWWY r+   T)r%   reversezbSpecifying `params_per_rank` should only be done when the parameters have not been partitioned yetrl   zCSpecifying `params_per_rank` only supports a single parameter group)r?   r   rw   r   r   sorted_get_min_indexr   r   r   r   r   )	rT   rx   r   sizesr   param_group_params_per_rankparams_sortedr'   r   s	            r&   r   z-ZeroRedundancyOptimizer._partition_parameters  s   : "43349@Edoo@V3W1B3W0doo-#'#4#4 K$)$//$:? ?/ ? %+#H-3FPT%M "/ 5#22593D9@@Gdu{{}4	5 //#%@$ 333 43349 	
7	
9 t  !Q&U  	$$_58=doo8N+O1B+O( ''*##KA///M 4X?: ,Ps   	F%9	F*'	F/c                     t        | j                        dk(  rAt        | j                               D ]%  \  }}|D ]  }|d   D ]  }|| j                  |<     ' | j                  S )zW:class:`dict` mapping parameters to their assigned data parallel rank in the partition.r   r   )r?   r   r   r   )rT   r   r   r   r'   s        r&   _param_to_rankz&ZeroRedundancyOptimizer._param_to_rank  s     t(()Q.&/0J0J0L&M @"l#/ @K!,X!6 @;?11%8@@@ (((r+   c                     t        | j                        dk(  r;t        t        d | j                  D               D ci c]  \  }}||
 c}}| _        | j                  S c c}}w )z
        :class:`dict` mapping parameters to their indices in the global optimizer state.

        NOTE: This assumes that the global optimizer state's indexing (in
        ``state_dict``) follows a linear ordering over the parameter groups.
        r   c              3   &   K   | ]	  }|d      ywr   Nri   .0gs     r&   	<genexpr>z:ZeroRedundancyOptimizer._param_to_index.<locals>.<genexpr>  s     -Uaak-U   )r?   r   r   r   r   )rT   ips      r&   _param_to_indexz'ZeroRedundancyOptimizer._param_to_index  se     t))*a/ &e-U4CTCT-U&VW*Aq 1*D& )))	*s   A c                     t        | j                        dk(  r't        t        d | j                  D               | _        | j                  S )zSList mapping parameter indices in the global optimizer scheme to the actual params.r   c              3   &   K   | ]	  }|d      ywr   ri   r   s     r&   r   z:ZeroRedundancyOptimizer._index_to_param.<locals>.<genexpr>  s     ?(?r   )r?   r   r   r   r   rX   s    r&   _index_to_paramz'ZeroRedundancyOptimizer._index_to_param  sF     t))*a/)-?T->->?@*D& )))r+   r   c           
      D   | j                   rJ d       g }| j                  rt| j                  D ]c  }||   }t        j                  j                  | j                  |      }|j                  t        j                  ||| j                  d             e |S | j                         |   }t        j                  j                  | j                  |      }|D ]H  }|d   D ]>  }|j                  t        j                  |j                  || j                  d             @ J |S )a8  
        Broadcast the shard of parameters from a given rank to all other ranks asynchronously.

        Arguments:
            rank (int): the source rank.

        Returns:
            A :class:`list` of async work handles for the ``broadcast()`` s
            performed to synchronize the parameters.
        z`_broadcast_params_from_rank()` should not be used if `overlap_with_ddp=True`; instead, the broadcasting should happen in the DDP communication hookT)rg   r1   r/   r2   r   )r   r   r   r7   r   r   r   r   rA   r   rI   )	rT   r   handlesdev_i_bucketsbucketr   r   r   r'   s	            r&   _broadcast_params_from_rankz3ZeroRedundancyOptimizer._broadcast_params_from_rank  s0    )) 	
3	
)
 ))!% &t,"33CC&& NN%'"00!%	8   557=L//??""DK  , 	(2 ENN#(:: +"&"4"4%)		 r+   c                     g }t        | j                        D ]"  }|j                  | j                  |             $ |D cg c]  }|j	                          }}yc c}w )ai  
        Sync all parameter shards across the ranks.

        This rank sends its shard of the parameters to all other ranks and
        receives a shard from each other rank. This is done using
        ``broadcast()``. Parameters are sent bucket-by-bucket if
        ``parameters_as_bucket_view=True``and sent parameter-by-parameter
        otherwise.
        N)rw   r   extendr   r   )rT   r   r   r   r   s        r&   _sync_paramsz$ZeroRedundancyOptimizer._sync_params  sV     $//* 	CDNN4;;DAB	C&'!QVVX'''s   Ac                    | j                   sJ d       t        | j                        dk(  rt        | j	                               D ]  \  }}|D ]w  }|d   D ]m  }|j
                  }|| j                  vr0t        | j                        D cg c]  }g  c}| j                  |<   | j                  |   |   j                  |       o y  | j                  S c c}w )au  
        Return device parameters assigned per rank.

        :class:`dict` mapping each device to a :class:`list` of the per-rank parameter
        lists filtered to only include the parameters stored on that device.
        Each per-rank parameter list gives the parameters assigned to that rank
        to update.

        This is used for constructing the parameter buckets if
        ``parameters_as_bucket_view=True``.

        Let ``dev_i`` denote the ``i``th device for this rank. Then:
        ``dev_0`` maps to a list containing:
            rank 0's assigned parameters stored on ``dev_0``,
            rank 1's assigned parameters stored on ``dev_0``,
            ...
        ``dev_1`` maps to a list containing:
            rank 0's assigned parameters stored on ``dev_1``,
            rank 1's assigned parameters stored on ``dev_1``,
            ...
        ...
        zT`_device_to_params_per_rank` should only be used if `parameters_as_bucket_view=True`r   r   )	r   r?   r   r   r   r   rw   r   r   )rT   r   r   r   r'   r   r   s          r&   _device_to_params_per_rankz2ZeroRedundancyOptimizer._device_to_params_per_rank*  s    4 -- 	
/	
- t445:&/0J0J0L&M 
"l#/ 	K!,X!6 !&!)N)NN,1$//,BM'(MDAA&I ==fEdKRR!	
 444Ms   		C
r$   disallowed_indicesc                     d}t        d      }t        |      D ]  \  }}|r||v r||k  s|}|} |dk\  sJ d       |S )aq  
        Return ``values.index(min(values))``, except only uses one pass.

        It also excludes any indices in ``disallowed_indices`` if provided.

        Arguments:
            values: (List[int]): :class:`list` of values.
            disallowed_indices (Optional[Set[int]]): indices that are
                disallowed from being the returned min index.
        r   infr   zAll indices are disallowed)floatr   )rT   r$   r   	min_index	min_valuer   r   s          r&   r   z&ZeroRedundancyOptimizer._get_min_indexV  sd     	%L	!&) 	HAu!a+=&=y !			 A~;;;~r+   ra   bucket_paramsbucket_offsetassigned_rankrz   c                    | j                   }t        |      dk(  rt        d      |j                  }|j                  }t        |||      | j                  |   |<   | j                  |k(  rt        ||         ||<   ||   j                  |       ||   j                  |       | j                   xj                  dz  c_
        y)a  
        Assign ``bucket_params`` to the rank with the least size assigned so far and collects relevant information.

        The model parameters given by ``bucket_params`` represents a (possibly non-strict)
        subset of the parameters corresponding to a :class:`DistributedDataParallel` bucket.

        Arguments:
            bucket_index (int): index of the :class:`DistributedDataParallel`
                gradient bucket.
            bucket_params (List[torch.Tensor]): subset of the parameters
                corresponding to the bucket to assign.
            bucket_offset (int): offset giving the index of the first element
                in ``bucket_params`` in the bucket's full parameter list.
            assigned_rank (int): group rank to assign to.
            assigned_ranks_per_bucket (List[Set[int]]): :class:`set` of group ranks
                assigned to each bucket.
        r   re   rl   N)r   r?   rf   rx   ry   r`   r   r   r   addr{   )	rT   ra   r   r   r   rz   overlap_inforx   ry   s	            r&   _assign_bucket_subset_to_rankz5ZeroRedundancyOptimizer._assign_bucket_subset_to_rankp  s    2 ))}"677&66&& !}mL 	//>	
 },$'(F$GGL!&--m<!,/33MB11Q61r+   c           	      ,   | j                   sJ d       t        | j                        dkD  r| j                  S | j                  }|j                  t
        j                  k(  sJ t        | j                        D cg c]  }i  c}| _        |j                  }|j                  rN|j                  J d       |j                  | j                  z  }t        | j                        D cg c]  }d }}t        |      }t        |      D cg c]  }t                c}|_        |j                  }|j                  sZt        |      D ]@  \  }}	t        |	      dkD  sJ d       | j                  |      }
| j!                  ||	d|
|       B | j                  S t#        t        |      d       }|D ]  \  }}	t        |	      dkD  sJ d       d}d}t        |	      D ]e  \  }}|j%                         }||z   k\  rC||kD  r>| j'                  ||         }
| j!                  ||	|| ||
|       ||
xx   |z  cc<   |}d}||z  }g | j'                  ||         }
| j!                  ||	|d ||
|       ||
xx   |z  cc<    | j                  S c c}w c c}w c c}w )z
        Return DDP bucket parameters assigned per rank.

        :class:`list` of length world size consisting of :class:`dict` s
        mapping bucket indices to :class:`_DDPBucketAssignment` s for each
        rank.
        zF`_bucket_assignments_per_rank` only be used if `overlap_with_ddp=True`r   Nz`total_size` was not computedEmpty bucketc                 ,    t        d | d   D              S )Nc              3   <   K   | ]  }|j                           y wr   r   r   r   s     r&   r   zYZeroRedundancyOptimizer._bucket_assignments_per_rank.<locals>.<lambda>.<locals>.<genexpr>  s     ?Xa	?X   rl   )sumr   s    r&   r   zFZeroRedundancyOptimizer._bucket_assignments_per_rank.<locals>.<lambda>  s    C?XSTUVSW?X<X r+   )r%   )r   r?   r   r   rt   rk   rp   rw   r   rv   ru   r|   r   rz   r   _get_assigned_rankr   r   r   r   )rT   r   r   rv   	thresholdsize_per_ranknum_bucketsrz   ra   r   r   params_per_bucket_enumr   assignment_sizeparam_indexr'   param_numels                    r&   _bucket_assignments_per_rankz4ZeroRedundancyOptimizer._bucket_assignments_per_rank  s    ""	TS	T"t667!;:::))""n&@&@@@@?DT__?U2V!22V/(::%%**6W8WW6$//$//AI(-doo(>?1Q?M?+,AF{AS1TA#%1T.$0$J$J!))/89J/K 	+m=)A-=~=- $ 7 7 E22 !!-	z 666W &,+,2X&" 0F &@+m=)A-=~=- !"#*3M*B 3&K"'++-K'+5B'-7(,(;(;)+D\+R)
 ::()-D))5 &m4G4(3*+#{2O+30 !% 3 3!#<\#J! 22 !-.1!!- m,?,M&@P 666W 3W @ 2Us   :	J&	J	J	gradientsclosurekwargsc                    t        j                  |        | j                         }|| j                  k7  rC| j                  rt        d      t        j                  d       | j                          || _        | j                  | j                  | j                  j                         |=| | j                  j                  di |n | j                  j                  dd|i|}n8| j                  sJ d       |J d       | j                  j                  |      }| j                  | j                  j                  | j                         |S )a  
        Perform a single optimizer step without syncing parameters across ranks.

        Arguments:
            gradients (list[Optional[torch.Tensor]], optional): a :class:`list`
                of length equal to the number of parameters assigned to this
                rank containing gradient tensors or ``None`` as its elements;
                a ``None`` in the :class:`list` indicates that the
                corresponding parameter should not be updated.
                If the argument itself is ``None``, then all parameters are
                updated, and the gradients are assumed to be already populated.
                (default: ``None``)
            closure (Callable): a closure that re-evaluates the model and
                returns the loss; optional for most optimizers and should be
                ``None`` if ``gradients`` is not ``None``; (default: ``None``)
        Returns:
            Optional loss depending on the underlying local optimizer.

        .. warning::
            The argument ``gradients`` should only be specified (i.e. not
            ``None``) if ``overlap_with_ddp=True``, in which case
            :class:`ZeroRedundancyOptimizer` wraps a functional optimizer.
        zqZeroRedundancyOptimizer with `overlap_with_ddp=True` does not support changing parameter trainability at run timezsZeroRedundancyOptimizer detected that the trainable parameters changed; rebuilding the parameter buckets if enabledr  zGSpecifying `gradients` should not be used when `overlap_with_ddp=False`zB`closure` is not supported when using a local functional optimizer)r  ri   )r   notify_join_contextr   r   r   r   r   r   r   r   r   r   rW   )rT   r  r  r  is_trainable_masklosss         r&   _local_stepz#ZeroRedundancyOptimizer._local_step  sK   : 	  & 779 7 77%%" 
 NN
 %%'&7D# 	 1 14::3J3JK  ?  

)&)$TZZ__?W??  )) 8)
 TST::??Y?7D 	

 7 79J9JKr+   c                     | j                   rt        j                  d       y | j                  dd|i|}| j	                          |S )a  
        Perform a single optimizer step and syncs parameters across all ranks.

        Arguments:
            closure (Callable): a closure that re-evaluates the model and
                returns the loss; optional for most optimizers.
        Returns:
            Optional loss depending on the underlying local optimizer.

        .. note: Any extra parameters are passed to the base optimizer as-is.
        zQ`step()` should not be included in the training loop when `overlap_with_ddp=True`Nr  ri   )r   r   r   r  r   )rT   r  r  r  s       r&   rW   zZeroRedundancyOptimizer.stepA  sR      !!NN*   t::6: 	r+   c                     t        |       S )a~  
        Return the ZeRO join hook.

        It enables training on uneven inputs by
        shadowing the collective communications in the optimizer step.

        Gradients must be properly set before this hook is called.

        Arguments:
            kwargs (dict): a :class:`dict` containing any keyword arguments
                to modify the behavior of the join hook at run time; all
                :class:`Joinable` instances sharing the same join context
                manager are forwarded the same value for ``kwargs``.

        This hook does not support any keyword arguments; i.e. ``kwargs`` is
        unused.
        )rO   )rT   r  s     r&   	join_hookz!ZeroRedundancyOptimizer.join_hook`  s    $ T""r+   c                     | j                   S )zReturn default device.)r   rX   s    r&   join_devicez#ZeroRedundancyOptimizer.join_devicet  s     ###r+   c                     | j                   S )zReturn process group.)r   rX   s    r&   join_process_groupz*ZeroRedundancyOptimizer.join_process_groupy  s     !!!r+   r   c                    | j                          |d   j                         D ]  \  }}| j                  |   }| j                  |   | j                  k7  r	d|d   |<   :t        |d|j                        | j                  j                  |<   | j                  j                  |   j                         D ]Y  \  }}t        j                  |      s|j                         dk(  s0|j                         | j                  j                  |   |<   [  t        | 9  |       | j                  |d   | j                          | j                  | j                   | j                  j                          y)ad  
        Load the state pertaining to the given rank from the input ``state_dict``, updating the local optimizer as needed.

        Arguments:
            state_dict (dict): optimizer state; should be an object returned
                from a call to :meth:`state_dict`.

        Raises:
            RuntimeError: if ``overlap_with_ddp=True`` and this method is
                called before this :class:`ZeroRedundancyOptimizer` instance
                has been fully initialized, which happens once
                :class:`DistributedDataParallel` gradient buckets have been
                rebuilt.
        stateNTr   r   r   )r   r"   r   r   r   r   r   r   r!  r   	is_tensordimr,   rR   load_state_dictr   r   )rT   r   indexr   r'   
state_namestate_valuerU   s          r&   r$  z'ZeroRedundancyOptimizer.load_state_dict~  sG    	'')&w/557 	PLE5((/E""5)TYY6-1
7#E* +DU\\+

  ' 04zz/?/?/F/L/L/N P+J{38IQ8N>Ioo>O

((/
;P	P 	
+ 	
> :D<M<MN 1 14::3J3JKr+   c                    | j                          t        | j                        dk(  rt        d| j                   d      t
        |          }t        | j                        D ]  \  }}|d   }| j                         |   }t        |      t        |      k(  sJ d       t        ||      D ]f  \  }}|d   }|d   }	t        |      t        |	      k(  sJ d       t        ||	      D ]*  \  }
}|
|d   v s| j                  |   }|d   |
   |d   |<   , h  t        t        |d   j                                     |d<   |S )	a  
        Return the last global optimizer state known to this rank.

        .. warning:
            If the state has not been consolidated to this rank, this raises a
            runtime error, and even if it has, the state may not be up-to-date,
            depending on when :meth:`consolidate_state_dict` was last called.

        Raises:
            RuntimeError: if ``overlap_with_ddp=True`` and this method is
                called before this :class:`ZeroRedundancyOptimizer` instance
                has been fully initialized, which happens once
                :class:`DistributedDataParallel` gradient buckets have been
                rebuilt; or if this method is called without a preceding call
                to :meth:`consolidate_state_dict`.
        r   z_Optimizer state has not been consolidated on this rank. Please call `consolidate_state_dict(to=zA)` on all ranks beforehand if you meant to save the global state.r   z<Mismatch between number of local and global parameter groupsr   zIMismatch between number of local and global parameters in parameter groupr!  )r   r?   r   r   r   rR   r   r   r   zipr   dictr   r"   )rT   r   r   r   local_param_groupsglobal_param_groupslocal_param_groupglobal_param_grouplocal_param_indicesglobal_paramslocal_param_indexglobal_paramglobal_param_indexrU   s                r&   r   z"ZeroRedundancyOptimizer.state_dict  s   " 	'')t$$%*::>)) ENN  W')
 '00E0E&F 	-"D"!1.!A"&"<"<">t"D)*c#/  NMN  :="$7: -5!#5
 '8&A# 28 <./3!4  _^_  8;'8 -3%| ),<W,EE-1-A-A,-O*BR#C+C-
7+,>?--	-: #6*W*=*C*C*E#FG
7r+   src_param_groupsdst_param_groupsc                     t        |       t        |      k(  sJ d       t        | |      D ]-  \  }}t        d |j                               D ]
  }||   ||<    / y)a  
        Sync the attributes from the source parameter groups to the destination parameter groups.

        Example attributes include learning rate or scheduler attributes. The
        two parameter groups should have the same length (i.e. same number of
        parameter groups).

        Arguments:
            src_param_groups (list[dict]): parameter groups giving the
                attribute settings to copy.
            dst_param_groups (list[dict]): parameter groups giving the
                attribute settings to set.
        zBMismatch between number of source and destination parameter groupsc                     | dk7  S )Nr   ri   r  s    r&   r   z<ZeroRedundancyOptimizer._sync_param_groups.<locals>.<lambda>  s
    h r+   N)r?   r)  filterkeys)r4  r5  src_param_groupdst_param_groupattrs        r&   r   z*ZeroRedundancyOptimizer._sync_param_groups  s~    $ #$)
 
 	PO	P 
 144DFV0W 	>,O_68L8L8NO >(7(=%>	>r+   c                    | j                   r| j                  ryt        | j                        }t	        |      D cg c]  }g  c}| _        t        | j                  j                               D ]H  \  }\  }}|D ]9  }d}d}g }	|D ]k  }
t        |
      s.|
j                  j                         j                         |
_	        n$||
j                         z  }|	j                  |
       |
j                  }m |dk(  rt        j                   d|      }nt        j"                  |||      }d}|	D ]f  }
||
j                         z   }||| j%                  |
j                  j'                                ||| j)                  |
j                        |
_	        |}h | j
                  |   j                  |       < K yc c}w )ab  
        Build parameter buckets if ``parameters_as_bucket_view=True``.

        For each device that stores this rank's parameters, there is a
        bucket (represented as a tensor) containing all of the parameters on
        that device that are assigned to a given rank in the parameter update
        partition.

        This method is called in the constructor and any time parameter
        trainability is changed.

        .. warning::
            The current implementation assumes that all of the parameters in a
            bucket are of the same dense type when allocating the bucket's
            tensor.

        .. warning::
            If the model parameters are stored across more than one device,
            then the storage partitioning must be the same across all
            processes in order for parameter synchronization to work.
        Nr   rl   )r   r3   )r   r   r?   r   rw   r   r   r"   r*   rI   detachcloner   r   r4   r   zerosrB   copy_flattenview_as)rT   num_devicesr   dev_ir   rx   r   bucket_sizer4   trainable_paramsr'   r   rc   offset_nexts                 r&   r   z,ZeroRedundancyOptimizer._build_param_buckets  s   , --1G1G $99:%*;%78809++1131
 	4,E,FO * 4#% # (E(/ &+ZZ%6%6%8%>%>%@
#u{{}4(//6!KKE( !#"[[6:F #[[E&QFF!1 -&,u{{}&<vk2889K9K9MN%+F;%?%G%G

%S
!,	-
 e$++F354	4 9s   	F=c                 (   | j                   D ]  }|j                         D ]  }|j                  }d}d}|D ]3  }t        |      sJ d       ||j	                         z  }|j
                  }5 |dkD  sJ d       t        j                  |||j                        }d}|D ]f  }||j	                         z   }	|||	 j                  |j                  j                                |||	 j                  |j                        |_
        |	}h ||_          y)a/  
        Build the DDP bucket with parameters assigned to this rank.

        For each DDP bucket with parameters assigned to this rank, flattens the
        data of those parameters into a single tensor and saves the tensor to
        the ``tensor`` attribute in the corresponding
        :class:`_DDPBucketAssignment` instance stored in
        ``self._bucket_assignments_per_rank``.

        :class:`DistributedDataParallel` guarantees that the parameters
        corresponding to a gradient bucket have the same device and the same
        dtype.
        r   NzUModel parameter corresponding to a gradient in a DDP bucket should require a gradientr  r3   )r  r$   rb   r*   r   r4   r   rB   r   rA  rI   rB  rC  rg   )
rT   bucket_assignmentsbucket_assignmentr   rF  r4   r'   rg   rc   rH  s
             r&   _build_ddp_param_bucketsz0ZeroRedundancyOptimizer._build_ddp_param_buckets;  s2    #'"C"C 	2%7%>%>%@ 2!*55# (E(/ -/
  5;;=0K!KKE( #Q66 u5F5M5M # )E"(5;;="8K6+.44UZZ5G5G5IJ!'{!;!C!CEJJ!OEJ(F	)
 ,2!(12	2r+   r   c                 P   t        |t        j                        r!t        dt        j                  |             	 t        |      }t        |      dk(  rt        d      d}d}|D ]2  }|t        |t        j                        z  }|t        |t              z  }4 |s|st        d      |r	|| _	        |S |r;g | _	        |D ]/  }d|vrt        d	      | j                  j                  |d          1 |S # t        $ r'}t        dt        j                  |             |d}~ww xY w)
a  
        Verify the type of ``params`` and initializes ``self._all_params`` as a :class:`list` of all parameters.

        The initialization will first make sure that the provided ``params`` is valid.

        Arguments:
            params (Any): Candidate parameter list or parameter groups to verify.

        Raises:
            TypeError: ``params`` has an invalid type.
            ValueError: ``params`` is empty.

        Returns:
            The persistent form of ``params`` to be passed into the parent
            :class:`Optimizer` constructor -- i.e. returns ``params`` as a
            :class:`list` to ensure that it can be iterated over again.
        """
        if isinstance(params, torch.Tensor):
            raise TypeError(
                "`params` argument should be an iterable of "
                f"Tensors, but got {torch.typename(params)}"
            )
        try:
            all_params = list(params)
        except TypeError as e:
            raise TypeError(
                "`params` argument should be an iterable of Tensors "
                f"or dicts, but got {torch.typename(params)}"
            ) from e
        if len(all_params) == 0:
            raise ValueError("ZeroRedundancyOptimizer got an empty parameter list")

        all_tensors = True
        all_dicts = True
        for param in all_params:
            all_tensors &= isinstance(param, torch.Tensor)
            all_dicts &= isinstance(param, dict)
        if not all_tensors and not all_dicts:
            raise TypeError(
                "`params` argument should be an iterable of Tensors or dicts"
            )

        # Ensure that `self._all_params` contains a list of all parameters
        if all_tensors:
            self._all_params = all_params
        elif all_dicts:
            self._all_params = []
            # `all_params` contains parameter groups (not parameters)
            for param_group in all_params:
                if "params" not in param_group:
                    raise ValueError(
                        "Each parameter group passed-in via `params` must "
                        "have a 'params' key mapping to the parameters in "
                        "the group"
                    )
                self._all_params.extend(param_group["params"])
        return all_params

    def _verify_same_dense_param_type(self) -> None:
        """
        Verify that all parameters are of the same dense type.

        The method assumes that ``self._all_params`` has been initialized
        and is non-empty.

        Raises:
            ValueError: ``params`` contains sparse parameters or parameters
            of varying dense types.

        NOTE: This method can be removed once support for sparse parameters
        and varying parameter types is added.
        r   z[ZeroRedundancyOptimizer only supports using the same dense type for all parameters but got rl   Nz`ZeroRedundancyOptimizer only supports using the same dense type for all parameters but got both z and )r   rO  r   	is_sparserf   )rT   rO  r'   other_typenames       r&   r   z5ZeroRedundancyOptimizer._verify_same_dense_param_type  s     >>$"2"21"56A((B* 
        """
        typename = torch.typename(self._all_params[0])
        if self._all_params[0].is_sparse:
            raise ValueError(
                "ZeroRedundancyOptimizer only supports using the same dense "
                f"type for all parameters but got {typename}"
            )
        for param in self._all_params[1:]:
            other_typename = torch.typename(param)
            if other_typename != typename:
                raise ValueError(
                    "ZeroRedundancyOptimizer only supports using the same "
                    f"dense type for all parameters but got both {typename} "
                    f"and {other_typename}"
                )

    def _get_is_trainable_mask(self) -> List[bool]:
        r"""Return a boolean mask indicating if each parameter is trainable (``requires_grad``) or not."""
        return list(map(_is_trainable, self._all_params))

    def _init_local_optimizer(self) -> None:
        """
        Initialize this rank's local optimizer, responsible for its subset of the parameters.

        The local optimizer is saved in ``self.optim``.
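
        As a rough consistency check (a sketch, assuming the instance has been
        fully initialized), the local optimizer only holds this rank's shard::

            shard_numel = sum(
                p.numel() for g in self.optim.param_groups for p in g["params"]
            )
            assert shard_numel <= sum(p.numel() for p in self._all_params)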
        """
        assert (
            self._optim_constructor is not None
        ), "The local optimizer class has not been set"

        param_groups = self._partition_parameters()[self.rank]
        # `overlap_with_ddp=True` requires a local functional optimizer, which
        # only supports a single parameter group
        if self._overlap_with_ddp:
            assert len(param_groups) == 1, (
                "Initializing the local functional optimizer with more than "
                "one parameter group"
            )
            params = param_groups[0]["params"]
            # Try to pass `_allow_empty_param_list=True` to avoid erroring
            if (
                "_allow_empty_param_list"
                in inspect.signature(self._optim_constructor).parameters
            ):
                self.optim: Any = self._optim_constructor(
                    params, **self._optim_defaults, _allow_empty_param_list=True
                )
            else:
                logger.warning(
                    "%s does not support the argument "
                    "`_allow_empty_param_list`; ZeroRedundancyOptimizer may "
                    "error due to an empty parameter list",
                    self._optim_constructor,
                )
                self.optim: Any = self._optim_constructor(  # type: ignore[no-redef]
                    params, **self._optim_defaults
                )

            # Log information about the DDP and ZeRO bucketing
            if dist.get_debug_level() != dist.DebugLevel.OFF:
                local_numel = sum(p.numel() for p in params)
                num_assigned_buckets = len(
                    self._bucket_assignments_per_rank[self.global_rank]
                )
                logger.info(
                    "rank %s with %s parameters across %s buckets",
                    self.global_rank,
                    local_numel,
                    num_assigned_buckets,
                )
                if self.global_rank == 0:
                    logger.info(
                        "%s DDP buckets and %s bucket assignments",
                        len(self._overlap_info.params_per_bucket),
                        self._overlap_info.total_size,
                    )
        else:
            self.optim: Optimizer = self._optim_constructor(  # type: ignore[no-redef]
                param_groups, **self._optim_defaults
            )

        # Functional optimizers may expose `param_group` instead of
        # `param_groups`; normalize to `param_groups`
        if self._overlap_with_ddp and not hasattr(self.optim, "param_groups"):
            assert hasattr(self.optim, "param_group"), (
                "The functional optimizer should set at least one of the "
                "attributes `param_group` or `param_groups`"
            )
            self.optim.param_groups = [self.optim.param_group]

        self._sync_param_groups(self.optim.param_groups, self.param_groups)

    def _init_zero_for_overlap(self) -> None:
        r"""Perform a delayed initialization of the local optimizer and the supporting data structures."""
        assert self._overlap_with_ddp, (
            "`_init_zero_for_overlap()` should only be called when "
            "`overlap_with_ddp=True`"
        )
        self._overlap_info.status = _OverlapStatus.INITIALIZED
        self._clear_cache()
        self._partition_parameters(self._overlap_info.params_per_rank)
        self._build_ddp_param_buckets()
        self._init_local_optimizer()

    def _get_assigned_rank(self, bucket_index: int) -> int:
        """
        Return the single rank assigned to a :class:`DistributedDataParallel` gradient bucket.

        Arguments:
            bucket_index (int): index of the :class:`DistributedDataParallel`
                bucket for which to get the assigned rank.
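
        The assignment is round-robin over the world size; for example, with
        ``world_size=4``::

            >>> [self._get_assigned_rank(i) for i in range(6)]
            [0, 1, 2, 3, 0, 1]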
        zThe bucket assignment requires global bucket information and will be computed later; there should be no need to use this method)r   ru   r   )rT   ra   s     r&   r  z*ZeroRedundancyOptimizer._get_assigned_rank  s6     %%33 	
	
3
 doo--r+   c                     | j                   r3| j                  j                  t        j                  k7  rt        d      yy)a  
        Check the delayed initialization depending on the value of ``overlap_with_ddp``.

        The delayed initialization has occurred (see
        :meth:`_init_zero_for_overlap`) if ``overlap_with_ddp=True``, and
        raises a ``RuntimeError`` if not. This should preface methods that
        should not be run before that delayed initialization.

        Raises:
            RuntimeError: if ``overlap_with_ddp=True`` and
                :meth:`_init_zero_for_overlap` has not been called.
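
        For example (assuming ``overlap_with_ddp=True`` and that DDP has not
        yet triggered :meth:`_init_zero_for_overlap`)::

            self._check_overlap_initialized()  # raises RuntimeError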
        """
        if (
            self._overlap_with_ddp
            and self._overlap_info.status != _OverlapStatus.INITIALIZED
        ):
            raise RuntimeError(
                "This method should not be called until this "
                "ZeroRedundancyOptimizer instance has been fully "
                "initialized"
            )

    def _get_optimizer_constructor(self, optimizer_class: Any) -> Any:
        """
        Return the optimizer constructor using validation and transformation depending on ``overlap_with_ddp``.

        Returns:
            - ``optimizer_class`` if ``overlap_with_ddp=False`` and
                ``optimizer_class`` is not a functional optimizer.
            - ``optimizer_class`` if ``overlap_with_ddp=True`` and
                ``optimizer_class`` is already a functional optimizer.
            - The functional equivalent of ``optimizer_class`` if
                ``overlap_with_ddp=True`` and ``optimizer_class`` is not
                already a functional optimizer (assuming the equivalent
                exists).

        Raises:
            ValueError:

                - if ``overlap_with_ddp=True`` but ``optimizer_class`` is
                    neither a functional optimizer nor translatable to a
                    functional optimizer.
                - if ``overlap_with_ddp=False`` and ``optimizer_class`` is a
                    functional optimizer.
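
        For example (a sketch, assuming ``torch.optim.SGD`` has a functional
        counterpart registered in ``functional_optim_map``), with
        ``overlap_with_ddp=True`` a standard optimizer class is translated::

            >>> self._get_optimizer_constructor(torch.optim.SGD)
            <class 'torch.distributed.optim.functional_sgd._FunctionalSGD'>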
        """
        functional_optims = functional_optim_map.values()
        if not self._overlap_with_ddp:
            if optimizer_class in functional_optims:
                # Using a functional optimizer is only supported when
                # `overlap_with_ddp=True`
                raise ValueError(
                    f"Passing in a functional optimizer {optimizer_class} "
                    "when `overlap_with_ddp=False`"
                )
            return optimizer_class
        if optimizer_class in functional_optims:
            # Already a functional optimizer
            return optimizer_class
        elif optimizer_class in functional_optim_map:
            # Translate the optimizer class to its functional equivalent
            optim_constructor = functional_optim_map[optimizer_class]
            logger.info(
                "Using the functional optimizer %s instead of %s since "
                "`overlap_with_ddp=True`",
                optim_constructor,
                optimizer_class,
            )
            return optim_constructor
        else:
            raise ValueError(
                "Using `ddp_with_overlap=True` requires using a functional "
                "optimizer, but there is no supported functional optimizer "
                f"equivalent for {optimizer_class}"
            )
 '&"33&& $88 %9$I!. &# )( 00?/@B r+   )NFFr   )r   r   )NN);rZ   r[   r\   rh   r
   r   r   r   boolrS   r   r   strr   rC   r   r   r   r   r   r   r   propertyr   r   r   r   r   r   r   r	   r   r   r`   r  r   r   r  rW   r  r  r  r$  r   staticmethodr   r   rL  r   r*  r   r   r   r   rf  r  r   r   r]   r^   s   @r&   r   r     s{   Wz (,*/!&M  iM   }	M 
 $(M  M  M ^8,,4S> ,,d ,,\O OT Ob d5<<01  
 DLS>L<@ellAS<TL	L. ?CF0!$tELL'9":;F0 
d4j	F0P )U\\3%6 7 ) ) *ellC&7!8 * * *ell!3 * *. .`( )5	ellDell!344	5)5 )5\ 26S	 %SX. 
	4&7&7 ELL)&7 	&7
 &7 $(C>&7 
&7P \7d4=Q8Q3R.S \7 \7@ =A15HD%,,!789H (2u9-.H 	H
 
%HX 26(2u9-.  
%	>#( $U\\ $ $ "C " "$L$sCx. $LT $LL>DcN >@ >tCH~.>tCH~.> 
> >2;4z'2R:: 
tELL!4:-	.:x>:T
 :FLP
%.s .s ..7# 7# 7r+   )2rh   r   r   enumr\  r9   logging	itertoolsr   typingr   r   r   r   r   r	   r
   r   r   torch.distributeddistributedr7   !torch.distributed.algorithms.joinr   r   r   torch.distributed.optim.utilsr   torch.optimr   __all__	getLoggerrZ   r   rl  r   r   r   r*   r/   r   rC   objectrM   rO   r`   IntEnumrk   rr   r   ri   r+   r&   <module>r}     s6   "     	   H H H    F F > ! %
% 
		8	$    LL  		 F $  JJ$$'5<<.	*	** * LL	*
 	*ZH &"3 "3JT\\ &X, X,v[i [r+   