
import inspect
import os
import warnings
from concurrent.futures import Future, ThreadPoolExecutor
from typing import cast, Optional, Union

from typing_extensions import deprecated

import torch
import torch.distributed as dist
from torch.distributed._state_dict_utils import _offload_state_dict_to_cpu
from torch.distributed.checkpoint._storage_utils import _storage_setup
from torch.distributed.checkpoint.default_planner import DefaultSavePlanner
from torch.distributed.checkpoint.logger import _dcp_method_logger
from torch.distributed.checkpoint.metadata import Metadata, STATE_DICT_TYPE
from torch.distributed.checkpoint.planner import SavePlan, SavePlanner
from torch.distributed.checkpoint.staging import AsyncStager
from torch.distributed.checkpoint.stateful import Stateful
from torch.distributed.checkpoint.storage import StorageWriter
from torch.distributed.distributed_c10d import _get_default_group

from .utils import _api_bc_check, _DistWrapper, _profile


__all__ = ["save_state_dict", "save", "async_save"]


@deprecated(
    "`save_state_dict` is deprecated and will be removed in future versions. "
    "Please use `save` instead.",
    category=FutureWarning,
)
@_api_bc_check
def save_state_dict(
    state_dict: STATE_DICT_TYPE,
    storage_writer: StorageWriter,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[SavePlanner] = None,
) -> Metadata:
    """This method is deprecated. Please switch to 'save'."""
    storage_writer.reset()

    with _profile():
        return _save_state_dict(
            state_dict,
            storage_writer,
            process_group,
            coordinator_rank,
            no_dist,
            planner,
        )


@_dcp_method_logger(log_exceptions=True)
@_api_bc_check
def save(
    state_dict: STATE_DICT_TYPE,
    *,
    checkpoint_id: Union[str, os.PathLike, None] = None,
    storage_writer: Optional[StorageWriter] = None,
    planner: Optional[SavePlanner] = None,
    process_group: Optional[dist.ProcessGroup] = None,
) -> Metadata:
    """
                          }|rt        j                  d       t               5  t        t        t        ||d            }t        t        |       ||||      cddd       S # 1 sw Y   yxY w)a  
    Save a distributed model in SPMD style.

    This function is different from ``torch.save()`` as it handles
    ``ShardedTensor`` , and ``DTensor`` by having each rank only save their local shards.

    For each ``Stateful`` object (having both a ``state_dict`` and a ``load_state_dict``),
    save will call ``state_dict`` before serialization.

    .. warning::
        There are no guarantees of backwards compatibility across PyTorch versions
        for saved state_dicts.

    .. warning::
        If using the `process_group` argument, make sure that only its ranks
        call `save` and that all data in `state_dict` belongs to it.

    .. note::
        When saving a checkpoint for FSDP's `ShardingStrategy.HYBRID_SHARD`, only one of
        the shard groups should call `save`, and the corresponding process
        group needs to be passed in.
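
        For example (an illustrative sketch only: `shard_group` and `replicate_group`
        stand in for whatever process groups your HSDP setup created; they are not
        defined by this module):

        >>> # xdoctest: +SKIP
        >>> if dist.get_rank(replicate_group) == 0:
        >>>     torch.distributed.checkpoint.save(
        >>>         state_dict=state_dict,
        >>>         storage_writer=fs_storage_writer,
        >>>         process_group=shard_group,
        >>>     )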

    .. note::
        If no process group is available, this function assumes the intention is to save the
        state_dict in the local process.

    .. note::
        Rank 0 is assumed to be the coordinator rank.


    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform writes. If this is not
            specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)

    Returns:
        Metadata: Metadata object for the saved checkpoint.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter("/checkpoint/1")
        >>> torch.distributed.checkpoint.save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )
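
        Because ``my_model`` above is an ``nn.Module`` (and therefore a ``Stateful``
        object), ``save`` calls ``my_model.state_dict()`` on your behalf before
        serialization; you do not need to call it yourself.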

    .. note::
        This function uses collectives to coordinate writes across ranks.
        For NCCL-based process groups, internal tensor representations of
        objects must be moved to the GPU device before communication takes place.
        In this case, the device used is given by ``torch.cuda.current_device()``
        and it is the user's responsibility to ensure that this is set so that
        each rank has an individual GPU, via ``torch.cuda.set_device()``.
    """
    torch._C._log_api_usage_once("torch.distributed.checkpoint.save")

    no_dist = not (dist.is_available() and dist.is_initialized())
    if no_dist:
        warnings.warn(
            "torch.distributed is unavailable or uninitialized, assuming the intent "
            "is to save in a single process."
        )

    with _profile():
        storage_writer = cast(
            StorageWriter, _storage_setup(storage_writer, checkpoint_id, reader=False)
        )

        return _save_state_dict(
            state_dict=_stateful_to_state_dict(state_dict),
            storage_writer=storage_writer,
            process_group=process_group,
            no_dist=no_dist,
            planner=planner,
        )


@_dcp_method_logger(log_exceptions=True)
def async_save(
    state_dict: STATE_DICT_TYPE,
    *,
    checkpoint_id: Union[str, os.PathLike, None] = None,
    storage_writer: Optional[StorageWriter] = None,
    planner: Optional[SavePlanner] = None,
    process_group: Optional[dist.ProcessGroup] = None,
) -> Future:
    """Asynchronous version of ``save``. This function first stages the ``state_dict`` on the
    staging storage (defaults to CPU memory), and then calls ``save`` in a separate thread.

    .. warning::
        This feature is experimental and subject to change.

    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform 'stage' and 'save'. If
            this is not specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)

    Returns:
        Future: A future holding the resultant Metadata object from `save`.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter("/checkpoint/1")
        >>> checkpoint_future = torch.distributed.checkpoint.async_save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )
        >>>
        >>> # ... do some work ...
        >>>
        >>> checkpoint_future.result()

    """
    torch._C._log_api_usage_once("torch.distributed.checkpoint.async_save")

    if dist.is_available() and dist.is_initialized():
        pg = process_group or _get_default_group()
        assert (
            torch.device("cpu") in pg._device_types  # type: ignore[attr-defined]
        ), "A CPU backend must be enabled for async save; try initializing process group with 'cpu:gloo,cuda:nccl'"

    storage_writer = cast(
        StorageWriter, _storage_setup(storage_writer, checkpoint_id, reader=False)
    )

    state_dict = _stateful_to_state_dict(state_dict)
    if isinstance(storage_writer, AsyncStager):
        staged_state_dict = storage_writer.stage(state_dict)
    else:  # provides backwards compatibility for storage_writers not implementing AsyncStager
        staged_state_dict = _offload_state_dict_to_cpu(state_dict, type_check=False)

    executor = ThreadPoolExecutor(max_workers=1)
    f: Future = executor.submit(
        save,
        staged_state_dict,
        checkpoint_id=checkpoint_id,
        storage_writer=storage_writer,
        planner=planner,
        process_group=process_group,
    )
    f.add_done_callback(lambda f: executor.shutdown(wait=False))

    if (
        isinstance(storage_writer, AsyncStager)
        and storage_writer.should_synchronize_after_execute
    ):
        storage_writer.synchronize_staging()

    return f


def _stateful_to_state_dict(state_dict: STATE_DICT_TYPE) -> STATE_DICT_TYPE:
    """Creates a shallow copy of `state_dict` where `state_dict` is called for each Stateful object."""
    stateful_state_dict = {}
    for key, elem in state_dict.items():
        stateful_state_dict[key] = (
            elem.state_dict() if isinstance(elem, Stateful) else elem
        )
    return stateful_state_dict


def _save_state_dict(
    state_dict: STATE_DICT_TYPE,
    storage_writer: StorageWriter,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[SavePlanner] = None,
) -> Metadata:
    torch._C._log_api_usage_once("torch.distributed.checkpoint.save_state_dict")

    distW = _DistWrapper(process_group, not no_dist, coordinator_rank)
    if planner is None:
        planner = DefaultSavePlanner()
    assert planner is not None

    global_metadata = None

    ckpt_kwargs = {}
    if (ckpt_id := getattr(storage_writer, "checkpoint_id", None)) is not None:
        ckpt_kwargs["checkpoint_id"] = ckpt_id

    @_dcp_method_logger(**ckpt_kwargs)
    def local_step():
        assert planner is not None
        storage_meta = storage_writer.storage_meta()
        if "storage_meta" not in inspect.signature(planner.set_up_planner).parameters:
            warnings.warn(
                "The function definition for SavePlanner.set_up_planner has been updated"
                " to include the storage_meta argument. Please update your implementation"
                " to include this parameter."
            )
            planner.set_up_planner(state_dict, distW.is_coordinator)  # type: ignore[call-arg]
        else:
            planner.set_up_planner(
                state_dict=state_dict,
                storage_meta=storage_meta,
                is_coordinator=distW.is_coordinator,
            )
        storage_writer.set_up_storage_writer(distW.is_coordinator)

        local_plan = planner.create_local_plan()
        local_plan = storage_writer.prepare_local_plan(local_plan)
        return local_plan

    @_dcp_method_logger(**ckpt_kwargs)
    def global_step(all_local_plans):
        nonlocal global_metadata

        assert planner is not None
        all_local_plans, global_metadata = planner.create_global_plan(all_local_plans)
        all_local_plans = storage_writer.prepare_global_plan(all_local_plans)
        return all_local_plans

    # Plan on every rank, let the coordinator merge the plans into a global
    # plan, then scatter the finalized local plans back out.
    central_plan: SavePlan = distW.reduce_scatter("plan", local_step, global_step)

    @_dcp_method_logger(**ckpt_kwargs)
    def write_data():
        assert planner is not None
        final_local_plan = planner.finish_plan(central_plan)
        all_writes = storage_writer.write_data(final_local_plan, planner)

        all_writes.wait()
        return all_writes.value()

    @_dcp_method_logger(**ckpt_kwargs)
    def finish_checkpoint(all_results):
        assert global_metadata is not None
        storage_writer.finish(metadata=global_metadata, results=all_results)
        return global_metadata

    # Every rank writes its data; the coordinator then finalizes the
    # checkpoint metadata once all write results have been gathered.
    return distW.all_reduce("write", write_data, finish_checkpoint)