o
    |Mfh                     @   s  d Z dZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlmZmZ ddlZddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlm Z  ddl!m"Z"m#Z# ddl$m%Z%m&Z& ddl'm(Z(m)Z) ddl*m+Z+m,Z, ddl-m.Z. e/ej01ddZ2da3dZ4e/dZ5zddl6m7Z7 dZ8dLddZ9W n e:y   dZ8Y nw G dd dZ;G dd dZ<da=e> Z?e@ ZAd d! ZBejCDeAd"d#  daEdZFG d$d% d%eGZHG d&d' d'ZId(d) ZJG d*d+ d+ZKG d,d- d-ZLG d.d/ d/ZMG d0d1 d1e%ZNd2d3 ZOd4d5 ZPdMd6d7ZQd8d9 ZRG d:d; d;ejSZTdaUdaVd<d= ZWd>d? ZXd@dA ZYG dBdC dCeZZ[G dDdE dEeZG dFdG dGeZ\eZ]G dHdI dIeZZ^G dJdK dKeZ_dS )Na*	  Implements ProcessPoolExecutor.

The follow diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +----------- +     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     +--------+     | 4, result |    |         |
|          |     | ...        |                    | 3, except |    |         |
+----------+     +------------+                    +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
z,Thomas Moreau (thomas.moreau.2010@gmail.com)    N)timesleep)partial)PicklingError)Executor)LOGGER)BrokenProcessPool)wait   )Future)get_context)	cpu_count_MAX_WINDOWS_WORKERS)QueueSimpleQueue)set_loky_picklerget_loky_pickler_name)kill_process_treeget_exitcodes_terminated_worker)_prepare_initializerLOKY_MAX_DEPTH
   g      ?g    A)ProcessTFc                 C   s0   |rt   t|  j}tjd|  |S )Nzpsutil return memory size: )gccollectr   memory_inforssmputildebug)pidforce_gcmem_size r#   h/var/www/html/analyze/labelStudio/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py_get_memory_usagek   s
   r%   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
_ThreadWakeupc                 C   s   d| _ tjdd\| _| _d S )NF)duplex)_closedr   Pipe_reader_writerselfr#   r#   r$   __init__x   s   z_ThreadWakeup.__init__c                 C   s(   | j sd| _ | j  | j  d S d S NT)r(   r+   closer*   r,   r#   r#   r$   r0   |   s
   
z_ThreadWakeup.closec                 C   s   | j s| jd d S d S )N    )r(   r+   
send_bytesr,   r#   r#   r$   wakeup   s   z_ThreadWakeup.wakeupc                 C   s0   | j s| j r| j  | j sd S d S d S N)r(   r*   poll
recv_bytesr,   r#   r#   r$   clear   s   

z_ThreadWakeup.clearN)__name__
__module____qualname__r.   r0   r3   r7   r#   r#   r#   r$   r&   w   s
    r&   c                   @   s*   e Zd ZdZdd Zd	ddZdd ZdS )
_ExecutorFlagsa  necessary references to maintain executor states without preventing gc

    It permits to keep the information needed by executor_manager_thread
    and crash_detection_thread to maintain the pool without preventing the
    garbage collection of unreferenced executors.
    c                 C   s   d| _ d | _d| _|| _d S )NF)shutdownbrokenkill_workersshutdown_lock)r-   r?   r#   r#   r$   r.      s   
z_ExecutorFlags.__init__Nc                 C   sP   | j  d| _|d ur|| _W d    d S W d    d S 1 s!w   Y  d S r/   )r?   r<   r>   )r-   r>   r#   r#   r$   flag_as_shutting_down   s   "z$_ExecutorFlags.flag_as_shutting_downc                 C   s8   | j  d| _|| _W d    d S 1 sw   Y  d S r/   )r?   r<   r=   )r-   r=   r#   r#   r$   flag_as_broken   s   "z_ExecutorFlags.flag_as_brokenr4   )r8   r9   r:   __doc__r.   r@   rA   r#   r#   r#   r$   r;      s
    
r;   c               	   C   s   da tt } t| dkrtjd|   | D ]\}\}}| |  W d    n1 s0w   Y  q| D ]\}}t	 |
  W d    n1 sMw   Y  q8d S )NTr   zJInterpreter shutting down. Waking up {len(items)}executor_manager_thread:
)_global_shutdownlist_threads_wakeupsitemslenr   r   r   r3   _global_shutdown_lockjoin)rF   _r?   thread_wakeupthreadr#   r#   r$   _python_exit   s&   

rM   c                 C   s   |   S r4   )r7   )objr#   r#   r$   <lambda>   s    rO   c                   @   s"   e Zd ZdZdddZdd ZdS )_RemoteTracebackz<Embed stringification of remote traceback in local tracebackNc                 C   s   d| d| _ d S )Nz
"""
z"""tb)r-   rR   r#   r#   r$   r.      s   z_RemoteTraceback.__init__c                 C   s   | j S r4   rQ   r,   r#   r#   r$   __str__   s   z_RemoteTraceback.__str__r4   )r8   r9   r:   rB   r.   rS   r#   r#   r#   r$   rP      s    
rP   c                   @   s   e Zd Zdd Zdd ZdS )_ExceptionWithTracebackc                 C   sN   t |dd }|d u rt \}}}tt|||}d|}|| _|| _d S )N__traceback__ )	getattrsysexc_info	tracebackformat_exceptiontyperI   excrR   )r-   r]   rR   rJ   r#   r#   r$   r.      s   

z _ExceptionWithTraceback.__init__c                 C   s   t | j| jffS r4   )_rebuild_excr]   rR   r,   r#   r#   r$   
__reduce__   s   z"_ExceptionWithTraceback.__reduce__N)r8   r9   r:   r.   r_   r#   r#   r#   r$   rT      s    	rT   c                 C   s   t || _| S r4   )rP   	__cause__)r]   rR   r#   r#   r$   r^      s   
r^   c                   @   s   e Zd Zg dZdd ZdS )	_WorkItemfuturefnargskwargsc                 C   s   || _ || _|| _|| _d S r4   rb   )r-   rc   rd   re   rf   r#   r#   r$   r.   	  s   
z_WorkItem.__init__N)r8   r9   r:   	__slots__r.   r#   r#   r#   r$   ra     s    ra   c                   @   s   e Zd ZdddZdS )_ResultItemNc                 C   s   || _ || _|| _d S r4   )work_id	exceptionresult)r-   ri   rj   rk   r#   r#   r$   r.     s   
z_ResultItem.__init__NN)r8   r9   r:   r.   r#   r#   r#   r$   rh     s    rh   c                   @   s$   e Zd Zdd Zdd Zdd ZdS )	_CallItemc                 C   s$   || _ || _|| _|| _t | _d S r4   )ri   rd   re   rf   r   loky_pickler)r-   ri   rd   re   rf   r#   r#   r$   r.     s
   z_CallItem.__init__c                 C   s   t | j | j| ji | jS r4   )r   rn   rd   re   rf   r,   r#   r#   r$   __call__!  s   
z_CallItem.__call__c              	   C   s&   d| j  d| j d| j d| j d	S )Nz	CallItem(z, ))ri   rd   re   rf   r,   r#   r#   r$   __repr__%  s   $z_CallItem.__repr__N)r8   r9   r:   r.   ro   rq   r#   r#   r#   r$   rm     s    	rm   c                       s:   e Zd ZdZ						d fdd	Z fddZ  ZS )	
_SafeQueuez=Safe Queue set exception to the future object linked to a jobr   Nc                    s(   || _ || _|| _t j|||d d S )Nreducersctx)rK   pending_work_itemsrunning_work_itemssuperr.   )r-   max_sizeru   rv   rw   rK   rt   	__class__r#   r$   r.   .  s   	z_SafeQueue.__init__c                    s   t |trJt |tjrtd}ntd}tt||t	|dd }t
d||_| j|jd }| j|j |d urC|j| ~| j  d S t || d S )NzNThe task could not be sent to the workers as it is too large for `send_bytes`.z4Could not pickle the task to send it to the workers.rU   rV   )
isinstancerm   structerrorRuntimeErrorr   rZ   r[   r\   rW   rP   rI   r`   rv   popri   rw   removerc   set_exceptionrK   r3   rx   _on_queue_feeder_error)r-   erN   raised_errorrR   	work_itemrz   r#   r$   r   <  s&   
z!_SafeQueue._on_queue_feeder_error)r   NNNNN)r8   r9   r:   rB   r.   r   __classcell__r#   r#   rz   r$   rr   +  s    rr   c                 g   s,    t | }	 tt|| }|sdS |V  q)z*Iterates over zip()ed iterables in chunks.TN)ziptuple	itertoolsislice)	chunksize	iterablesitchunkr#   r#   r$   _get_chunksY  s   r   c                    s    fdd|D S )zProcesses a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    c                    s   g | ]} | qS r#   r#   ).0re   rd   r#   r$   
<listcomp>l  s    z"_process_chunk.<locals>.<listcomp>r#   )rd   r   r#   r   r$   _process_chunkc  s   	r   c              
   C   s\   z|  t|||d W dS  ty- } zt|}|  t||d W Y d}~dS d}~ww )z.Safely send back the given result or exception)rk   rj   rj   N)putrh   BaseExceptionrT   )result_queueri   rk   rj   r   r]   r#   r#   r$   _sendback_resulto  s   
 r   c                 C   s  |durz||  W n t y   tjddd Y dS w |ad}d}	t }
tjd|  	 z| j	d|d}|du rBtj
d W nX tjym   tj
d|d	d
 |jddrc|  d}ntj
d Y q/Y n/ t y   t }z	|t| W n t y   t| Y nw tjd td Y nw |du r||
 |jddd}t  |rtjd dS tj
d dS z| }W n  t y } zt|}|t|j|d W Y d}~nd}~ww t||j|d ~~trR|du rt|
dd}t }	q/t |	 tkrQt|
}t }	|| tk rq/t|
dd}t }	|| tk r+q/tj
d ||
 | tjd 	 W d   dS 1 sLw   Y  n|	du s_t |	 tkrft !  t }	q0)ap  Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
        processes_management_lock: A ctx.Lock avoiding worker timeout while
            some workers are being spawned.
        timeout: maximum time to wait for a new item in the call_queue. If that
            time is expired, the worker will shutdown.
        worker_exit_lock: Lock to avoid flagging the executor as broken on
            workers timeout.
        current_depth: Nested parallelism level, to avoid infinite spawning.
    NzException in initializer:T)rY   zWorker started with timeout=)blocktimeoutz Shutting down worker on sentinelz#Shutting down worker after timeout z0.3fsFr   z+Could not acquire processes_management_lockzExiting with code 1r
      r   zExited cleanlyz(Main process did not release worker_exitr   )rk   )r!   z*Memory leak detected: shutting down workerzExit due to memory leak)"r   r   critical_CURRENT_DEPTHosgetpidr   r   r   getinfoqueueEmptyacquirereleaserZ   
format_excr   rP   printrX   exitrM   rT   rh   ri   r   _USE_PSUTILr%   r   _MEMORY_LEAK_CHECK_DELAY_MAX_MEMORY_LEAK_SIZEr   r   )
call_queuer   initializerinitargsprocesses_management_lockr   worker_exit_lockcurrent_depth_process_reference_size_last_memory_leak_checkr    	call_itemprevious_tbis_cleanrr   r]   	mem_usager#   r#   r$   _process_workerz  s   


 

"
r   c                       sz   e Zd ZdZ fddZdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd ZdddZdd Zdd Zdd Z  ZS )_ExecutorManagerThreadat  Manages the communication between this process and the worker processes.

    The manager is run in a local thread.

    Args:
        executor: A reference to the ProcessPoolExecutor that owns
            this thread. A weakref will be own by the manager as well as
            references to internal objects used to introspect the state of
            the executor.
    c                    s   |j | _|j| _| j| jfdd}t||| _|j| _|j	| _
|j| _|j| _|j| _|j| _|j| _|j| _t jdd tjdk rId| _d S d S )Nc                 S   sF   t d ur
t jd | |  W d    d S 1 sw   Y  d S )Nz?Executor collected: triggering callback for QueueManager wakeup)r   r   r   r3   )rJ   rK   r?   r#   r#   r$   
weakref_cb  s   
"z3_ExecutorManagerThread.__init__.<locals>.weakref_cbExecutorManagerThreadname   	   T)_executor_manager_thread_wakeuprK   _shutdown_lockr?   weakrefrefexecutor_reference_flagsexecutor_flags
_processes	processes_call_queuer   _result_queuer   	_work_idswork_ids_queue_pending_work_itemsrv   _running_work_itemsrw   _processes_management_lockr   rx   r.   rX   version_infodaemon)r-   executorr   rz   r#   r$   r.     s$   



z_ExecutorManagerThread.__init__c                 C   sb   	 |    |  \}}}|r| | d S |d ur| | ~|  r0|   | js0|   d S qr4   )add_call_item_to_queuewait_result_broken_or_wakeupterminate_brokenprocess_result_itemis_shutting_downflag_executor_shutting_downrv   join_executor_internals)r-   result_item	is_brokenbper#   r#   r$   runN  s   

z_ExecutorManagerThread.runc                 C   s   	 | j  rd S z	| jjdd}W n tjy   Y d S w | j| }|j r@|  j	|g7  _	| j j
t||j|j|jdd n| j|= q q)NTFr   )r   fullr   r   r   r   rv   rc   set_running_or_notify_cancelrw   r   rm   rd   re   rf   )r-   ri   r   r#   r#   r$   r   h  s.   



z-_ExecutorManagerThread.add_call_item_to_queuec                    sL  | j j}| jj}||g}dd t| j D }t||  d }d}d }| v riz| }t|t	r:t
d}||_nd}W n^ tyh } zt
d}tt||t|dd }	t	d|	|_W Y d }~n8d }~ww | v rpd}n,d}
tjd	krd
t| j }
tjdt fddt| j D   td|
 }| j  |||fS )Nc                 S   s   g | ]}|j qS r#   )sentinelr   pr#   r#   r$   r     s    zG_ExecutorManagerThread.wait_result_broken_or_wakeup.<locals>.<listcomp>TzfA task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.FzrA result has failed to un-serialize. Please ensure that the objects returned by the function are always picklable.rU   rV   win32z#
The exit codes of the workers are zOA worker unexpectedly terminated. Workers that might have caused the breakage: c                    s(   i | ]}|d ur|j  v r|j|jqS r4   )r   r   exitcoder   readyr#   r$   
<dictcomp>  s
    zG_ExecutorManagerThread.wait_result_broken_or_wakeup.<locals>.<dictcomp>zA worker process managed by the executor was unexpectedly terminated. This could be caused by a segmentation fault while calling the function or by an excessive memory usage causing the Operating System to kill the worker.
)r   r*   rK   rD   r   valuesr	   recvr|   rP   r   r`   r   rZ   r[   r\   rW   rI   rX   platformr   r   r   r   strTerminatedWorkerErrorr7   )r-   result_readerwakeup_readerreadersworker_sentinelsr   r   r   r   rR   
exit_codesr#   r   r$   r     sh   




z3_ExecutorManagerThread.wait_result_broken_or_wakeupc                 C   sf  t |tr| j | j|d }W d    n1 sw   Y  |d ur<|j  tj	d|j
 d|j d |  ~t| j}t| j}|| dksS|t| jkr|  }|d urt| j|jk rtdt |j |  W d    n1 s{w   Y  d }d S d S d S d S | j|jd }|d ur|jr|j|j n|j|j | j|j d S d S )Njoining z when processing z as result_itemr   zA worker stopped while some jobs were given to the executor. This can be caused by a too short worker timeout or by a memory leak.)r|   intr   r   r   _worker_exit_lockr   r   r   r   r   r    rI   rG   rv   rw   r   _max_workerswarningswarnUserWarningr   _adjust_process_countri   rj   rc   r   
set_resultrk   r   )r-   r   r   	n_pending	n_runningr   r   r#   r#   r$   r     sF   




z*_ExecutorManagerThread.process_result_itemc                 C   s&   |   }tp|d u s| jjo| jj S r4   )r   rC   r   r<   r=   )r-   r   r#   r#   r$   r     s
   z'_ExecutorManagerThread.is_shutting_downc                 C   sL   | j | | j D ]	}|j| ~q| j  | jdd |   d S )Nzbroken executorreason)	r   rA   rv   r   rc   r   r7   r>   r   )r-   r   r   r#   r#   r$   r     s   
z'_ExecutorManagerThread.terminate_brokenc                 C   sR   | j   | j jr'| jr| j \}}|jtd ~| js| jdd d S d S )NzRThe Executor was shutdown with `kill_workers=True` before this job could complete.zexecutor shutting downr  )r   r@   r>   rv   popitemrc   r   ShutdownExecutorError)r-   rJ   r   r#   r#   r$   r   (  s   
z2_ExecutorManagerThread.flag_executor_shutting_downrV   c                 C   s\   | j r,| j  \}}tjd|j d|  zt| W n	 ty&   Y nw | j sd S d S )Nzterminate process z
, reason: )r   r  r   r   r   r   r   ProcessLookupError)r-   r  rJ   r   r#   r#   r$   r>   <  s   z#_ExecutorManagerThread.kill_workersc                 C   s^  | j ' d}t| j D ]}tjd|j  |j	  |d7 }qW d    n1 s-w   Y  tjd| d d}d}||k r| 
 dkrt|| D ]H}z| jd  |d7 }W qP tjy } z-|dkrtjd| jj d	| j  d
 |tjd t| |d9 }W Y d }~ nd }~ww ||k r| 
 dksJtjd| d d S )Nr   zreleasing worker exit lock on r
   zfound z processes to stopgMbP?g      @zBfailed to send all sentinels and exit with error.
call_queue size=z;  full is z; zCfull call_queue prevented to send all sentinels at once, waiting...g333333?zsent z sentinels to the call queue)r   rD   r   r   r   r   r   r   r   r   get_n_children_aliveranger   
put_nowaitr   Fullr   _maxsizer   r   )r-   n_children_to_stopr   n_sentinels_sentcooldown_timerJ   r   r#   r#   r$   shutdown_workersH  sL   

z'_ExecutorManagerThread.shutdown_workersc              	   C   s,  |    tjd | j  | j  tjd | j  tjd | j | j	  W d    n1 s8w   Y  | j
L tjdt| j d d}	 z| j \}}tjd|j d	|  |  |d
7 }W n	 tyx   Y nw qQtjd| d W d    d S 1 sw   Y  d S )Nzclosing call_queuezclosing result_queuezclosing thread_wakeupr   z
 processesr   Tzjoining process z
 with pid r
   z-executor management thread clean shutdown of z workers)r  r   r   r   r   r0   join_threadr   r?   rK   r   rG   r   r  r   rI   KeyError)r-   n_joined_processesr    r   r#   r#   r$   r   w  s:   


	"z._ExecutorManagerThread.join_executor_internalsc                 C   sF   | j  tdd t| j D W  d    S 1 sw   Y  d S )Nc                 s   s    | ]}|  V  qd S r4   )is_aliver   r#   r#   r$   	<genexpr>  s    z>_ExecutorManagerThread.get_n_children_alive.<locals>.<genexpr>)r   sumrD   r   r   r,   r#   r#   r$   r    s   $z+_ExecutorManagerThread.get_n_children_alive)rV   )r8   r9   r:   rB   r.   r   r   r   r   r   r   r   r>   r  r   r  r   r#   r#   rz   r$   r     s    @L4
/'r   c               	   C   sh   t rtrttda ztd} W n ttfy   Y d S w | dkr$d S | dkr*d S d|  datt)NTSC_SEM_NSEMS_MAX   z$system provides too few semaphores (z available, 256 necessary))_system_limits_checked_system_limitedNotImplementedErrorr   sysconfAttributeError
ValueError)	nsems_maxr#   r#   r$   _check_system_limits  s   
r!  c                 c   s*    | D ]}|   |r| V  |sqdS )z
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list.  This function is
    careful not to keep references to yielded objects.
    N)reverser   )iterableelementr#   r#   r$   _chain_from_iterable_of_lists  s   
r%  c                 C   sH   |   dkrtdkrtddtk r td tkr"tdt dd S d S )Nforkr   zCould not spawn extra nested processes at depth superior to MAX_DEPTH=1. It is not possible to increase this limit when using the 'fork' start method.r
   zFCould not spawn extra nested processes at depth superior to MAX_DEPTH=z_. If this is intendend, you can change this limit with the LOKY_MAX_DEPTH environment variable.)get_start_methodr   LokyRecursionError	MAX_DEPTH)contextr#   r#   r$   _check_max_depth  s   r+  c                   @      e Zd ZdZdS )r(  z=A process tries to spawn too many levels of nested processes.Nr8   r9   r:   rB   r#   r#   r#   r$   r(        r(  c                   @   r,  )r   a2  
    Raised when the executor is broken while a future was in the running state.
    The cause can an error raised when unpickling the task in the worker
    process or when unpickling the result value in the parent process. It can
    also be caused by a worker process being terminated unexpectedly.
    Nr-  r#   r#   r#   r$   r     r.  r   c                   @   r,  )r   zy
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    Nr-  r#   r#   r#   r$   r     r.  r   c                   @   r,  )r  zo
    Raised when a ProcessPoolExecutor is shutdown while a future was in the
    running or pending state.
    Nr-  r#   r#   r#   r$   r    s    r  c                       s   e Zd ZdZ								dddZdddZdd Zd	d
 Zdd Zdd Z	e
j	je	_ fddZdddZe
jje_  ZS )ProcessPoolExecutorNr#   c	           	      C   s  t   |du rt | _n|dkrtd|| _tjdkr-| jtkr-tdt d t| _|du r4t	 }|| _
|| _t||\| _| _t| j
 |du rN|}|| _d| _i | _i | _d| _i | _g | _t | _| j
 | _d| _t | _t | _t| j| _ | !|| t"j#$d dS )a  Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: int, optional (default: cpu_count())
                The maximum number of processes that can be used to execute the
                given calls. If None or not given then as many worker processes
                will be created as the number of CPUs the current process
                can use.
            job_reducers, result_reducers: dict(type: reducer_func)
                Custom reducer for pickling the jobs and the results from the
                Executor. If only `job_reducers` is provided, `result_reducer`
                will use the same reducers
            timeout: int, optional (default: None)
                Idle workers exit after timeout seconds. If a new job is
                submitted after the timeout, the executor will start enough
                new Python processes to make sure the pool of workers is full.
            context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: An callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
            env: A dict of environment variable to overwrite in the child
                process. The environment variables are set before any module is
                loaded. Note that this only works with the loky context.
        Nr   z"max_workers must be greater than 0r   z&On Windows, max_workers cannot exceed z, due to limitations of the operating system.zProcessPoolExecutor is setup)%r!  r   r   r  rX   r   r   r   r   r   _context_envr   _initializer	_initargsr+  _timeout_executor_manager_threadr   _queue_countr   r   r   r   r   Lockr   	threadingr   r&   r   r;   r   _setup_queuesr   r   r   )	r-   max_workersjob_reducersresult_reducersr   r*  r   r   envr#   r#   r$   r.     sJ   #







zProcessPoolExecutor.__init__c                 C   sP   |d u rd| j  t }t|| j| j| j|| jd| _d| j_t	|| jd| _
d S )N   )ry   rv   rw   rK   rt   ru   Trs   )r   EXTRA_QUEUED_CALLSrr   r   r   r   r0  r   _ignore_epiper   r   )r-   r;  r<  
queue_sizer#   r#   r$   r9  m  s   z!ProcessPoolExecutor._setup_queuesc                 C   sz   | j d u r9tjd t| | _ | j   | j| jft| j < t	d u r;t
jdk r2tjjd tdda	d S tta	d S d S d S )Nz%_start_executor_manager_thread calledr      )exitpriority)r5  r   r   r   r   startr   r   rE   process_pool_executor_at_exitrX   r   FinalizerM   r8  _register_atexitr,   r#   r#   r$   _start_executor_manager_thread  s"   





z2ProcessPoolExecutor._start_executor_manager_threadc              	   C   s   t | j| jk rW| jd}| j| j| j| j| j	| j
|td f}|  z| jjt|| jd}W n tyA   | jjt|d}Y nw ||_|  || j|j< t | j| jk stjd| j ddd | j D   d S )Nr
   )targetre   r=  )rI  re   zAdjusted process count to z: c                 S   s   g | ]	\}}|j |fqS r#   r   )r   r    r   r#   r#   r$   r     s    z=ProcessPoolExecutor._adjust_process_count.<locals>.<listcomp>)rG   r   r   r0  BoundedSemaphorer   r   r2  r3  r   r4  r   r   r   r   r1  	TypeErrorr   rD  r    r   r   r   rF   )r-   r   re   r   r#   r#   r$   r     s8   


z)ProcessPoolExecutor._adjust_process_countc                 C   sL   | j  t| j| jkr|   |   W d   dS 1 sw   Y  dS )z5ensures all workers and management thread are runningN)r   rG   r   r   r   rH  r,   r#   r#   r$   _ensure_executor_running  s
   
"z,ProcessPoolExecutor._ensure_executor_runningc                 O   s   | j jI | j jd ur| j j| j jrtdtrtdt }t||||}|| j	| j
< | j| j
 |  j
d7  _
| j  |   |W  d    S 1 sPw   Y  d S )Nz*cannot schedule new futures after shutdownz6cannot schedule new futures after interpreter shutdownr
   )r   r?   r=   r<   r  rC   r   r   ra   r   r6  r   r   r   r3   rL  )r-   rd   re   rf   fwr#   r#   r$   submit  s(   

$zProcessPoolExecutor.submitc                    sT   | dd}| dd}|dk rtdt jtt|t|g|R  |d}t|S )az  Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a
                time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        r   Nr   r
   zchunksize must be >= 1.r   )r   r  rx   mapr   r   r   r%  )r-   rd   r   rf   r   r   resultsrz   r#   r$   rP    s   zProcessPoolExecutor.mapTFc                 C   s   t jd|   | j| | j}| j}|d ur1| j | j  W d    n1 s,w   Y  |d urS|rSt	 |
  t|d  W d    n1 sNw   Y  d | _d | _d | _d | _d | _d S )Nzshutting down executor )r   r   r   r   r@   r5  r   r   r3   rH   rI   rE   r   r   r   r   )r-   r	   r>   executor_manager_threadexecutor_manager_thread_wakeupr#   r#   r$   r<     s$   
zProcessPoolExecutor.shutdown)NNNNNNr#   Nr4   )TF)r8   r9   r:   _at_exitr.   r9  rH  r   rL  rO  r   rB   rP  r<   r   r#   r#   rz   r$   r/     s(    

i

"r/  )Frl   )`rB   
__author__r   r   rX   r   r}   r   r   r   rZ   r8  r   r   multiprocessingr   	functoolsr   pickler   concurrent.futuresr   concurrent.futures._baser   concurrent.futures.processr   _BPPExceptionmultiprocessing.connectionr	   _baser   backendr   backend.contextr   r   backend.queuesr   r   backend.reductionr   r   backend.utilsr   r   initializersr   r   environr   r)  r   r   r   psutilr   r   r%   ImportErrorr&   r;   rC   r7  rH   WeakKeyDictionaryrE   rM   r   register_after_forkrE  r?  	ExceptionrP   rT   r^   ra   rh   rm   rr   r   r   r   r   Threadr   r  r  r!  r%  r+  r   r(  r   BrokenExecutorr  r/  r#   r#   r#   r$   <module>   s   +).

 	   %		