
    sg                     L   U d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlZd dlm	Z	m
Z
mZmZmZmZmZ ddlmZmZ ddlmZ g dZ ej,                  d      Z ej0                  ej2                        Zej7                  e        ej8                  e      Zd	e_        ejA                  ejB                         ejE                  e        G d
 de#      Z$ G d de#      Z%dZ&e'e(d<   dZ)e'e(d<   dZ*dZ+dZ,dZ-dZ. G d de      Z/ G d d      Z0dedejb                  fdZ2dedefdZ3y)    N)Optional)RendezvousClosedErrorRendezvousErrorRendezvousHandlerRendezvousInfoRendezvousParametersRendezvousStoreInfoRendezvousTimeoutError   )	cas_delay	EtcdStore)parse_rendezvous_endpoint)EtcdRendezvousRetryableFailureEtcdRendezvousRetryImmediatelyEtcdRendezvousHandlerEtcdRendezvouscreate_rdzv_handlerz%%(levelname)s %(asctime)s %(message)sFc                       e Zd Zy)r   N__name__
__module____qualname__     g/var/www/html/venv/lib/python3.12/site-packages/torch/distributed/elastic/rendezvous/etcd_rendezvous.pyr   r   7       r   r   c                       e Zd Zy)r   Nr   r   r   r   r   r   =   r   r   r   iX  _DEFAULT_TIMEOUT   _DEFAULT_LAST_CALL_TIMEOUT   
   i   c                   h    e Zd ZdZdddee   fdZd ZdefdZd	 Z	d
 Z
d Zd ZdefdZdefdZy)r   a  
    Implements a
    :py:class:`torch.distributed.elastic.rendezvous.RendezvousHandler` interface
    backed by
    :py:class:`torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous`.
    ``EtcdRendezvousHandler`` uses a URL to configure the type of rendezvous to
    use and to pass implementation specific configurations to the rendezvous
    module. The basic etcd rendezvous configuration URL looks like the following
    ::

     etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers>  # noqa: W605

     -- example --

     etcd://localhost:2379/1234?min_workers=1&max_workers=3

    The URL above is interpreted as follows:

    1. Use the rendezvous handler that is registered with the ``etcd``
       scheme
    2. The ``etcd`` endpoint to use is ``localhost:2379``
    3. ``job_id == 1234`` is used as the prefix in etcd (this allows one to
       share a common etcd server for multiple jobs so long as the
       ``job_ids`` are guaranteed to be unique). Note that the job id can be
       any string (e.g. does not need to be a number) as long as it is
       unique.
    4. ``min_workers=1`` and ``max_workers=3`` specifies a range for
       membership size - Torch Distributed Elastic starts running the job as
       long as the cluster size is greater than or equal to ``min_workers``
       and admits up to ``max_workers`` into the cluster.

    Below are a full list of the parameters that can be passed to etcd
    rendezvous:

    +--------------------------------------------+--------------------------+
    | Parameter                                  | Description              |
    +============================================+==========================+
    | min_workers                                | minimum number of        |
    |                                            | workers for the          |
    |                                            | rendezvous to be valid   |
    +--------------------------------------------+--------------------------+
    | max_workers                                | maximum number of        |
    |                                            | workers to admit         |
    +--------------------------------------------+--------------------------+
    | timeout                                    | total timeout within     |
    |                                            | which next_rendezvous is |
    |                                            | expected to succeed      |
    |                                            | (default 600s)           |
    +--------------------------------------------+--------------------------+
    | last_call_timeout                          | additional wait amount   |
    |                                            | ("last call") after min  |
    |                                            | number of workers has    |
    |                                            | been reached (defaults   |
    |                                            | to 30s)                  |
    +--------------------------------------------+--------------------------+
    | etcd_prefix                                | path prefix (from etcd   |
    |                                            | root), inside which all  |
    |                                            | etcd nodes will be       |
    |                                            | created (defaults to     |
    |                                            | ``/torchelastic/p2p``)   |
    +--------------------------------------------+--------------------------+
    	rdzv_implr   
local_addrc                      || _         || _        y)z
        Args:
            rdzv_impl: the implementation of the rendezvous
            local_addr: the local address of the current node
        N)
_rdzv_impl_local_addr)selfr$   r%   s      r   __init__zEtcdRendezvousHandler.__init__   s     $%r   c                     | ` y N)r'   r)   s    r   __del__zEtcdRendezvousHandler.__del__   s    Or   returnc                      y)Netcdr   r-   s    r   get_backendz!EtcdRendezvousHandler.get_backend   s    r   c                     | j                   j                         \  }}}t        j                  d       | j                   j	                  |      }t        j                  ||| j                        }t        ||||      S )Nz4Creating EtcdStore as the c10d::Store implementation)r%   )	r'   rendezvous_barrierloggerinfosetup_kv_storer	   buildr(   r   )r)   rdzv_versionrank
world_sizestorebootstrap_store_infos         r   next_rendezvousz%EtcdRendezvousHandler.next_rendezvous   sn    )-)K)K)M&dJJK..|<288%D$4$4 
 eT:7KLLr   c                     	 | j                   j                         \  }}|d   dk(  S # t        j                  $ r Y yw xY w)NstatusclosedFr'   get_rdzv_stater1   EtcdKeyNotFoundr)   _states      r   	is_closedzEtcdRendezvousHandler.is_closed   sD    	557HAu?h..## 		s   $' ==c                 8    | j                   j                          y r,   )r'   
set_closedr-   s    r   rJ   z EtcdRendezvousHandler.set_closed   s    ""$r   c                     	 | j                   j                         \  }}|d   dk(  r|d   S 	 y# t        j                  $ r Y yw xY w)Nr@   finalnum_workers_waitingr   rB   rE   s      r   num_nodes_waitingz'EtcdRendezvousHandler.num_nodes_waiting   s\    	557HAuX')233 *  ## 		s   ). AAc                 .    | j                   j                  S r,   )r'   _run_idr-   s    r   
get_run_idz EtcdRendezvousHandler.get_run_id   s    &&&r   c                     	 | j                          y# t        $ r)}t        j                  dt	        |             Y d }~yd }~ww xY w)NTz#Shutdown failed. Error occurred: %sF)rJ   BaseExceptionr5   warningstrr)   es     r   shutdownzEtcdRendezvousHandler.shutdown   s:    	OO 	NN@#a&I	s    	AA  AN)r   r   r   __doc__r   rU   r*   r.   r2   r>   rH   rJ   rN   rQ   boolrX   r   r   r   r   r   X   s^    =~&"2 & &S 	M%'C '$ r   r   c                       e Zd ZdZd Zd Zd Zd Zd Zd Z	d Z
d	 Zd
 Zd Zd Zd Zd Zd Zd Zd Zd ZddZd ZddZd Zd ZddZd Zy)r   zWA rendezvous implementation that uses `etcd <https://etcd.io/>`__ as the backend store.c                    || _         t        j                  d| j                   j                         || _        || _        || _        || _        || _        || _	        d | _
        d | _        | j                  j                  d      s| xj                  dz  c_        | j                  dk7  r| j                  | j                         | j                  | j                  d      t               | j!                  | j                  d      t              | _
        | j                  | j                  d             	 | j                   j#                  | j                  d      dd	       y # t$        j&                  $ r Y y w xY w)
NzEtcd machines: %s/ ttl/rdzv/rdzv/version_counter0Fkeyvalue	prevExist)clientr5   r6   machines_prefixrP   _num_min_workers_num_max_workers_timeout_last_call_timeout_lease_run_id_stop_lease_this_rank_stopendswithcreate_path_if_not_existsget_pathCONST_RUNID_SUBROOT_TTLsetup_lease_renewalwriter1   EtcdAlreadyExist)r)   rh   prefixrun_idnum_min_workersnum_max_workerstimeoutlast_call_timeouts           r   r*   zEtcdRendezvous.__init__   sJ    ')=)=> / /"3 #'%)"||$$S)LLCL <<3**4<<8 	&&t}}R'8>U&V"&":":MM"#: #; #

 	&&t}}W'=>	KKMM"9:#QV   $$ 		s   6-E$ $E:9E:c                     | j                   | j                   j                          | j                  | j                  j                          y y r,   )ro   setrp   r-   s    r   r.   zEtcdRendezvous.__del__  sC    "".##'')%%1&&**, 2r   c                    t        j                          | j                  z   | _        	 t        j                          | j                  kD  rt        t        j                  d       	 | j                  | j                  j                          | j                         S # t        $ r Y nt        $ r t        j                  d       Y nt        $ r t        j                  d        t        $ r" t        j                  d| j                          t        $ r  t        $ r5}t        j                  d|       t        j                  d       Y d}~nd}~ww xY w/)a  
        Main entry point for next rendezvous.

        This method is blocking until rendezvous succeeds or a timeout occurs.

        Returns:
             ``(rdzv_version, rank, world_size)``

        Raises:
            RendezvousTimeoutError - timeout waiting for rendezvous
            RendezvousClosedError - rendezvous is or was closed while waiting
            RendezvousError - other persistent errors that
             render the rendezvous non-retryable
        z"Attempting to join next rendezvousNr   z4Rendezvous timeout occurred in EtcdRendezvousHandlerz2Rendezvous for run_id=%s was observed to be closedz1Rendezvous attempt failed, will retry. Reason: %s)timerm   _rendezvous_deadliner
   r5   r6   rp   r   
init_phaser   r   sleepr   rP   r   	ExceptionrV   s     r   r4   z!EtcdRendezvous.rendezvous_barrier  s    %)IIK$--$?!yy{T666,,KK<=#--9..224((1 1  

1) RS( H$,, "  
 OQRS

1E s%   %5B 	E&EAE+EEc                    	 | j                         }t        j                  |j                        }t        j                  d|       |d   dk(  rt        |d   dk(  r| j                  |d         S |d   dk(  r| j                  |d          t        | j                  |j                  dz   	       t        # t        j                  $ r, | j                         \  }}t        j                  d|       Y w xY w)
a  
        Initially, the rendezvous state is expected to be one of:

        1. empty (non-existent) - in this case we try to create a new one.
        2. joinable - we try to join it.
        3. final - we announce ourselves as waiting, and go into monitoring mode

        Any other state is considered transitional, and will be retried after
        a short delay.

        Returns:
            ``(rdzv_version, rank, world_size)``

        Raises:
            RendezvousClosedError - current rendezvous was/is closed
            EtcdRendezvousRetryableFailure - observed some intermediate
             state, which is best handled by retrying later
        z New rendezvous state created: %sz&Observed existing rendezvous state: %sr@   rA   joinableversionrL   r   
etcd_index)try_create_rendezvousjsonloadsrf   r5   r6   r1   rw   rC   r   
join_phasehandle_existing_rendezvousr   try_wait_for_state_changer   r   r)   active_versionrG   s      r   r   zEtcdRendezvous.init_phaseO  s    &	I!779NJJ~334EKK:EB ?h&''?j(??5#344?g%++E),<=00&&.2K2Ka2O&P,,# $$ 	I$($7$7$9!NE KK@%H		Is   AB8 8<C76C7c                 \   | j                  |      \  }}t        j                  |j                        }t        j                  d|d   ||       || j                  dz
  k(  rg|d   dk(  r_t        j                  d|       t        j                         | j                  z   }| j                  ||       t        j                  d|       t        j                  d       | j                  |      }t        j                  |j                        }|d   |k(  sJ d	       | j                  ||      S )
z
        We observed a rendezvous state in 'joinable' state, and attempt to join this
        particular version, and then wait for all other peers to join.
        z7Joined rendezvous version %s as rank %s. Full state: %sr   r   r@   r   z*Rank %s is responsible for join last call.z Rank %s finished join last call.zWaiting for remaining peers.z/Logic error: failed to observe version mismatch)join_rendezvousr   r   rf   r5   r6   rk   r   rn   handle_join_last_callwait_for_peersconfirm_phase)r)   expected_versionr   	this_rankrG   last_call_deadlines         r   r   zEtcdRendezvous.join_phasey  s    %)$8$89I$J!	

>//0E)		
  --11eHo6SKKDiP!%t/F/F!F&&'79KLKK:IF 	23,,-=>

>//0 ) 00	=<	=0 !!"2I>>r   c                 6   t         j                  d       | j                  ||       t         j                  d       | j                  |      }t	        j
                  |j                        }t         j                  d|d   |       |d   |t        |d         fS )a6  
        Once the rendezvous state transitions from 'joinable' to 'frozen',
        we have every participant confirm their membership and setup per-member
        keep-alive TTL keys, and then wait for all other participants to confirm,
        which would then successfully conclude this rendezvous.
        z)All peers arrived. Confirming membership.z)Waiting for confirmations from all peers.z2Rendezvous version %s is complete. Final state: %sr   participants)r5   r6   confirm_membershipwait_for_finalr   r   rf   len)r)   r   r   r   rG   s        r   r   zEtcdRendezvous.confirm_phase  s     	?@ 0)<?@,,-=>

>//0@)	
 YCn0E,FFFr   c                     | j                  |      }t        j                  d|j                         | j	                  |       t        j                  d       y)z
        Handle the case when there's an existing (state 'final) rendezvous already
        in place, and we have to announce ourselves waiting, and wait until
        the next rendezvous opportunity.
        z5Added self to waiting list. Rendezvous full state: %szBPreviously existing rendezvous state changed. Will re-try joining.N)announce_self_waitingr5   r6   rf   wait_for_rendezvous_to_free)r)   r   active_states      r   r   z)EtcdRendezvous.handle_existing_rendezvous  sL     112BCC\EWEW	
 	(()9:P	
r   c           	         | j                   j                  | j                  d      t        j                  ddi      dt
              }	 | j                   j                  | j                  d            }t        t        |j                        dz         |_	        | j                   j                  |       | j                   j                  | j                  d
|j                         d	dd       | j                   j                  | j                  d      t        j                  d|j                  g d      |j                        S # t        j                  t        j                  f$ r}t        d      |d	}~ww xY w)z
        Create new rendezvous state or raise an exception that indicates an unexpected state (e.g. already exists).

        Raises:
             RendezvousError - on unexpected state
        /rdzv/active_versionr@   setupF)re   rf   rg   r`   rb   r   z?Unexpected state of EtcdRendezvousHandler, worker needs to die.N/rdzv/v_T)re   rf   dirrg   r   )r@   r   r   re   rf   
prev_value)rh   rv   rs   r   dumpsCONST_ETCD_SETUP_TTLgetrU   intrf   updater1   rD   EtcdCompareFailedr   test_and_set)r)   r   version_counterrW   s       r   r   z$EtcdRendezvous.try_create_rendezvous  sY    **45**h01$	 + 
	"kkoodmm<S.TUO$'O,A,A(BQ(F$GO!KK/ 	)>)>(?@A	 	 	
 {{''45**(.44$& &++ ( 

 
	
) $$d&<&<= 	!Q	s   	A+E #E;*E66E;c                    	 t                | j                         \  }}|d   dk7  rt        d      |d   |k7  rt        d      t	        |d         | j
                  k  sJ d       t	        |d         }|d   j                  |       d}t	        |d         | j
                  k(  rd	|d<   g |d
<   t        }n!t	        |d         | j                  k\  rt        }	 | j                  j                  | j                  d      t        j                  |      |j                  |      }||fS # t         j"                  $ r t$        j'                  d       Y nw xY wV)!Helper method for the join phase.r@   r   zNRendezvous state became non-joinable before we could join. Must join next one.r   6Rendezvous version changed. Must try join the new one.r   z>Logic error: joinable rendezvous should always have space leftNfrozenkeep_alivesr   re   rf   r   r`   z*Join rendezvous CAS unsuccessful, retrying)r   rC   r   r   r   rl   appendCONST_ETCD_FROZEN_TTLrk   !CONST_ETCD_JOINABLE_EPHEMERAL_TTLrh   r   rs   r   r   rf   r1   r   r5   r6   )r)   r   r   rG   r   set_ttls         r   r   zEtcdRendezvous.join_rendezvous  s|    K$($7$7$9!NEX*,4* 
 Y#334L 
 E.)*T-B-BBPOPB E.12I.!((3 &*G5()T-B-BB"*h')m$/U>*+t/D/DD;J!%!9!9&<=**U+-33	 ": " &y00)) JHIJW s   AD( ((EEc                     | j                         \  }}	 |d   dk(  r
|d   |k(  r|S |d   dk(  r+|d   |k(  r#| j                  |j                  dz         \  }}nt        d      Q)r   r@   r   r   r   r   r   >Rendezvous state transition no longer possible. Must re-enter.rC   r   r   r   r)   r   r   rG   s       r   r   zEtcdRendezvous.wait_for_peers3  s     $ 3 3 5X(*uY/?CS/S%%xJ.53CGW3W(,(F(F-881< )G )% 5T  r   c                    	 t                | j                         \  }}|d   dk7  rt        d      |d   |k7  rt        d      | j                  d| d|       }| j                  j                  |d	t        
       |d   j                  |       t        |d         t        |d         k(  rd|d<   d|d<   d}nd}	 | j                  j                  | j                  d      t        j                  |      |j                  |rd	nt              }| j                  |t              | _        |S # t         j"                  $ r t$        j'                  d       Y nw xY w^)$Helper method for the confirm phase.Tr@   r   zDRendezvous no longer frozen, before we confirmed. Must join next oner   r   r   z/rank_N)rf   r`   r   r   rL   r   rM   Fr   r   r_   z-Confirm membership CAS unsuccessful, retrying)r   rC   r   rs   rh   r   CONST_WORKER_KEEPALIVE_TTLr   r   r   r   r   rf   r   ru   rp   r1   r   r5   r6   )r)   r   r   r   rG   this_lease_keyfinalizes          r   r   z!EtcdRendezvous.confirm_membershipG  s    K$($7$7$9!NEX(*4)  Y#334L  "]]+,F9+>N KKOON$<VOW- ''75'(Cn0E,FF")h/0+, M!%!9!9&<=**U+-33 (.C	 ": " .2-E-E"(B .F .* &%)) MKLMU s   ?A0D0 0(EEc                     | j                         \  }}	 |d   dk(  r
|d   |k(  r|S |d   dk(  r+|d   |k(  r#| j                  |j                  dz         \  }}nt        d      Q)r   r@   rL   r   r   r   r   r   r   r   s       r   r   zEtcdRendezvous.wait_for_finalw  s     $ 3 3 5X')eI.>BR.R%%xH,y1AEU1U(,(F(F-881< )G )% 5T  r   c                 |   	 t                | j                         \  }}|d   dk7  s|d   |k7  rt        |dxx   dz  cc<   	 | j                  j	                  | j                  d      t        j                  |      |j                        }|S # t        j                  $ r t        j                  d       Y nw xY w)	z
        Announce this worker is waiting (via num_workers_waiting counter) to join next
        rendezvous, but only if state and version match.
        r@   rL   r   rM   r   r   r   z3Announce self as waiting CAS unsuccessful, retrying)r   rC   r   rh   r   rs   r   r   rf   r1   r   r5   r6   r   s       r   r   z$EtcdRendezvous.announce_self_waiting  s    
 K$($7$7$9!NEX')U9-=AQ-Q44 '(A-(	S!%!9!9&<=**U+-33 ": "
 &%)) SQRS% s   AB (B:9B:c                    | j                         \  }}	 |d   dk7  s|d   |k7  ry| j                  j                  | j                  d|             }|j                  D cg c]  }|j
                   }}|d   D ]  }||vst        j                  d|       t        j                  d	|       t        j                  d
       | j                  j                  | j                  d      |j                         t        j                  d|        y 	 t        | j                  t        j                         z
  d      dz   }| j                  j                  | j                  d      |j                  dz   d|       t        j                         | j                  kD  rt$        | j                         \  }}c c}w # t        j                   t        j"                  f$ r Y fw xY w)a  
        When there's an existing valid rendezvous in state 'final', we have to wait until the next opportunity to join.

        Such opportunity may come from:

        1. rendezvous state changed by someone else, in which case we unblock and retry.
        2. rendezvous becomes invalid because at least one member failed to renew their
           leased keep_alive node. We detect this, and destroy the rendezvous.
        Tr@   rL   r   Nr   r   z!Keep-alive key %s is not renewed.z%Rendezvous version %s is incomplete. zAttempting to destroy it.r   )re   	prevValuez-Destroyed rendezvous version %s successfully.              ?ra   r   )re   index	recursiver|   )rC   rh   r   rs   childrenre   r5   r6   deleterf   maxr   r   watchr   r1   EtcdEventIndexClearedEtcdWatchTimedOutr
   )	r)   r   r   rG   alive_memberschkeep_alive_keysre   overall_timeouts	            r   r   z*EtcdRendezvous.wait_for_rendezvous_to_free  s    !% 3 3 5X')U9-=AQ-Q
 !KKOO)9(:;<M 1>0F0FG"rvvGOG]+ o- KK CSIKK?AQ KK ;<
 KK&& MM*@A"0"6"6 ' 
 KKG( 1811DIIK?EK   !!g.(33a7"+	 "  yy{T666,,$($7$7$9!NEo  HP ..0F0FG s   !F9A)F> >#G$#G$c                    | j                         \  }}	 |d   dk(  r	|d   |k(  ry|d   dk7  s|d   |k7  rt        d      t        j                         |k\  r[d|d<   g |d<   	 | j                  j	                  | j                  d      t        j                  |      |j                  t        	      }y	 | j                  j	                  | j                  d      |j                  |j                  t        	      }t!        t        dz  |t        j                         z
  dz         }| j#                  |j$                  dz   |      \  }}5# t        j                  $ r6 t        j                  d
       t                | j                         \  }}Y ~w xY w# t        j                  $ r5 t        j                  d       t                | j                         \  }}Y w xY w)a  
        After we reach min number of workers, one particular worker takes on the
        responsibility of waiting an additional timeout before closing the join window.
        If the worker responsible for this fails, the rendezvous will be destroyed due
        to expiring TTL, and the other participants will re-rendezvous.

        Here we expect to see state <joinable, expected_version>
        Exit gracefully if either:

        1. state becomes <frozen, expected_version>
        2. timeout happens (reaching deadline), in which case
           we try the transition to <frozen, expected_version>

        Exit with exception otherwise.
        r@   r   r   Nr   r   r   r   r   z6Join last-call transition CAS unsuccessful. Will retry   r   r   )r   r|   z7Join last-call TTL refresh CAS unsuccessful, will retry)rC   r   r   rh   r   rs   r   r   rf   r   r1   r   r5   r6   r   r   minr   r   )r)   r   deadliner   rG   r|   s         r   r   z$EtcdRendezvous.handle_join_last_call  s     !% 3 3 5X(*uY/?CS/S X*,i0@DT0T4T 
 yy{h&"*h')m$%)[[%=%= MM*@A"jj/#1#7#71	 &> &N >!%!9!9&<=(..-339	 ": " 59tyy{*S0 )-(F(F-881<g )G )%c 0 -- KKP K,0,?,?,A)NE8 )) >UV(,(;(;(=%>s(   #AE
 4BF 
AFFAGGc                 P   	 | j                         \  }}|d   dk(  ryd|d<   	 | j                  j                  | j                  d      t	        j
                  |      |j                         y# t        j                  $ r" t        j                  d       t                Y nw xY w)z
        Mark rendezvous 'closed' for current run_id, which is used to signal other
        participants to not attempt to perform (re-)rendezvous. This is useful
        when one of the workers decides the job is complete.
        r@   rA   Nr   r   z%Set closed CAS unsuccessful, retrying)rC   rh   r   rs   r   r   rf   r1   r   r5   r6   r   r   s      r   rJ   zEtcdRendezvous.set_closed3  s     $($7$7$9!NEX(*&E(O
((&<=**U+-33 ) 
 )) CD! s   A
A/ /2B$#B$c                     | j                   j                  | j                  d            }|t        j                  |j
                        fS )Nr   )re   )rh   r   rs   r   r   rf   )r)   r   s     r   rC   zEtcdRendezvous.get_rdzv_stateM  s:    T]];Q-RStzz.*>*>???r   Nc                    t        | j                  t        j                         z
  d      dz   }||nt        ||      }	 | j                  j                  | j                  d      ||       t        j                         | j                  kD  rt        | j                         S # t        j                  t        j                  f$ r Y \w xY w)Nr   r   r   )r   r|   )r   r   r   r   rh   r   rs   r1   r   r   r
   rC   )r)   r   r|   r   s       r   r   z(EtcdRendezvous.try_wait_for_state_changeQ  s    d77$))+EsKcQ%,_/#g:W	KK45ZQX   99;222(( ""$$ **D,B,BC 		s    -B$ $#C
	C
c                 h    |j                  d      sd|z   }| j                   d| j                   | S )Nr]   run_)
startswithrj   rP   )r)   paths     r   rs   zEtcdRendezvous.get_pathc  s4    s#:D,,tDLL>$88r   c                 x    	 | j                   j                  |d dd|       y # t        j                  $ r Y y w xY w)NTF)re   rf   r   rg   r`   )rh   rv   r1   rw   )r)   	full_pathr`   s      r   rr   z(EtcdRendezvous.create_path_if_not_existsi  sC    	KKTtu#   $$ 		s    # 99c                     d }t        j                         }t        j                  || j                  |||f      }d|_        |j                          |S )Nc                     	 	 | j                  ||       |j	                  |dz        ry +# t        j                  $ r Y y t        $ r Y y w xY w)Nr_   r   )r|   )refreshr1   rD   ConnectionRefusedErrorwait)rh   r   r`   
stop_events       r   lease_workerz8EtcdRendezvous.setup_lease_renewal.<locals>.lease_workerw  s]    NN4SN1 ??37?3  ++ -  s   - AAA)targetargsT)	threadingEventThreadrh   daemonstart)r)   r   r`   r   lease_stop_eventlease_threads         r   ru   z"EtcdRendezvous.setup_lease_renewalq  sU    	 %??, ''t{{IsDT&U
 #r   c                 J   | j                  d| d      }	 | j                  j                  |t        j                  ||i      d      }y # t
        j                  $ r Y nw xY w	 | j                  j                  |      }t        j                  |j                        }|||<   	 | j                  j                  |t        j                  |      |j                        }y # t
        j                  $ r- t        j                  d       t        j                  d       Y nw xY w)Nr   /extra_dataFrd   r   z+Store extra_data CAS unsuccessful, retryingg?)rs   rh   rv   r   r   r1   rw   r   r   rf   r   r   r5   r6   r   r   )r)   r9   re   rf   node
extra_datanew_extra_data_values          r   store_extra_datazEtcdRendezvous.store_extra_data  s   }}x~[AB	**

C< 8E + J $$ 		 .J#'::j.>.>#? (- %	 ![[55**%9:)// 6 

 ))  IJ

3  s#   3A A! A!%;C! !=D! D!c                 $   | j                  d| d      }| j                  d|       }	 | j                  j                  |      }|j                  D cg c]  }|j                  |k(  s| }}t        |      dk  sJ t        |      dk(  r+t        j                  |d   j                        }	||	v r|	|   S 	 | j                  j                  ||j                  dz          c c}w # t        j                  t        j                  f$ r Y +w xY w)Nr   r   r   r   )r   )rs   rh   r   r   re   r   r   r   rf   r   r   r1   r   r   )
r)   r9   re   r|   r   node_dirrootnr   extra_data_dicts
             r   load_extra_datazEtcdRendezvous.load_extra_data  s   }}x~[AB==8L>!:; ;;??8,D &*]]Daeetm!DJDz?a''' :!#"&**Z]-@-@"A/)*3//!!$doo.A!B# 
 E ..0F0FG s   C$)C$9*C) )#DDc                 |    | j                  d| d      }| j                  |       t        | j                  |      S )Nr   z/kv)etcd_clientetcd_store_prefix)rs   rr   r   rh   )r)   r9   
store_paths      r   r7   zEtcdRendezvous.setup_kv_store  s8    ]]Xl^3#?@
&&z2T[[JOOr   r,   )r   r   r   rY   r*   r.   r4   r   r   r   r   r   r   r   r   r   r   r   r   rJ   rC   r   rs   rr   ru   r   r   r7   r   r   r   r   r      s    a.`-8t(-T'?RG.
(3
j/Jb(.M`(S4B:HH>T4@%$9 < <8Pr   r   paramsr/   c                    t        | j                  d      \  }}| j                  j                  d      }|d}n|dk7  r|dk7  rt	        d      | j                  j                  d      }|!| j                  j                  d      }|||f}| j                  j                  d      }t        j                  |||||d	
      S )zICreate a new ``etcd.Client`` from the specified ``RendezvousParameters``.iK	  protocolhttphttpsz(The etcd protocol must be HTTP or HTTPS.certre   cacertT)r  r
  ca_certallow_reconnect)r   endpointconfigr   
ValueErrorr1   Client)r  hostnameportr  ssl_certcert_keyr  s          r   _create_etcd_clientr    s    .vENHd }}  ,Hv(g"5GHH }}  (H==$$U+ !(+H mm)G;; r   c                    t        |       }| j                  dd      }t        ||| j                  | j                  | j
                  | j                  dt              | j                  dt                    }t        || j                        S )a2  
    Usage:

    ::

    rdzv_params = RendezvousParameters(
                        backend="etcd",
                        endpoint="192.168.0.42:2379",
                        run_id="123",
                        min_nodes=4,
                        max_nodes=8,
                        timeout=300,
                        last_call_timeout=30,
                        etcd_prefix="custom_prefix",
                        protocol="https",
                        cacert="/etc/kubernetes/certs/ca.crt",
                        cert="/etc/kubernetes/certs/client.crt",
                        key="/etc/kubernetes/certs/client.key")
    # -- or --
    rdzv_params = RendezvousParameters(
                        backend="etcd",
                        endpoint="192.168.0.42:2379",
                        run_id="123",
                        min_nodes=4,
                        max_nodes=8)

    etcd_rdzv_handler = create_etcd_rendezvous_handler(rdzv_params)


    Where:
        run_id - unique id for this training job instance,
        min_nodes - min number of workers expected to join the rendezvous,
        max_nodes - max number of workers allowed to join the rendezvous,
                        defaults to min_workers is not specified.
        timeout - total timeout within which next_rendezvous is expected to
                      succeed; a RendezvousTimeoutError is raised otherwise;
                      Defaults is 600 (10 minutes).
        last_call_timeout - additional wait amount ("last call") after
                            min number of workers has been reached.
                            Defaults to 30 seconds.
        etcd_prefix - path prefix (from etcd root), inside which all
                      etcd nodes will be created.
                      Default is "/torchelastic/p2p".
        protocol - http (default) or https to access etcd.
        cacert - CA cert to access etcd, only makes sense with https.
        cert - client cert to access etcd, only makes sense with https.
        key - client key to access etcd, only makes sense with https.
    etcd_prefixz/torchelastic/p2pr|   r}   )rh   rx   ry   rz   r{   r|   r}   )r$   r%   )r  r   r   ry   	min_nodes	max_nodes
get_as_intr   r    r   r%   )r  rh   r  rdzvs       r   r   r     s    b !(F**],?@K}}((((!!)-=> ++!;

D !$$ r   )4r   loggingsysr   r   typingr   r1   $torch.distributed.elastic.rendezvousr   r   r   r   r   r	   r
   
etcd_storer   r   utilsr   __all__	Formatter_log_fmtStreamHandlerstderr_log_handlersetFormatter	getLoggerr   r5   	propagatesetLevelINFO
addHandlerr   r   r   r   r   __annotations__r    r   r   r   r   rt   r   r   r  r  r   r   r   r   <module>r0     sQ     
       - , 7DE$w$$SZZ0   ( #			8	$      , 	Y 		Y 	
  #  #% C $   $& !     y- yDrP rPj 4  FC 4 C9J Cr   