a dh+@s`ddgZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl m Z ddl m Z m Z ddlmZdZd Zd Zd ZeZd d ZddZGdddeZGdddZddZGdddeZd*ddZddZGdddeZ Gd dde!Z"Gd!d"d"e!Z#e#Z$Gd#d$d$e#Z%Gd%d&d&e!Z&Gd'd(d(e&Z'Gd)dde"Z(dS)+Pool ThreadPoolN)util) get_context TimeoutError)waitINITRUNCLOSE TERMINATEcCs tt|SN)listmapargsr5/opt/imh-python/lib/python3.9/multiprocessing/pool.pymapstar/srcCstt|d|dS)Nrr)r itertoolsstarmaprrrr starmapstar2src@seZdZddZddZdS)RemoteTracebackcCs ||_dSr tb)selfrrrr__init__:szRemoteTraceback.__init__cCs|jSr rrrrr__str__<szRemoteTraceback.__str__N)__name__ __module__ __qualname__rrrrrrr9src@seZdZddZddZdS)ExceptionWithTracebackcCs0tt|||}d|}||_d||_dS)Nz """ %s""") tracebackformat_exceptiontypejoinexcr)rr(rrrrr@s zExceptionWithTraceback.__init__cCst|j|jffSr ) rebuild_excr(rrrrr __reduce__Esz!ExceptionWithTraceback.__reduce__N)rr r!rr*rrrrr"?sr"cCst||_|Sr )r __cause__)r(rrrrr)Hs r)cs0eZdZdZfddZddZddZZS)MaybeEncodingErrorzVWraps possible unpickleable errors, so they can be safely sent through the socket.cs.t||_t||_tt||j|jdSr )reprr(valuesuperr,r)rr(r. __class__rrrTs  zMaybeEncodingError.__init__cCsd|j|jfS)Nz(Error sending result: '%s'. Reason: '%s')r.r(rrrrrYszMaybeEncodingError.__str__cCsd|jj|fS)Nz<%s: %s>)r1rrrrr__repr__]szMaybeEncodingError.__repr__)rr r!__doc__rrr2 __classcell__rrr0rr,Ps r,rFc Cs|dur(t|tr|dks(td||j}|j}t|drR|j|j |durb||d}|dus~|r||krz |} Wn&t t fyt dYqYn0| durt dq| \} } } } }zd| | i|f}WnJty4}z0|r| turt||j}d|f}WYd}~n d}~00z|| | |fWnTty}z:t||d}t d ||| | d|ffWYd}~n d}~00d} } }} } }|d7}qft d |dS) NrzMaxtasks {!r} is not valid_writerrz)worker got EOFError or OSError -- exitingzworker got sentinel -- exitingTFz0Possible encoding error while sending result: %szworker exiting after %d tasks) isinstanceintAssertionErrorformatputgethasattrr5close_readerEOFErrorOSErrorrdebug Exception_helper_reraises_exceptionr" __traceback__r,)inqueueoutqueue initializerinitargsZmaxtaskswrap_exceptionr:r;Z completedtaskjobifuncrkwdsresultewrappedrrrworkerasN        ( rRcCs|dS)z@Pickle-able helper function for use by _guarded_task_generation.Nr)exrrrrCsrCcs2eZdZdZddfdd ZfddZZS) _PoolCachez Class that implements a cache for the Pool class that will notify the pool management threads every time the cache is emptied. The notification is done by the use of a queue that is provided when instantiating the cache. Nnotifiercs||_tj|i|dSr )rVr/r)rrVrrNr0rrrsz_PoolCache.__init__cs t||s|jddSr )r/ __delitem__rVr:)ritemr0rrrWs z_PoolCache.__delitem__)rr r!r3rrWr4rrr0rrTsrTc@seZdZdZdZeddZdLddZej e fd d Z d d Z d dZ eddZeddZddZeddZeddZddZddZdifddZdMdd ZdNd!d"ZdOd#d$Zd%d&ZdPd(d)ZdQd*d+Zdiddfd,d-ZdRd.d/ZdSd0d1ZedTd2d3Ze d4d5Z!ed6d7Z"ed8d9Z#ed:d;Z$dd?Z&d@dAZ'dBdCZ(edDdEZ)e dFdGZ*dHdIZ+dJdKZ,dS)UrzS Class which supports an async version of applying functions to arguments. TcOs|j|i|Sr Process)ctxrrNrrrrZsz Pool.ProcessNrcCsg|_t|_|pt|_|t|_|j|_ t |j d|_ ||_ ||_ ||_|durjtphd}|dkrztd|durt|std||_z |WnFty|jD]}|jdur|q|jD] }|qւYn0|}tjtj|j |j|j|j|j|j|j |j!|j |j|j |j"||j fd|_#d|j#_$t%|j#_|j#&tjtj'|j|j(|j!|j|j fd|_)d|j)_$t%|j)_|j)&tjtj*|j!|j+|j fd|_,d|j,_$t%|j,_|j,&t-j.||j/|j|j |j!|j|j |j#|j)|j,|j f dd|_0t%|_dS) NrUrz&Number of processes must be at least 1zinitializer must be a callabletargetrT)r exitpriority)1_poolr _stater_ctx _setup_queuesqueue SimpleQueue _taskqueue_change_notifierrT_cache_maxtasksperchild _initializer _initargsos cpu_count ValueErrorcallable TypeError _processes_repopulate_poolrBexitcode terminater'_get_sentinels threadingThreadr_handle_workersrZ_inqueue _outqueue_wrap_exception_worker_handlerdaemonr start _handle_tasks _quick_put _task_handler_handle_results _quick_get_result_handlerrFinalize_terminate_pool _terminate)r processesrGrHmaxtasksperchildcontextp sentinelsrrrrs~                  z Pool.__init__cCs>|j|kr:|d|t|dt|dddur:|jddS)Nz&unclosed running multiprocessing pool )sourcerg)raResourceWarninggetattrrgr:)r_warnr rrr__del__s   z Pool.__del__c Cs0|j}d|jd|jd|jdt|jd S)N<.z state=z pool_size=>)r1r r!ralenr`)rclsrrrr2s z Pool.__repr__cCs |jjg}|jjg}g||Sr )rzr>rg)rZtask_queue_sentinelsZself_notifier_sentinelsrrrrus  zPool._get_sentinelscCsdd|DS)NcSsg|]}t|dr|jqS)sentinel)r<r).0rRrrr s z.Pool._get_worker_sentinels..rworkersrrr_get_worker_sentinelsszPool._get_worker_sentinelscCsPd}ttt|D]6}||}|jdurtd||d}||=q|S)zCleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. FNcleaning up worker %dT)reversedrangerrsrrAr')poolZcleanedrLrRrrr_join_exited_workerss zPool._join_exited_workersc Cs0||j|j|j|j|j|j|j|j|j |j Sr ) _repopulate_pool_staticrbrZrqr`ryrzrjrkrir{rrrrrr.s zPool._repopulate_poolc Csft|t|D]P} ||t|||||| fd} | jdd| _d| _| || t dqdS)zBring the number of pool processes up to the specified number, for use after reaping workers which have exited. r\rZZ PoolWorkerTz added workerN) rrrRnamereplacer}r~appendrrA) r[rZrrrErFrGrHrrIrLwrrrr7s zPool._repopulate_pool_staticc Cs*t|r&t|||||||||| dS)zEClean up any exited workers and start replacements for them. N)rrr) r[rZrrrErFrGrHrrIrrr_maintain_poolJs   zPool._maintain_poolcCs4|j|_|j|_|jjj|_|jjj|_ dSr ) rbreryrzr5sendrr>recvrrrrrrcVs   zPool._setup_queuescCs|jtkrtddS)NzPool not running)rar rnrrrr_check_running\s zPool._check_runningcCs||||S)zT Equivalent of `func(*args, **kwds)`. Pool must be running. ) apply_asyncr;)rrMrrNrrrapply`sz Pool.applycCs|||t|S)zx Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ) _map_asyncrr;rrMiterable chunksizerrrrgszPool.mapcCs|||t|S)z Like `map()` method but the elements of the `iterable` are expected to be iterables as well and will be unpacked as arguments. Hence `func` and (a, b) becomes func(a, b). )rrr;rrrrrnsz Pool.starmapcCs|||t|||S)z= Asynchronous version of `starmap()` method. )rrrrMrrcallbackerror_callbackrrr starmap_asyncvs zPool.starmap_asyncc cslz,d}t|D]\}}||||fifVqWn:tyf}z"||dt|fifVWYd}~n d}~00dS)zProvides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during iteration.rN) enumeraterBrC)rZ result_jobrMrrLxrPrrr_guarded_task_generation~s zPool._guarded_task_generationrcCs||dkr:t|}|j||j|||jf|S|dkrPtd|t |||}t|}|j||jt ||jfdd|DSdS)zP Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. rzChunksize must be 1+, not {0:n}css|]}|D] }|Vq qdSr rrchunkrXrrr zPool.imap..N) r IMapIteratorrfr:r_job _set_lengthrnr9r _get_tasksrrrMrrrO task_batchesrrrimaps4z Pool.imapcCs||dkr:t|}|j||j|||jf|S|dkrPtd|t |||}t|}|j||jt ||jfdd|DSdS)zL Like `imap()` method but ordering of results is arbitrary. rzChunksize must be 1+, not {0!r}css|]}|D] }|Vq qdSr rrrrrrrz&Pool.imap_unordered..N) rIMapUnorderedIteratorrfr:rrrrnr9rrrrrrrimap_unordereds0zPool.imap_unorderedcCs6|t|||}|j|jd|||fgdf|S)z; Asynchronous version of `apply()` method. rN)r ApplyResultrfr:r)rrMrrNrrrOrrrrs zPool.apply_asynccCs|||t|||S)z9 Asynchronous version of `map()` method. )rrrrrr map_asyncszPool.map_asyncc Cs|t|dst|}|durJtt|t|jd\}}|rJ|d7}t|dkrZd}t|||}t||t|||d} |j | | j ||df| S)zY Helper function to implement map, starmap and their async counterparts. __len__Nrrr) rr<rdivmodrr`rr MapResultrfr:rr) rrMrZmapperrrrextrarrOrrrrs,  zPool._map_asynccCs"t||d|s|q dS)N)timeout)remptyr;)rchange_notifierrrrr_wait_for_updatess zPool._wait_for_updatesc Cstt}|jtks |r\|jtkr\|||||||| | | | g||| }|||q|dt ddS)Nzworker handler exiting) rvcurrent_threadrar r rrrr:rrA)rcache taskqueuer[rZrrrErFrGrHrrIrrthreadZcurrent_sentinelsrrrrxs zPool._handle_workersc Cst}t|jdD] \}}d}z|D]}|jtkrDtdqz ||Wq(ty} zH|dd\} } z||  | d| fWnt yYn0WYd} ~ q(d} ~ 00q(|rtd|r|dnd} || dWd}}} qWd}}} q,Wd}}} qd}}} 0qtdz6td| dtd |D]} |dqPWnt ytd Yn0td dS) Nz'task handler found thread._state != RUNFzdoing set_length()rrztask handler got sentinelz/task handler sending sentinel to result handlerz(task handler sending sentinel to workersz/task handler got OSError when sending sentinelsztask handler exiting) rvriterr;rar rrArB_setKeyErrorr:r@) rr:rFrrrZtaskseqZ set_lengthrJrPrKidxrrrrr sJ            zPool._handle_tasksc Cst}z |}Wn"ttfy4tdYdS0|jtkr^|jtksRJdtdq|durrtdq|\}}}z|| ||Wnt yYn0d}}}q|rJ|jtkrJz |}Wn"ttfytdYdS0|durtdq|\}}}z|| ||Wnt y:Yn0d}}}qt |drtdz,t dD]}|j sq|qjWnttfyYn0td t||jdS) Nz.result handler got EOFError/OSError -- exitingzThread not in TERMINATEz,result handler found thread._state=TERMINATEzresult handler got sentinelz&result handler ignoring extra sentinelr>z"ensuring that outqueue is not full z7result handler exiting: len(cache)=%s, thread._state=%s)rvrr@r?rrArar r rrr<rr>pollr)rFr;rrrJrKrLobjrrrr:s\                 zPool._handle_resultsccs0t|}tt||}|s dS||fVqdSr )rtuplerislice)rMitsizerrrrrvs zPool._get_taskscCs tddS)Nz:pool objects cannot be passed between processes or pickled)NotImplementedErrorrrrrr*szPool.__reduce__cCs2td|jtkr.t|_t|j_|jddS)Nz closing pool)rrArar r r|rgr:rrrrr=s   z Pool.closecCstdt|_|dS)Nzterminating pool)rrAr rarrrrrrts zPool.terminatecCsjtd|jtkrtdn|jttfvr4td|j|j |j |j D] }|qXdS)Nz joining poolzPool is still runningzIn unknown state) rrArar rnr r r|r'rrr`)rrrrrr's       z Pool.joincCs@td|j|r<|jr<|jt dqdS)Nz7removing tasks from inqueue until task handler finishedr) rrA_rlockacquireis_aliver>rrtimesleep)rE task_handlerrrrr_help_stuff_finishs    zPool._help_stuff_finishc CsXtdt|_|dt|_td|||t||sXt| dkrXtdt|_|d|dtdt |ur| |rt |ddrtd|D]} | j dur| qtdt |ur| td t |ur| |rTt |ddrTtd |D](} | r*td | j| q*dS) Nzfinalizing poolz&helping task handler/workers to finishrz.Cannot have cache with result_hander not alivezjoining worker handlerrtzterminating workerszjoining task handlerzjoining result handlerzjoining pool workersr)rrAr rar:rrrr8rvrr'r<rsrtpid) rrrErFrrZworker_handlerrZresult_handlerrrrrrrsB               zPool._terminate_poolcCs ||Sr )rrrrr __enter__szPool.__enter__cCs |dSr )rt)rexc_typeexc_valexc_tbrrr__exit__sz Pool.__exit__)NNrNN)N)N)NNN)r)r)NNN)NNN)N)-rr r!r3r{ staticmethodrZrwarningswarnr rr2rurrrrrrrcrrrrrrrrrrrr classmethodrxrrrr*r=rtr'rrrrrrrrrsv  P                - ;    5c@sJeZdZddZddZddZddd Zdd d Zd d Ze e j Z dS)rcCs>||_t|_tt|_|j|_||_||_ ||j|j<dSr ) r`rvEvent_eventnext job_counterrrh _callback_error_callback)rrrrrrrrs  zApplyResult.__init__cCs |jSr )ris_setrrrrreadyszApplyResult.readycCs|std||jS)Nz{0!r} not ready)rrnr9_successrrrr successfulszApplyResult.successfulNcCs|j|dSr )rrrrrrrrszApplyResult.waitcCs,|||st|jr"|jS|jdSr )rrrr_valuerrrrr;s  zApplyResult.getcCsZ|\|_|_|jr$|jr$||j|jr<|js<||j|j|j|j=d|_dSr ) rrrrrsetrhrr`rrLrrrrrs       zApplyResult._set)N)N) rr r!rrrrr;rrtypes GenericAlias__class_getitem__rrrrrs    rc@seZdZddZddZdS)rcCshtj||||dd|_dg||_||_|dkrNd|_|j|j|j =n||t |||_dS)NrTr) rrrr _chunksize _number_leftrrrhrbool)rrrlengthrrrrrrs    zMapResult.__init__cCs|jd8_|\}}|rv|jrv||j||j|d|j<|jdkr|jrZ||j|j|j=|jd|_ nL|s|jrd|_||_|jdkr|j r| |j|j|j=|jd|_ dS)NrrF) rrrrrrhrrrr`r)rrLZsuccess_resultsuccessrOrrrr&s&          zMapResult._setN)rr r!rrrrrrrs rc@s:eZdZddZddZd ddZeZdd Zd d ZdS) rcCsT||_tt|_tt|_|j|_t |_ d|_ d|_ i|_||j|j<dS)Nr)r`rv ConditionLock_condrrrrh collectionsdeque_items_index_length _unsorted)rrrrrrDs  zIMapIterator.__init__cCs|Sr rrrrr__iter__OszIMapIterator.__iter__Nc Cs|jz|j}Wnvty|j|jkrrrrrruszThreadPool._get_sentinelscCsgSr rrrrrrsz ThreadPool._get_worker_sentinelscCsDz|jddqWntjy&Yn0t|D]}|dq0dS)NF)block)r;rdEmptyrr:)rErrrLrrrrs  zThreadPool._help_stuff_finishcCst|dSr )rr)rrrrrrrrszThreadPool._wait_for_updates)NNr) rr r!r{rrZrrcrurrrrrrrrs    )NrNF))__all__r rrlrdrvrr$rrr#rrr connectionrr r r r countrrrrBrr"r)r,rRrCdictrTobjectrrZ AsyncResultrrrrrrrr sN    -=++E