o
    h5x                     @  s  d Z ddlmZ ddlZddlZddlZddlmZ ddlm	Z	 ddl
mZmZmZmZmZmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZ ddlm Z  ddl!m"Z" errddl#m$Z$ ddl%m&Z& ddl'm(Z(m)Z) ddl*m+Z+m,Z,m-Z- ddl.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4 ddl5m6Z6 ddl7m8Z8m9Z9m:Z: ddl;m<Z<m=Z=m>Z>m?Z?m@Z@ ddlAmBZB ddlCmDZDmEZEmFZFmGZG ddlHmIZImJZJ ddlKmLZL dZMG dd dZNdS )zIThe client-level bulk write operations interface.

.. versionadded:: 4.9
    )annotationsN)MutableMapping)islice)TYPE_CHECKINGAnyMappingOptionalTypeUnion)ObjectId)RawBSONDocument)_csotcommon)AsyncClientSession_validate_session_write_concern)AsyncCollection)AsyncCommandCursor)AsyncDatabase)_handle_reauth)AsyncMongoClient)AsyncConnection)_merge_command"_throw_client_bulk_write_exception)validate_is_document_typevalidate_ok_for_replacevalidate_ok_for_update)ConfigurationErrorConnectionFailureInvalidOperationNotPrimaryErrorOperationFailureWaitQueueTimeoutError)_RETRYABLE_ERROR_CODES)_COMMAND_LOGGER_CommandStatusMessage
_debug_log)_ClientBulkWriteContext_convert_client_bulk_exception_convert_exception_convert_write_result_randint)ReadPreference)ClientBulkWriteResultDeleteResultInsertOneResultUpdateResult)_DocumentOut	_Pipeline)WriteConcernFc                   @  s   e Zd ZdZ					dfdgddZedhddZdiddZ					djdkd,d-Z				dldmd/d0Z			dndod1d2Z
edpd?d@ZdqdBdCZdrdIdJZdsdLdMZdtdTdUZ	dudvdZd[Zdwd]d^Zdxd_d`ZdxdadbZdydddeZdS )z_AsyncClientBulkz4The private guts of the client-level bulk write API.TNFclientr   write_concernr2   orderedboolbypass_document_validationOptional[bool]commentOptional[str]letOptional[Any]verbose_resultsreturnNonec                 C  s   || _ || _|| _| jdurtd| j || _|| _|| _|| _g | _	g | _
d| _d| _d| _d| _d| _| j jj| _d| _d| _dS )z'Initialize a _AsyncClientBulk instance.Nr<   r   F)r4   r5   r<   r   r   r6   bypass_doc_valr:   r>   ops
namespaces
idx_offset	total_opsexecuteduses_collationuses_array_filtersoptionsZretry_writesis_retryableretryingstarted_retryable_write)selfr4   r5   r6   r8   r:   r<   r>    rN   S/var/www/html/venv/lib/python3.10/site-packages/pymongo/asynchronous/client_bulk.py__init__Y   s&   

z_AsyncClientBulk.__init__Type[_ClientBulkWriteContext]c                 C  s   t S N)r&   )rM   rN   rN   rO   bulk_ctx_classx   s   z_AsyncClientBulk.bulk_ctx_class	namespacestrdocumentr0   c                 C  s^   t d| t|tsd|v st |d< d|d}| jd|f | j| |  jd7  _dS )z*Add an insert document to the list of ops.rV   _id)insertrV   rY      N)r   
isinstancer   r   rB   appendrC   rE   )rM   rT   rV   cmdrN   rN   rO   
add_insert|   s   


z_AsyncClientBulk.add_insertselectorMapping[str, Any]update#Union[Mapping[str, Any], _Pipeline]multiupsert	collationOptional[Mapping[str, Any]]array_filters!Optional[list[Mapping[str, Any]]]hint Union[str, dict[str, Any], None]sortc
                 C  s   t | d|||d}
|dur||
d< |durd| _||
d< |dur&||
d< |dur1d| _||
d< |	dur9|	|
d	< |r>d
| _| jd|
f | j| |  jd7  _dS )z8Create an update document and add it to the list of ops.rX   ra   filterZ
updateModsrc   Nrd   TZarrayFiltersri   re   rk   Fra   rZ   )r   rH   rG   rJ   rB   r\   rC   rE   )rM   rT   r_   ra   rc   rd   re   rg   ri   rk   r]   rN   rN   rO   
add_update   s.   z_AsyncClientBulk.add_updatereplacementc           	      C  s   t | d||dd}|dur||d< |dur||d< |dur&d| _||d< |dur.||d	< | jd
|f | j| |  jd7  _dS )z8Create a replace document and add it to the list of ops.rX   Frl   Nrd   ri   Tre   rk   replacerZ   )r   rG   rB   r\   rC   rE   )	rM   rT   r_   ro   rd   re   ri   rk   r]   rN   rN   rO   add_replace   s$   z_AsyncClientBulk.add_replacec                 C  sj   d||d}|dur||d< |durd| _ ||d< |rd| _| jd|f | j| |  jd	7  _dS )
z7Create a delete document and add it to the list of ops.rX   )deleterm   rc   Nri   Tre   Frr   rZ   )rG   rJ   rB   r\   rC   rE   )rM   rT   r_   rc   re   ri   r]   rN   rN   rO   
add_delete   s   	z_AsyncClientBulk.add_deletebwcr&   r]   MutableMapping[str, Any]
request_idintmsgUnion[bytes, dict[str, Any]]op_docslist[Mapping[str, Any]]ns_docsdict[str, Any]c                   sX  ||d< ||d< t tjr6tt tj|jj|t	t
||j|||jj|jj|jjd |jjd |jjd |jrA||||| zZ|j|||jI dH }tj |j }	t tjrtt tj|jj|	|t	t
||j|||jj|jj|jjd |jjd |jjd |jr||||	 | j||jI dH  W |S  ty+ }
 ztj |j }	t|
tt fr|
j!}nt"|
}t tjrtt tj#|jj|	|t	t
||j|||jj|jj|jjd |jjd |jjt|
t d |jr|$|||	 d	|
i}t|
t r| j|
j!|jI dH  n| ji |jI dH  W Y d}
~
|S W Y d}
~
|S d}
~
ww )
zHA proxy for AsyncConnection.write_command that handles event publishing.rB   ZnsInfor   rZ   messageclientIdcommandcommandNamedatabaseName	requestIdoperationIddriverConnectionIdserverConnectionId
serverHost
serverPort	serviceIdNr   r   
durationMSreplyr   r   r   r   r   r   r   r   r   r   r   r   failurer   r   r   r   r   r   r   r   r   ZisServerSideErrorerror)%r#   isEnabledForloggingDEBUGr%   r$   STARTED_topology_settings_topology_idnextiterdb_nameconnidserver_connection_idaddress
service_idpublish_startwrite_commandcodecdatetimenow
start_time	SUCCEEDED_succeedr4   _process_responsesession	Exceptionr[   r   r    detailsr(   FAILED_fail)rM   rt   r]   rv   rx   rz   r|   r4   r   durationexcr   rN   rN   rO   r      s   





#


 
z_AsyncClientBulk.write_commandbytesc                   s6  t tjr.tt tj|jj|t	t
||j|||jj|jj|jjd |jjd |jjd |jr9|||||}za|j||jI dH }tj |j }	|durYt|j||}
n2ddi}
t tjrtt tj|jj|	|
t	t
||j|||jj|jj|jjd |jjd |jjd |jr|||
|	 W |
S W |
S  ty } zstj |j }	t|trt|j||j}nt|t r|j}nt!|}t tjrtt tj"|jj|	|t	t
||j|||jj|jj|jjd |jjd |jjt|td |jr|jdusJ |#|||	 d|i}
W Y d}~|
S d}~ww )	zFA proxy for AsyncConnection.unack_write that handles event publishing.r   rZ   r~   Nokr   r   r   )$r#   r   r   r   r%   r$   r   r   r   r   r   r   r   r   r   r   r   r   r   unack_writeZmax_bson_sizer   r   r   r)   namer   r   r   r[   r    r   r   r(   r   r   )rM   rt   r]   rv   rx   rz   r|   r4   resultr   r   r   r   rN   rN   rO   r   ?  s   





 !




z_AsyncClientBulk.unack_writerB   #list[tuple[str, Mapping[str, Any]]]rC   	list[str]7tuple[list[Mapping[str, Any]], list[Mapping[str, Any]]]c           	   	     s>   | |||\}}}}| ||||||| jI dH  ||fS )z6Executes a batch of bulkWrite server commands (unack).N)batch_commandr   r4   )	rM   rt   r]   rB   rC   rv   rx   to_send_ops
to_send_nsrN   rN   rO   _execute_batch_unack  s   z%_AsyncClientBulk._execute_batch_unackGtuple[dict[str, Any], list[Mapping[str, Any]], list[Mapping[str, Any]]]c           
   	     s@   | |||\}}}}| ||||||| jI dH }	|	||fS )z4Executes a batch of bulkWrite server commands (ack).N)r   r   r4   )
rM   rt   r]   rB   rC   rv   rx   r   r   r   rN   rN   rO   _execute_batch  s   

z_AsyncClientBulk._execute_batchfull_resultr   r   r   r   Optional[AsyncClientSession]c              
     sd  | drtt| jddd}t||d |j||du| jd}||I dH  zd|2 z]3 dH W }|d | j }| j	| \}	}
|d sQ|d	 
| | jrQ W dS |d r| jr|	d
krh|
d d }t|dd}|	dv rud}	t|ddd}|	dkrt|dd}|||	 d |< q+6 W dS  ty } z|jr| I dH  t||d< W Y d}~dS d}~ww dS )z?Internal helper for processing the server reply command cursor.cursoradminz$cmd.bulkWrite)Zdatabaser   N)r   Zexplicit_sessionr:   idxr   writeErrorsrY   rV   rW   T)acknowledged)ra   rp   ra   )r   Zin_client_bulkrr   ZResultsr   )getr   r   r4   r   r   r:   Z_maybe_pin_connectionrD   rB   r\   r6   r>   r.   r/   r-   r   alivecloser'   )rM   r   r   r   r   ZcollZ
cmd_cursordocZoriginal_indexZop_typeopZinserted_idresr   rN   rN   rO   _process_results_cursor  sT   

z(_AsyncClientBulk._process_results_cursorop_id	retryablefinal_write_concernOptional[WriteConcern]c              	     sT  d}d}	| j j}
|| j | | ||	|||
|| j j}| j| jk r| j| j |jkr1|p0|}ddi}| j |d< | j	|d< |oE|j
 }|sJ|sPt|| | jdurZ| j|d< | jrb| j|d< | jrj| j|d	< |r|rx| jsx|  d
| _|||tj| |||| j  || || j | t| j| jd}t| j| jd}|jrz| ||||I dH \}}}|}|dr|d }t|dot|j t!o|j ddt"v }t|t#ot|t$t%f }|r|s|rt&'|}t(| j| j|| t)|| j nt(| j| j|| t)|| j d|d< g |d< |ddt*|k r%d
|d< |d s9||d< t(| j| j|| dS |r_|di }|ddt"v r_t&'|}t(| j| j|| t)|| j | +||||I dH  t(| j| j|| d| _,d| _n| -||||I dH \}}|  jt*|7  _|d s| j	r|d rdS | j| jk s$dS dS )z<Internal helper for executing batches of bulkWrite commands.r   	bulkWriterZ   
errorsOnlyr6   NbypassDocumentValidationr:   r<   Tr   r   coder   r   ZnErrorsanySuccessfulr   ZwriteConcernErrorF).r4   _event_listenersZvalidate_sessionrS   codec_optionsrD   rE   Zmax_write_batch_sizer>   r6   Zin_transactionr   Zapply_write_concernrA   r:   r<   rL   Z_start_retryable_writeZ	_apply_tor+   ZPRIMARYZsend_cluster_timeadd_server_apiZapply_timeoutr   rB   rC   r   r   r   hasattrr[   r   dictr"   r   r   r!   copydeepcopyr   r   lenr   rK   r   )rM   r5   r   r   r   r   r   r   r   cmd_name	listenersrt   r]   Znot_in_transactionrB   rC   Z
raw_resultr   _r   r   Zretryable_top_level_errorZretryable_network_errorfullZwcerN   rN   rO   _execute_command  s   












z!_AsyncClientBulk._execute_command	operationc                   s   ddg g dddddi i i d t  d fdd}jjj|||dI dH   d s8 d s8 d r>t j  S )z'Execute commands with w=1 WriteConcern.FNr   )r   r   r   writeConcernErrorsZ	nInsertedZ	nUpsertedZnMatchedZ	nModifiedZnDeletedZinsertResultsZupdateResultsZdeleteResultsr   r   r   r   r   r7   r?   r@   c                   s4   |j dk r
tdj| || I d H  d S )N   <MongoClient.bulk_write requires MongoDB server version 8.0+.)max_wire_versionr   r   r5   )r   r   r   r   r   rM   rN   rO   retryable_bulk~  s   
z8_AsyncClientBulk.execute_command.<locals>.retryable_bulk)ZbulkZoperation_idr   r   r   )r   r   r   r   r   r7   r?   r@   )r*   r4   Z_retryable_writerJ   r   r>   )rM   r   r   r   rN   r   rO   execute_commandh  s6   	z _AsyncClientBulk.execute_commandc              	     s   d}d}| j j}t }| |||||d| j j}| j| jk r~ddi}d|d< d|d< | jdur5| j|d	< d
di|d< | jrC| j|d< | j	rK| j	|d< |
| t| j| jd}t| j| jd}	| ||||	I dH \}
}|  jt|
7  _| j| jk sdS dS )zDExecute commands with OP_MSG and w=0 writeConcern. Always unordered.r   r   NrZ   Tr   Fr6   r   wr   ZwriteConcernr:   r<   )r4   r   r*   rS   r   rD   rE   rA   r:   r<   r   r   rB   rC   r   r   )rM   r   r   r   r   r   rt   r]   rB   rC   r   r   rN   rN   rO   execute_command_unack  s>   





z&_AsyncClientBulk.execute_command_unackc                   s@   | j rtd| jrtd| jdurtd| |I dH S )z3Execute all operations, returning no results (w=0).z3Collation is unsupported for unacknowledged writes.z6arrayFilters is unsupported for unacknowledged writes.NzGCannot set bypass_document_validation with unacknowledged write concern)rG   r   rH   rA   r    r   )rM   r   rN   rN   rO   execute_no_results  s   
z#_AsyncClientBulk.execute_no_resultsr   c              	     s   | j std| jrtdd| _t|| j}| jjsW| j||I dH 4 I dH "}|jdk r4td| 	|I dH  t
dddW  d  I dH  S 1 I dH sRw   Y  | ||I dH }t
|| jj| jS )zExecute operations.zNo operations to executez*Bulk operations can only be executed once.TNr   r   F)rB   r   rF   r   r5   r   r4   Z_conn_for_writesr   r   r,   r   r>   )rM   r   r   
connectionr   rN   rN   rO   execute  s,   

,z_AsyncClientBulk.execute)TNNNF)r4   r   r5   r2   r6   r7   r8   r9   r:   r;   r<   r=   r>   r7   r?   r@   )r?   rQ   )rT   rU   rV   r0   r?   r@   )NNNNN)rT   rU   r_   r`   ra   rb   rc   r7   rd   r9   re   rf   rg   rh   ri   rj   rk   rf   r?   r@   )NNNN)rT   rU   r_   r`   ro   r`   rd   r9   re   rf   ri   rj   rk   rf   r?   r@   )NN)rT   rU   r_   r`   rc   r7   re   rf   ri   rj   r?   r@   )rt   r&   r]   ru   rv   rw   rx   ry   rz   r{   r|   r{   r4   r   r?   r}   )rt   r&   r]   ru   rv   rw   rx   r   rz   r{   r|   r{   r4   r   r?   rf   )
rt   r&   r]   r}   rB   r   rC   r   r?   r   )
rt   r&   r]   r}   rB   r   rC   r   r?   r   )
r   ru   r   ru   r   r   r   r   r?   r@   rR   )r5   r2   r   r   r   r   r   rw   r   r7   r   ru   r   r   r?   r@   )r   r   r   rU   r?   ru   )r   r   r?   r@   )r   r   r   rU   r?   r   )__name__
__module____qualname____doc__rP   propertyrS   r^   rn   rq   rs   r   r   r   r   r   r   r   r   r   r   r   rN   rN   rN   rO   r3   V   sL    
,$
[
Y

; 

5
*r3   )Or   
__future__r   r   r   r   collections.abcr   	itertoolsr   typingr   r   r   r   r	   r
   Zbson.objectidr   Zbson.raw_bsonr   Zpymongor   r   Z#pymongo.asynchronous.client_sessionr   r   Zpymongo.asynchronous.collectionr   Z#pymongo.asynchronous.command_cursorr   Zpymongo.asynchronous.databaser   Zpymongo.asynchronous.helpersr   Z!pymongo.asynchronous.mongo_clientr   Zpymongo.asynchronous.poolr   Zpymongo._client_bulk_sharedr   r   Zpymongo.commonr   r   r   Zpymongo.errorsr   r   r   r   r    r!   Zpymongo.helpers_sharedr"   Zpymongo.loggerr#   r$   r%   Zpymongo.messager&   r'   r(   r)   r*   Zpymongo.read_preferencesr+   Zpymongo.resultsr,   r-   r.   r/   Zpymongo.typingsr0   r1   Zpymongo.write_concernr2   Z_IS_SYNCr3   rN   rN   rN   rO   <module>   s>    	 