
    sg                        d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
mZ d dlmZmZ d dlmZ d dlmZmZmZmZmZmZmZ d dlmc mc mZ d dlmc mc mc m Z! d dl"m#Z#m$Z$m%Z% d dl&m'Z'm(Z( d d	l)m*Z*m+Z+ d d
lm,Z, d dl-m.Z. g dZ/dZ0dZ1 e.e2      Z3e G d d             Z4 G d d      Z5 G d de6e      Z7 G d d      Z8 G d d      Z9e G d d             Z:de6fdZ; G d de jx                        Z= G d d e=      Z>y)!    N)defaultdict)contextmanager)	dataclassfield)Enum)AnyCallableDictListOptionalTupleUnion)EventEventSourcerecord)prof
put_metric)ProcessFailureSignalException)RendezvousGracefulExitError)
get_logger)
WorkerSpecWorkerWorkerStateWorkerGroup	RunResultElasticAgentSimpleElasticAgentz!torchelastic/agent/terminal_statedefaultc                       e Zd ZU dZeed<   eed<   ej                  ed<   dZ	e
e   ed<   dZeeedf   ed<   dZeed	<   d
Zeed<   dZeed<   dZe
e   ed<   dZe
e   ed<   dZe
e   ed<   d Zd Zy)r   ai  Blueprint information about a particular type of worker.

    For a given role, there must only exist a single worker spec.
    Worker spec is expected to be homogeneous across all nodes (machine),
    that is each node runs the same number of workers for a particular spec.

    Args:
        role: user-defined role for the workers with this spec
        local_world_size: number local workers to run
        fn: (deprecated use entrypoint instead)
        entrypoint: worker function or command
        args: arguments to pass to ``entrypoint``
        rdzv_handler: handles rdzv for this set of workers
        max_restarts: number of max retries for the workers
        monitor_interval: monitor status of workers every ``n`` seconds
        master_port: fixed port to run the c10d store on rank 0
                     if not specified then will chose a random free port
        master_addr: fixed master_addr to run the c10d store on rank 0
                     if not specified then will chose hostname on agent rank 0
        redirects: redirect std streams to a file,
                   selectively redirect for a particular
                   local rank by passing a map
        tee: tees the specified std stream(s) to console + file,
             selectively tee for a particular local rank by passing a map,
             takes precedence over ``redirects`` settings.

    rolelocal_world_sizerdzv_handlerNfn
entrypoint args   max_restartsg?monitor_intervalmaster_portmaster_addr
local_addrc                     | j                   dkD  sJ | j                  dkD  sJ | j                  r,t        j                  dt
               | j                  | _        | j                  sJ y )Nr   zJWorkerSpec.fn will be deprecated, please use WorkerSpec.entrypoint instead)category)r"   r*   r$   warningswarnDeprecationWarningr%   selfs    ]/var/www/html/venv/lib/python3.12/site-packages/torch/distributed/elastic/agent/server/api.py__post_init__zWorkerSpec.__post_init__Z   s^    $$q((($$q(((77MM<+
 #ggDO    c                     t        | j                  t              r)t        j                  j                  | j                        S | j                  J | j                  j                  S )zGet the entry point name.

        If the entrypoint is a function (e.g. ``Callable``) returns its ``__qualname__``
        else if the entrypoint is a binary (e.g. ``str``), returns the binary name.
        )
isinstancer%   strospathbasename__qualname__r3   s    r5   get_entrypoint_namezWorkerSpec.get_entrypoint_nameg   sK     doos+77##DOO44??...??///r7   )__name__
__module__r>   __doc__r:   __annotations__intrdzvRendezvousHandlerr$   r   r	   r%   r   r'   r   r)   r*   floatr+   r,   r-   r6   r?   r&   r7   r5   r   r   /   s    8 I(((!B!-1JhT)*1D%L#!e!!%K#%!%K#% $J$
0r7   r   c                   J    e Zd ZdZg dZ	 	 	 	 ddededededef
dZd	 Zd
 Zy)r   ah  A worker instance.

    Contrast this with ``WorkerSpec`` that represents the specifications of a
    worker. A ``Worker`` is created from a ``WorkerSpec``. A ``Worker`` is to
    a ``WorkerSpec`` as an object is to a class.

    The ``id`` of the worker is interpreted
    by the specific implementation of ``ElasticAgent``. For a local
    agent, it could be the ``pid (int)`` of the worker, for a remote
    agent it could be encoded as ``host:port (string)``.

    Args:
        id (Any): uniquely identifies a worker (interpreted by the agent)
        local_rank (int): local rank of the worker
        global_rank (int): global rank of the worker
        role_rank (int): rank of the worker across all workers that have the same role
        world_size (int): number of workers (globally)
        role_world_size (int): number of workers that have the same role
    id
local_rankglobal_rank	role_rank
world_sizerole_world_sizerK   rL   rM   rN   rO   c                 X    d | _         || _        || _        || _        || _        || _        y NrI   )r4   rK   rL   rM   rN   rO   s         r5   __init__zWorker.__init__   s:       *
 !,
 (  * %4r7   c           
          d| j                    d| j                   d| j                   d| j                   d| j                   
S )Nzlocal_rank=z,global_rank=z,role_rank=z,world_size=z,role_world_size=rK   rL   rM   rN   rO   r3   s    r5   __str__zWorker.__str__   sJ    $//*-8H8H7I$..)doo5F 4 457	
r7   c                     t        |       S rQ   )r:   r3   s    r5   __repr__zWorker.__repr__   s    4yr7   N)rX   rX   rX   )	r@   rA   r>   rB   	__slots__rD   rR   rU   rW   r&   r7   r5   r   r   t   s\    (I !44 4 	4
 4 4B
r7   r   c                   F    e Zd ZdZdZdZdZdZdZdZ	dZ
ed	d d
efd       Zy)r   a  A state of the ``WorkerGroup``.

    Workers in a worker group change state as a unit. If a single worker
    in a worker group fails the entire set is considered failed::

      UNKNOWN - agent lost track of worker group state, unrecoverable
      INIT - worker group object created not yet started
      HEALTHY - workers running and healthy
      UNHEALTHY - workers running and unhealthy
      STOPPED - workers stopped (interrupted) by the agent
      SUCCEEDED - workers finished running (exit 0)
      FAILED - workers failed to successfully finish (exit !0)


    A worker group starts from an initial ``INIT`` state,
    then progresses to ``HEALTHY`` or ``UNHEALTHY`` states,
    and finally reaches a terminal ``SUCCEEDED`` or ``FAILED`` state.

    Worker groups can be interrupted and temporarily put into ``STOPPED`` state
    by the agent. Workers in ``STOPPED`` state are scheduled to be restarted
    in the near future by the agent. Some examples of workers being put into
    ``STOPPED`` state are:

    1. Worker group failure|unhealthy observed
    2. Membership change detected

    When actions (start, stop, rdzv, retry, etc) on worker group fails
    and results in the action being partially applied to the worker group
    the state will be ``UNKNOWN``. Typically this happens on uncaught/unhandled
    exceptions during state change events on the agent. The agent is not
    expected to recover worker groups in ``UNKNOWN`` state and is better off
    self terminating and allowing the job manager to retry the node.
    UNKNOWNINITHEALTHY	UNHEALTHYSTOPPED	SUCCEEDEDFAILEDstatereturnc                 F    | t         j                  t         j                  hv S )zReturn the state of the Worker.

        Returns:
             True if the worker state represents workers still running
             (e.g. that the process exists but not necessarily healthy).
        )r   r]   r^   )rb   s    r5   
is_runningzWorkerState.is_running   s      ,,k.C.CDDDr7   N)r@   rA   r>   rB   r[   r\   r]   r^   r_   r`   ra   staticmethodboolre   r&   r7   r5   r   r      sQ     D GDGIGIFE- ED E Er7   r   c                   $    e Zd ZdZg dZdefdZy)r   a  A set of ``Worker`` instances.

    The class defines a set of ``Worker`` instances for the given ``WorkerSpec`` managed by ``ElasticAgent``. Whether the worker
    group contains cross instance workers or not depends on the implementation of the agent.
    )specworkersstore
group_rankgroup_world_sizerb   r,   r+   ri   c                    || _         t        | j                   j                        D cg c]  }t        |       c}| _        d | _        d | _        d | _        d | _        d | _	        t        j                  | _        y c c}w )N)rK   )ri   ranger"   r   rj   rk   rl   rm   r,   r+   r   r\   rb   )r4   ri   is      r5   rR   zWorkerGroup.__init__  sj    	6;DII<V<V6WX!,X 
 $ %%
 Ys   A=N)r@   rA   r>   rB   rY   r   rR   r&   r7   r5   r   r      s    	I&Z &r7   r   c            	           e Zd ZdZg dZdededefdZdefdZ	e
d	efd
       Ze
defd       Ze
dededeeef   fd       Zy)_RoleInstanceInfoa  The class is used by the agent to exchange the information with other agents.

    The information is used to determine the rank of the workers that agent
    manages in heterogeneous environments, where different agents can have
    different number of workers.
    r!   rankr"   r!   rt   r"   c                 .    || _         || _        || _        y)zInitialize the agent class instance.

        Args:
            role (str): user-defined role for the workers with this spec
            rank (int): the rank of the agent
            local_world_size (int): number of local workers to run
        Nrs   )r4   r!   rt   r"   s       r5   rR   z_RoleInstanceInfo.__init__  s     		 0r7   rc   c                     | j                   | j                  | j                  d}t        j                  |      j                  d      S )Nrs   UTF-8encoding)r!   rt   r"   jsondumpsencode)r4   	dict_datas     r5   	serializez_RoleInstanceInfo.serialize*  s?    IIII $ 5 5
	
 zz)$++W+==r7   datac                 x    t        j                  | j                  d            }t        |d   |d   |d         S )Nrw   rx   r!   rt   r"   )rz   loadsdecoderr   )r   r}   s     r5   deserializez_RoleInstanceInfo.deserialize2  s?    JJt{{G{<=	 fy0)<N2O
 	
r7   c                     | j                   |j                   k(  r| j                  |j                  z
  S | j                   |j                   kD  ryy)N   rX   )r!   rt   )obj1obj2s     r5   comparez_RoleInstanceInfo.compare9  s;    99		!99tyy((YY"r7   roles_infosc                 l    d\  }}t        |       D ]  \  }}|j                  |k(  s|dk(  r|}|}  ||fS )N)rX   rX   rX   )	enumerater!   )r   r!   	start_idxend_idxidx	role_infos         r5   find_role_boundariesz&_RoleInstanceInfo.find_role_boundariesB  sN    #	7'4 	NC~~%? #I		
 7##r7   N)r@   rA   r>   rB   rY   r:   rD   rR   bytesr~   rf   r   r   r   r   r   r&   r7   r5   rr   rr     s     5I
1S 
1 
1s 
1>5 > 
% 
 
 s   $$ $c $eCHo $ $r7   rr   c                   t    e Zd ZU dZeed<    ee      Ze	e
ef   ed<    ee      Ze	e
ef   ed<   defdZy)	r   a  Return results of the worker executions.

    Run results follow an "all-or-nothing" policy where the run is successful if and
    only if ALL local workers managed by this agent complete successfully.

    If the result is successful (e.g. ``is_failed() = False``) then the ``return_values``
    field contains the outputs (return values) of the workers managed by THIS agent mapped
    by their GLOBAL ranks. That is ``result.return_values[0]`` is the return value of
    global rank 0.

    .. note:: ``return_values`` are only meaningful for when the worker entrypoint
              is a function. Workers specified as a binary entrypoint do not canonically
              have a return value and the ``return_values`` field is meaningless and
              may be empty.

    If ``is_failed()`` returns ``True`` then the ``failures`` field contains the
    failure information, again, mapped by the GLOBAL rank of the worker that failed.

    The keys in ``return_values`` and ``failures`` are mutually exclusive, that is,
    a worker's final state can only be one of: succeeded, failed. Workers intentionally
    terminated by the agent according to the agent's restart policy, are not represented
    in either ``return_values`` nor ``failures``.
    rb   )default_factoryreturn_valuesfailuresrc   c                 <    | j                   t        j                  k(  S rQ   )rb   r   ra   r3   s    r5   	is_failedzRunResult.is_failedk  s    zz[////r7   N)r@   rA   r>   rB   r   rC   r   dictr   r
   rD   r   r   r   rg   r   r&   r7   r5   r   r   M  sK    0 $)$$?M4S>?*/*EHd3&'E04 0r7   r   rc   c                  P    t        j                  t        j                               S rQ   )socketgetfqdngethostnamer&   r7   r5   _get_fq_hostnamer   o  s    >>&,,.//r7   c                   t    e Zd ZdZej
                  efdedefd       Z	ej
                  efdede
fd       Zy)r   a0  An agent process responsible for managing one or more worker processes.

    The worker processes are assumed to be regular distributed PyTorch scripts.
    When the worker process is created by the agent, the agent provides the
    necessary information for the worker processes to properly initialize
    a torch process group.

    The exact deployment topology and ratio of agent-to-worker is dependent
    on the specific implementation of the agent and the user's job placement
    preferences. For instance, to run a distributed training job on GPU with
    8 trainers (one per GPU) one can:

    1. Use 8 x single GPU instances, place an agent per instance, managing
       1 worker per agent.
    2. Use 4 x double GPU instances, place an agent per instance, managing
       2 workers per agent.
    3. Use 2 x quad GPU instances, place an agent per instance, managing
       4 workers per agent.
    4. Use 1 x 8 GPU instance, place an agent per instance, managing
       8 workers per agent.

    Usage
    ::

     group_result = agent.run()
      if group_result.is_failed():
        # workers failed
        failure = group_result.failures[0]
        logger.exception("worker 0 failed with exit code : %s", failure.exit_code)
      else:
        return group_result.return_values[0] # return rank 0's results

    r!   rc   c                     t         )al  Run the agent.

        Supports retrying the worker group on failures up to ``max_restarts``.

        Returns:
            The result of the execution, containing the return values or
            failure details for each worker mapped by the worker's global rank.

        Raises:
            Exception - any other failures NOT related to worker process
        NotImplementedErrorr4   r!   s     r5   runzElasticAgent.run  s
     "!r7   c                     t         )a'  Return the ``WorkerGroup`` for the given ``role``.

        Note that the worker group is a mutable object and hence in a
        multi-threaded/process environment it may change state.
        Implementors are encouraged (but not required) to return
        a defensive read-only copy.
        r   r   s     r5   get_worker_groupzElasticAgent.get_worker_group  
     "!r7   N)r@   rA   r>   rB   abcabstractmethodDEFAULT_ROLEr:   r   r   r   r   r&   r7   r5   r   r   s  s^     D 	* " "y " " 	+7 "S "K " "r7   r   c                      e Zd ZdZd+dedefdZefdede	fdZ
ej                  de	deeef   fd	       Zej                  	 d,de	deddfd       Zej                  de	defd       Zej                  ej,                  d
fdej.                  deddfd       Zede	ddfd       Zededededee   fd       Zede	ddfd       Zede	ddfd       Zeefdedefd       Z de!fdZ"de!fdZ#deddfdZ$dededefdZ%e&defd       Z'	 	 	 d-ded e(de)e   d!e)e   d"e)e   de!fd#Z*d$efd%Z+d& Z,d,d'efd(Z-efdedefd)Z.d* Z/y).r   zAn ``ElasticAgent`` that manages one particular type of worker role.

    An ``ElasticAgent`` that manages workers (``WorkerGroup``) for a single ``WorkerSpec``
    such as one particular type of worker role.
    ri   exit_barrier_timeoutc                     t        |      | _        | j                  j                  j                  | _        d | _        || _        d| _        y Nr   )r   _worker_groupri   r)   _remaining_restarts_store_exit_barrier_timeout_total_execution_time)r4   ri   r   s      r5   rR   zSimpleElasticAgent.__init__  s@    (.#'#5#5#:#:#G#G %9"%&"r7   r!   rc   c                     | j                   S rQ   )r   r   s     r5   r   z#SimpleElasticAgent.get_worker_group  s    !!!r7   worker_groupc                     t         )zStart ``worker_group.spec.local_world_size`` number of workers.

        This is according to worker spec for the worker group .
        Returns a map of ``local_rank`` to worker ``id``.
        r   r4   r   s     r5   _start_workersz!SimpleElasticAgent._start_workers  s
     "!r7   F
is_restartNc                     t         )zStop all workers in the given worker group.

        Implementors must deal with workers in all states defined by
        ``WorkerState``. That is, it must gracefully handle stopping
        non-existent workers, unhealthy (stuck) workers, etc.
        r   )r4   r   r   s      r5   _stop_workersz SimpleElasticAgent._stop_workers  s
     "!r7   c                     t         )z~Check on the workers for the ``worker_group``.

        This function also returns the new state of the worker group.
        r   r   s     r5   _monitor_workersz#SimpleElasticAgent._monitor_workers  s
     "!r7   	death_sigc                     t         )zClean up any resources that were allocated during the agent's work.

        Args:
            death_sig: Signal to send to the child process, SIGTERM is default
        r   )r4   r   r   s      r5   	_shutdownzSimpleElasticAgent._shutdown  r   r7   c                    |j                   }| j                  d      5  |j                  j                         }ddd       j                  }|j
                  }|j                  }|j                  xs |j                  j                  }|j                  xs |j                  j                  }|| _
        | j                  d      5  | j                  ||||      }	ddd       	|_        ||_        ||_        ||_        ||_        ||_	        |j                  | j                   z
  }
t"        j%                  d|j&                  |
|||||	D cg c]  }|j(                   c}|	D cg c]  }|j*                   c}|	D cg c]  }|j,                   c}|	D cg c]  }|j.                   c}|	D cg c]  }|j                   c}d       y# 1 sw Y   xY w# 1 sw Y   xY wc c}w c c}w c c}w c c}w c c}w )zRun rendezvous for the workers specified by the worker spec.

        Assigns workers a new global rank and world size.
        Updates the rendezvous store for the worker group.
        
RENDEZVOUSNASSIGN_WORKER_RANKSa  [%(role)s] Rendezvous complete for workers. Result:
  restart_count=%(restart_count)s
  master_addr=%(master_addr)s
  master_port=%(master_port)s
  group_rank=%(group_rank)s
  group_world_size=%(group_world_size)s
  local_ranks=%(local_ranks)s
  role_ranks=%(role_ranks)s
  global_ranks=%(global_ranks)s
  role_world_sizes=%(role_world_sizes)s
  global_world_sizes=%(global_world_sizes)s
)r!   restart_countr,   r+   rl   rm   local_ranks
role_ranksglobal_ranksrole_world_sizesglobal_world_sizes)ri   record_durationr#   next_rendezvousrk   rt   rN   r,   bootstrap_store_infor+   r   _assign_worker_ranksrj   rl   rm   r)   r   loggerinfor!   rK   rM   rL   rO   )r4   r   ri   	rdzv_infork   rl   rm   r,   r+   rj   r   workers               r5   _rendezvouszSimpleElasticAgent._rendezvous  s      !!,/ 	<))99;I	<^^
$// &&T)*H*H*T*T&&T)*H*H*T*T!!"78 	//z#3TG	  '"",(8%#. #. ))D,D,DD
< 		!.**($4@GHf 1 1H>EFFv//FBI J!3!3 JJQ$RV%;%;$RGN&OVv'8'8&O	
5	< 	<	 	@  IF J$R&Os5   G
G	G#"G(;G-G2-G7
GG rl   rm   c           
      8   d}d}t        |j                  ||j                        }|j                  | | |j	                                |dk(  rP|j                  t        |      D cg c]  }d| 	 c}      }	|	D 
cg c]  }
t         j                  |
       }}
t        d       }d}|D ]2  }||j                  xx   |j                  z  cc<   ||j                  z  }4 d}t        d       }g }g }t        |      D ]  \  }}|j                  | |        |j                  t        j                  ||||j                     ||j                     g             ||j                  z  }||j                  xx   |j                  z  cc<    |j                  ||       t        j                  |j                  | |             \  }}}}g }t        |j                        D ])  }t!        |||z   ||z   ||      }|j                  |       + |S c c}w c c}
w )ao  Determine proper ranks for worker processes.

        The rank assignment is done according to the following algorithm:

        1. Each agent writes its configuration(group_rank, group_world_size
           , num_workers) to the common store.
        2. The rank 0 agent reads all the role_info from the store and
           determines each agents worker ranks.
        3. Determine the global rank: the global rank of the workers is computed
           by cumulative sum of the local_world_size for all workers in front of it.
           For efficiency reasons each worker is assigned a base global rank
           such that it's workers are in the range [base_global_rank,
           base_global_rank + local_world_size).
        4. Determine the role rank: The role rank is determined using the algorithms
           in the point 3 with the exception that the ranks are calculated with
           respect to the role name.
        5. The rank 0 agent writes the assigned ranks to the store.
        6. Each agent reads the assigned ranks from the store.

        Time complexity: each worker O(1), rank0 O(n), overall O(n)
        ztorchelastic/role_info/ztorchelastic/assigned_ranks/r   c                       yr   r&   r&   r7   r5   <lambda>z9SimpleElasticAgent._assign_worker_ranks.<locals>.<lambda>V      r7   c                       yr   r&   r&   r7   r5   r   z9SimpleElasticAgent._assign_worker_ranks.<locals>.<lambda>]  r   r7   rT   )rr   r!   r"   setr~   	multi_getro   r   r   r   appendrz   r{   	multi_setr   getr   )r4   rk   rl   rm   ri   ROLE_INFO_PREFIXASSIGNED_RANKS_PREFIXagent_role_inforp   role_infos_bytes
info_bytes
role_infos
role_sizesglobal_sizer   base_global_rankr   keysvaluesglobal_world_sizebase_role_rankrO   rj   rK   r   s                            r5   r   z'SimpleElasticAgent._assign_worker_ranks*  sT   4 5 >+IIz4#8#8
 			%&zl3_5N5N5PQ ?$8=>N8OP1*1#.P 
 #3 "--j9J 
 %Y/JK' :	9>>*i.H.HH*y999:  !$Y/JDF )* 5 I945aS9:JJ,'&y~~6&y~~6		 !I$>$>> 9>>*i.H.HH*I  OOD&) JJuyy$9#::,!GHI	
  5 56 	#J%,z9(:5, /F NN6"	# m Qs   (H?Hc                 R   |j                   j                  }t        j                  d|       | j	                  |       t        j                  d|       | j                  |      }|j                         D ]  \  }}|j                  |   }||_         t        j                  |_        y)a  Start a fresh set of workers for the worker_group.

        Essentially, a rendezvous followed by a ``start_workers``.
        The caller should first call ``_stop_workers()`` to stop running workers
        prior to calling this method.

        Optimistically sets the state of the worker group that
        just started as ``HEALTHY`` and delegates the actual monitoring
        of state to ``_monitor_workers()`` method
        z [%s] Rendezvous'ing worker groupz[%s] Starting worker groupN)ri   r!   r   r   r   r   itemsrj   rJ   r   r]   rb   )r4   r   r!   
worker_idsrK   w_idr   s          r5   _initialize_workersz&SimpleElasticAgent._initialize_workers  s       %%6= 	&0$7((6
 * 0 0 2 	J!))*5FFI	 )00r7   c                     |j                   j                  }t        j                  d|       | j	                  |d       t
        j                  |_        | j                  |       y)zCRestart (stops, rendezvous, starts) all local workers in the group.z[%s] Stopping worker groupT)r   N)	ri   r!   r   r   r   r   r_   rb   r   )r4   r   r!   s      r5   _restart_workersz#SimpleElasticAgent._restart_workers  sS       %%0$7<D9(00  .r7   c                 (   t        j                         }d}	 | j                  |      }t        t        j                         |z
        | _        | j                  |       | j                  |       ||s| j                          t        t        j                         |z
        | _        S # t        $ r }t        j                  d|       Y d }~nRd }~wt        $ rC}t        j                  d|j                         | j                  |j                         d} d }~ww xY w	 |s| j                          t        t        j                         |z
        | _        y # |s| j                          t        t        j                         |z
        | _        w xY w)NFz Rendezvous gracefully exited: %sz/Received %s death signal, shutting down workersT)time	monotonic_invoke_runrD   r   _record_metrics_record_worker_eventsr   r   r   r   r   warningsigval)r4   r!   
start_timeshutdown_calledresultes         r5   r   zSimpleElasticAgent.run  s8   ^^%
 %	L%%d+F),T^^-=
-J)KD&  (&&v. # ),T^^-=
-J)KD& + 	?KK:A>> 	NNLahhWNN188$"O		 ? # ),T^^-=
-J)KD& # ),T^^-=
-J)KD&s6   AB) )	D2CE D>DDE 9Fc                 j    | j                  dt        j                  t        j                               S )Nra   )rb   source	raw_error)_construct_eventr   AGENT	traceback
format_excr3   s    r5   get_event_failedz#SimpleElasticAgent.get_event_failed  s2    $$$$**, % 
 	
r7   c                 D    | j                  dt        j                        S )Nr`   )rb   r   )r   r   r   r3   s    r5   get_event_succeededz&SimpleElasticAgent.get_event_succeeded  s&    $$$$ % 
 	
r7   r   c           	      D   | j                   j                  D ]  }|j                  j                  |j                        }| j                  ||      }|rt        j                  |j                        nd }t        | j                  |t        j                  ||              y rQ   )r   rj   r   r   rL   _get_worker_staterz   r{   error_file_datar   r   r   WORKER)r4   r   r   failurerb   r   s         r5   r   z(SimpleElasticAgent._record_worker_events  s    ((00 	XFoo))&*<*<=G//?E?F

7#:#:;DI4((0B0BFIVW		Xr7   r   c                 :   |j                   j                  |j                        }|j                  t        j
                  t        j                  hv r|sy|s|j                  |j                  v r|j                  j                  S t        d|j                         )N
TERMINATEDzUnknown worker: )
r   r   rL   rb   r   r^   ra   r   value
ValueError)r4   r   r   r
  s       r5   r  z$SimpleElasticAgent._get_worker_state  s~    //%%f&8&89<<K11;3E3EFFw**f.B.BB<<%%%/0B0B/CDEEr7   rb   c           	   #   d  K   t        j                         }	 d  t        j                         }||z
  dz  }t        | j                  |t        j
                  |             y # t        j                         }||z
  dz  }t        | j                  |t        j
                  |             w xY ww)Ni  )rb   r   duration_ms)r   perf_counterr   r   r   r   )r4   rb   r   end_timer  s        r5   r   z"SimpleElasticAgent.record_duration  s     &&(
		((*H#j0D8K%%(9(9{ &  ((*H#j0D8K%%(9(9{ & s   B0A$ AB0$A	B--B0r   r   r  c                 b   | j                   }|j                  }|j                  |j                         d}|rR|j                  f|d<   |j
                  f|d<   |j                  f|d<   |j                  }	t        |j                        }
nd }	d }
t        j                  |      }|j                  j                         |	|j                  |
|j                  t!               || j"                  |j                  j%                         |||j&                  | j(                  z
  |d}t+        d| ||      S )N)rm   entry_pointrK   rM   rO   )run_idrL   rl   	worker_idr!   hostnamerb   total_run_timerdzv_backendr   metadataagent_restartsr  ztorchelastic.worker.status.)r   r  )r   ri   rm   r?   rK   rM   rO   rL   r:   rJ   rz   r{   r#   
get_run_idrl   r!   r   r   get_backendr)   r   r   )r4   rb   r   r   r   r  wgri   mdrL   r  md_strr  s                r5   r   z#SimpleElasticAgent._construct_event  s,    ww " 3 3335
  & 1 13B|%//1B{O%+%;%;$=B ! ,,KFIIIKIB''224&--"II(*"88 --99;""//$2J2JJ&
 )%1&8
 	
r7   group_resultsc                    |j                         }| j                  |       | j                  j                  }| j                  |j
                  k7  }t        d|j                   dd       | j                  d| xr |       | j                  d| xr |        | j                  d|xr |       | j                  d|xr |        y )Nworkers.z
.run_totalr   run_success_with_retriesrun_success_no_retriesrun_failed_with_retriesrun_failed_no_retries)	r   _record_flakiness_metricr   ri   r   r)   r   r!   _record_metric_with_condition)r4   r!  r   ri   restarts_happeneds        r5   r   z"SimpleElasticAgent._record_metrics!  s    !++-	%%i0!!&& 448I8IIXdii[
3Q7**&I(K:K	
 	**$)m&M<M8M	
 	**%y'F5F	
 	**#Y%H7H3H	
r7   c                     | j                   j                  }|rt        d|j                   d| d       y t        d|j                   d| d       y )Nr#  .r   r   )r   ri   r   r!   )r4   metric_name	conditionri   s       r5   r)  z0SimpleElasticAgent._record_metric_with_condition4  sL    !!&&$))Ak];Q?$))Ak];Q?r7   r   c                     |rd}n;| j                   j                  }dd| j                  dz   z  |j                  dz   z  z
  }| j                   j                  }t	        d|j
                   dt        |             y )Ng      Y@r   r#  z
.flakiness)r   ri   r   r)   r   r!   rD   )r4   r   	flakinessri   s       r5   r(  z+SimpleElasticAgent._record_flakiness_metric;  sx    I%%**D)A)AA)E F!!A%! I !!&&Xdii[
3S^Dr7   c                    | j                   j                  }|j                  }t        j	                  d||j                                | j                  | j                          |j                  }|j                  }	 | j                   j                  t        j                  k7  sJ t        j                  |       | j                  | j                         }|j                  }|| j                   _	        t        d| d| j                          t        d| d|j"                  j%                          d       |t        j&                  k(  r3t        j	                  d|| j(                         | j+                          |S |t        j,                  t        j.                  hv r| j                   dkD  rht        j	                  d||j"                  | j                   |j0                         | xj                   dz  c_        | j3                  | j                          n| j5                  | j                          t        j.                  | j                   _	        |S |t        j6                  k(  r_|j9                         }| j                   j:                  }|dkD  rPt        j	                  d	|||       | j3                  | j                          nt=        d
| d|j"                   d      e)Nz([%s] starting workers for entrypoint: %sr#  z.remaining_restartsr,  r   zW[%s] worker group successfully finished. Waiting %s seconds for other agents to finish.r   zD[%s] Worker group %s. %s/%s attempts left; will restart worker groupzH[%s] Detected %s new nodes from group_rank=%s; will restart worker group[z] Worker group in z state)r   ri   r!   r   r   r?   r   r*   r#   rb   r   r\   r   sleepr   r   r   namelowerr`   r   _exit_barrierr^   ra   r)   r   r   r]   num_nodes_waitingrl   	Exception)	r4   r!   ri   r*   r#   
run_resultrb   r7  rl   s	            r5   r   zSimpleElasticAgent._invoke_runG  s    !!&&yy6d>V>V>X	
 	  !3!3400((%%++{/?/????JJ'(..t/A/ABJ$$E',D$$':;T=U=UV$q)9)9);(<=qA---F..	 ""$!!;00+2D2DEE++a/KK5 

00)) ,,1,))$*<*<=&&t'9'9:/:/A/AD&&,%%+---$0$B$B$D!!//::
$q(KK4 )" ))$*<*<=v/

|6B g r7   c                 F   t         j                  d| j                  j                  | j                         t        j
                         }	 t        j                  | j                  | j                  j                  t        | j                         t         j                  dt        j
                         |z
         y# t        $ r&}t         j                  d|j                          d}~wt        $ r. t         j                  dt        j
                         |z
         Y yw xY w)aS  
        Define a barrier that keeps the agent process alive until all workers finish.

        Wait for ``exit_barrier_timeout`` seconds for all agents to finish
        executing their local workers (either successfully or not). This
        acts as a safety guard against user scripts that terminate at different
        times.
        zOLocal worker group finished (%s). Waiting %s seconds for other agents to finish)rk   rN   
key_prefixbarrier_timeoutz2Done waiting for other agents. Elapsed: %s secondszGot termination signal: %sNz2Error waiting on exit barrier. Elapsed: %s seconds)r   r   r   rb   r   r   
store_utilbarrierr   rm   _TERMINAL_STATE_SYNC_IDr   r   r   r8  	exception)r4   startr   s      r5   r6  z SimpleElasticAgent._exit_barrier  s     	<$$&&		
 			kk-->>2 $ : :	 KKD		e#  	NN7B 	D		e#	s   A0B< <	D !C&&7D D )i,  )F)NNN)0r@   rA   r>   rB   r   rG   rR   r   r:   r   r   r   r   r
   rD   r   r   rg   r   r   r   signalSIGTERMSignalsr   r   r   r   r   r   r   r   r   r   r  r  r   r  r   r   r   r   r   r   r)  r(  r   r6  r&   r7   r5   r   r     s   'Z 'u ' ,8 "S "K " 	"; "4S> " " 	<A	"'	"59	"		" 	" 	"[ "Y " " 	*0..U""FJ"	" " 
;
 ;
 ;
 
;
~ 
Z!$Z8;ZCMZ	fZ 
Z| 
1 1 1 
1< 
/[ /T / 
/ 
* L Ly L 
L,
% 

U 
XI X$ XF F	 Fc F S  " $(#''+)
)
 )
  	)

 C=)
 e_)
 
)
V
Y 
&@
E$ 
E '3 C Cy CJ"r7   r   )?r   rz   r;   rB  r   r   r  r0   collectionsr   
contextlibr   dataclassesr   r   enumr   typingr   r	   r
   r   r   r   r   $torch.distributed.elastic.rendezvousdistributedelastic
rendezvousrE   %torch.distributed.elastic.utils.storeutilsrk   r=   torch.distributed.elastic.eventsr   r   r   !torch.distributed.elastic.metricsr   r   )torch.distributed.elastic.multiprocessingr   r   r   'torch.distributed.elastic.utils.loggingr   __all__r?  r   r@   r   r   r   r:   r   r   rr   r   r   ABCr   r   r&   r7   r5   <module>rV     s     	      # % (  D D D 3 3 : : G G > U L > > 	H	 A0 A0 A0HG GT3E#t 3El& &@6$ 6$r 0 0 0B0# 0;"377 ;"|} }r7   