
    sg^4                         d dl Z d dlmZmZ d dlmZ d dlmZmZm	Z	m
Z
mZ d dlZd dlmZ g dZ G d d      Z G d d	e      Z G d
 de	      Z G d d      Zy)    N)ABCabstractmethod)TracebackType)AnyList
NamedTupleOptionalType)JoinHookJoinableJoinc                   (    e Zd ZdZddZdeddfdZy)r   a  
    This defines a join hook, which provides two entry points in the join context manager.

    Entry points : a main hook, which is called repeatedly while there exists a non-joined
    process, and a post-hook, which is called once all processes have joined.

    To implement a join hook for the generic join context manager, define a
    class that inherits from :class:`JoinHook` and override ``main_hook()`` and
    ``post_hook()`` as appropriate.
    returnNc                      y)zCall this hook while there exists a non-joined process to shadow collective communications in a training iteration.

        Training iteration i.e., in one forward pass, backward pass, and optimizer step.
        N selfs    T/var/www/html/venv/lib/python3.12/site-packages/torch/distributed/algorithms/join.py	main_hookzJoinHook.main_hook           is_last_joinerc                      y)aK  
        Call hook after all processes have joined.

        It is passed an additional ``bool`` argument ``is_last_joiner``, which indicates if the rank is one of the last to join.

        Arguments:
            is_last_joiner (bool): ``True`` if the rank is one of the last to
                join; ``False`` otherwise.
        Nr   )r   r   s     r   	post_hookzJoinHook.post_hook    r   r   r   N)__name__
__module____qualname____doc__r   boolr   r   r   r   r   r      s    		 	 	r   r   c                        e Zd ZdZed fd       Zedefd       Zeede	j                  fd              Zeedefd              Z xZS )r   a_  
    This defines an abstract base class for joinable classes.

    A joinable class
    (inheriting from :class:`Joinable`) should implement :meth:`join_hook`,
    which returns a :class:`JoinHook` instance, in addition to
    :meth:`join_device` and :meth:`join_process_group` that return device and
    process group information, respectively.
    r   c                 T    t         |           t        j                         | _        y N)super__init___JoinConfigconstruct_disabled_join_config_join_config)r   	__class__s    r   r%   zJoinable.__init__7   s    'FFHr   c                      y)a  
        Return a :class:`JoinHook` instance for the given :class:`Joinable`.

        Arguments:
            kwargs (dict): a :class:`dict` containing any keyword arguments
                to modify the behavior of the join hook at run time; all
                :class:`Joinable` instances sharing the same join context
                manager are forwarded the same value for ``kwargs``.
        Nr   )r   kwargss     r   	join_hookzJoinable.join_hook<   s     	r   c                      y)zeReturn the device from which to perform collective communications needed by the join context manager.Nr   r   s    r   join_devicezJoinable.join_deviceI        	r   c                      y)zfReturns the process group for the collective communications needed by the join context manager itself.Nr   r   s    r   join_process_groupzJoinable.join_process_groupO   r/   r   r   )r   r   r   r   r   r%   r   r,   propertytorchdevicer.   r   r1   __classcell__)r)   s   @r   r   r   ,   s     I I 
X 
 
 U\\    C   r   r   c                   @    e Zd ZU dZeed<   eed<   eed<   ed        Zy)r&   zdThis includes all fields needed from a :class:`Joinable` instance for the join context manager side.enablethrow_on_early_terminationis_first_joinablec                      t        ddd      S )zReturn a :class:`_JoinConfig` instance indicating that join-related logic should be disabled.

        e.g. if the caller is not in a join context manager.
        Fr7   r8   r9   )r&   r   r   r   r'   z*_JoinConfig.construct_disabled_join_config]   s     Ue
 	
r   N)r   r   r   r   r    __annotations__staticmethodr'   r   r   r   r&   r&   V   s(    oL $$
 
r   r&   c                       e Zd ZdZ	 	 ddee   dedefdZddZddZ	d	 Z
d
eee      dee   dee   fdZd Zd Zedefd       Zy)r   a
  
    This class defines the generic join context manager, which allows custom hooks to be called after a process joins.

    These hooks should shadow the
    collective communications of non-joined processes to prevent hanging and
    erroring and to ensure algorithmic correctness. Refer to :class:`JoinHook`
    for details about the hook definition.

    .. warning::
        The context manager requires each participating :class:`Joinable` to
        call the method :meth:`notify_join_context()` before its own per-
        iteration collective communications to ensure correctness.

    .. warning::
        The context manager requires that all ``process_group`` attributes in
        the :class:`JoinHook` objects are the same. If there are multiple
        :class:`JoinHook` objects, then the ``device`` of the first is used.
        The process group and device information is used for checking for non-
        joined processes and for notifying processes to throw an exception if
        ``throw_on_early_termination`` is enabled, both of which using an all-
        reduce.

    Arguments:
        joinables (List[Joinable]): a list of the participating
            :class:`Joinable` s; their hooks are iterated over in the given
            order.

        enable (bool): a flag enabling uneven input detection; setting to
            ``False`` disables the context manager's functionality and should
            only be set when the user knows the inputs will not be uneven
            (default: ``True``).

        throw_on_early_termination (bool): a flag controlling whether to throw an
            exception upon detecting uneven inputs (default: ``False``).

    Example::

        >>> import os
        >>> import torch
        >>> import torch.distributed as dist
        >>> import torch.multiprocessing as mp
        >>> # xdoctest: +SKIP
        >>> import torch.nn.parallel.DistributedDataParallel as DDP
        >>> import torch.distributed.optim.ZeroRedundancyOptimizer as ZeRO
        >>> from torch.distributed.algorithms.join import Join
        >>>
        >>> # On each spawned worker
        >>> def worker(rank):
        >>>     dist.init_process_group("nccl", rank=rank, world_size=2)
        >>>     model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
        >>>     optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01)
        >>>     # Rank 1 gets one more input than rank 0
        >>>     inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)]
        >>>     with Join([model, optim]):
        >>>         for input in inputs:
        >>>             loss = model(input).sum()
        >>>             loss.backward()
        >>>             optim.step()
        >>>     # All ranks reach here without hanging/erroring
    	joinablesr7   r8   c                    t        |      dk(  rt        d      || _        | j                  D cg c]  } |j                  di | c}| _        || _        || _        | j                          | j                          y c c}w )Nr   z7The join context manager requires at least one joinabler   )	len
ValueError
_joinablesr,   _join_hooks_enable_throw_on_early_termination_set_joinable_configs_extract_dist_info)r   r?   r7   r8   r+   joinables         r   r%   zJoin.__init__   s~     y>QVWW#9=
-5H((
 +E(""$!
s   A?Nc                     t        | j                        dkD  sJ d}| j                  D ]+  }t        | j                  | j                  |      |_        d}- y)zESet the :class:`_JoinConfig` of each participating :class:`Joinable`.r   Tr;   FN)rA   rC   r&   rE   rF   r(   )r   r9   rI   s      r   rG   zJoin._set_joinable_configs   sZ    4??#a'''  	&H$/||+/+K+K"3%H!
 !&	&r   c                    d}d}| j                   D ]:  }||j                  }n||j                  k7  rt        d      |/|j                  }< || _        t        j                  | j                        | _        || _        y)a  
        Extract the process group and device information from the joinables.

        If there are multiple joinables, then the context manager uses the
        first specified device.

        Preconditions:
            ``self._joinables`` is not ``None`` and is non-empty.

        Raises:
            ValueError
                If there are multiple conflicting ``process_group`` attributes
                among the ``Joinable`` objects.
        Nz7Using join context manager with multiple process groups)	rC   r1   rB   r.   _process_groupdistget_rank_rank_device)r   process_groupr4   rI   s       r   rH   zJoin._extract_dist_info   s      	.H$ ( ; ;("="== M  ~!--	. ,]]4#6#67
r   c                      y r#   r   r   s    r   	__enter__zJoin.__enter__   s    r   typevalue	tracebackc           	         | j                   r|ryd}d}d}d}t        j                  d       |s||kD  r)t        j                  d| d| j                   d	| d
       | j                         }|dk(  rd}nD| j                  r| j                          | j                  D ]  }	|	j                           d}|dz  }|s| j                  D ]  }	|	j                  |        y)z
        Repeatedly runs the main hooks until all processes join; then, runs the post-hooks.

        Raises:
            RuntimeError
                If ``throw_on_early_termination=True``.
        NFTr   i  oncez+Detected uneven input skew of greater than z. This means that rank z has at least zz fewer inputs than other currently-active ranks. This level of skew could lead to performance degradation during training.   )rE   warningssimplefilterwarnrO   _get_num_nonjoined_procsrF   _notify_procs_to_terminaterD   r   r   )
r   rT   rU   rV   all_procs_joinedr   iWARN_THRESHOLDnum_nonjoined_procsr,   s
             r   __exit__zJoin.__exit__   s    ||t f%">!A%&&=zzl.0@ A33 #'"?"?"A"a'#' 33335 "&!1!1 *I'')* "'Q/ #4 )) 	0I/	0r   c                     t        j                  d| j                        }t        j                  || j
                         |j                         S )zaReturn the number of non-joined processes by shadowing an all-reduce in the non-joined processes.rY   r4   group)r3   zerosrP   rM   
all_reducerL   item)r   rb   s     r   r]   zJoin._get_num_nonjoined_procs  s;    #kk!DLLA+43F3FG"''))r   c                     t        j                  d| j                        }t        j                  || j
                         t        d| j                   d      )zSchedule an all-reduce to notify non-joined processes to terminate.

        Also raise a ``RuntimeError`` indicating that the current process has exhausted its inputs.
        rY   re   rf   zRank z exhausted all inputs.)r3   onesrP   rM   ri   rL   RuntimeErrorrO   )r   rl   s     r   r^   zJoin._notify_procs_to_terminate  sE    
 zz!DLL1D$7$78U4::,.DEFFr   rI   c                    t        | d      sJ dt        |        d       | j                  }|j                  r|j                  sy| j
                  }| j                  }t        j                  d|      }t        j                  ||d      }|j                  rKt        j                  d|      }t        j                  ||	       |j                         }|rt        d
      |S )aH  
        Notifies the join context manager that the calling process has not yet joined.

        Then, if ``throw_on_early_termination=True``, checks if uneven inputs have been detected
        (i.e. if one process has already joined) and throws an exception if so.

        This method should be called from a :class:`Joinable` object before
        its per-iteration collective communications. For example, this should
        be called at the beginning of the forward pass in
        :class:`DistributedDataParallel`.

        Only the first :class:`Joinable` object passed into the context
        manager performs the collective communications in this method, and
        for the others, this method is vacuous.

        Arguments:
            joinable (Joinable): the :class:`Joinable` object calling this
                method.

        Returns:
            An async work handle for the all-reduce meant to notify the context
            manager that the process has not yet joined if ``joinable`` is the
            first one passed into the context manager; ``None`` otherwise.
        r(   zCheck that the z/ constructor calls the ``Joinable`` constructorNrY   re   T)rg   async_oprf   zLDetected at least one rank that exhausted inputs. Throwing across all ranks.)hasattrrT   r(   r9   r7   r.   r1   r3   rl   rM   ri   r8   rh   rj   rm   )rI   join_configr4   rQ   rl   workrh   should_throws           r   notify_join_contextzJoin.notify_join_context(  s    4 x0 	
d8n- .' '	
0
 ++,,K4F4F%% 33 zz!F+t=4H11KK&1EOOE7 ::<L"1  r   )TFr   )r   r   r   r   r   r   r    r%   rG   rH   rS   r	   r
   BaseExceptionr   rc   r]   r^   r=   rt   r   r   r   r   r   h   s    ;@ +0	">" " %)	"$
&<20tM*+20 &20 M*	20h*G 4h 4 4r   r   )rZ   abcr   r   typesr   typingr   r   r   r	   r
   r3   torch.distributeddistributedrM   __all__r   r   r&   r   r   r   r   <module>r|      sP     #  8 8    + <'s 'T
* 
$u ur   