B
    indQ                 @   s$  U d dl Z d dlZd dlZd dlmZmZmZmZmZm	Z	m
Z
mZ ddlmZ ddlmZmZmZ ddlmZ ddlmZ dZed	ZG d
d deZG dd dee ZG dd dZG dd dZG dd deZG dd deZe Zee e d< G dd dee Z!G dd de!e Z"dS )    N)	AwaitableCallableDequeGenericListOptionalTupleTypeVar   )BaseProtocol)BaseTimerContextset_exception
set_result)internal_logger)Final)EMPTY_PAYLOAD	EofStreamStreamReader	DataQueueFlowControlDataQueue_Tc               @   s   e Zd ZdZdS )r   zeof stream indication.N)__name__
__module____qualname____doc__ r   r   3/tmp/pip-unpacked-wheel-fdo6ttx3/aiohttp/streams.pyr      s   r   c               @   sD   e Zd Zeg ee f ddddZddddZedd	d
ZdS )AsyncStreamIteratorN)	read_funcreturnc             C   s
   || _ d S )N)r   )selfr   r   r   r   __init__   s    zAsyncStreamIterator.__init__zAsyncStreamIterator[_T])r   c             C   s   | S )Nr   )r    r   r   r   	__aiter__   s    zAsyncStreamIterator.__aiter__c                s<   y|   I d H }W n tk
r*   tY nX |dkr8t|S )N    )r   r   StopAsyncIteration)r    rvr   r   r   	__anext__!   s    
zAsyncStreamIterator.__anext__)	r   r   r   r   r   r   r!   r"   r&   r   r   r   r   r      s   r   c               @   s@   e Zd ZdddddZd dddZeeef dd	d
ZdS )ChunkTupleAsyncStreamIteratorr   N)streamr   c             C   s
   || _ d S )N)_stream)r    r(   r   r   r   r!   ,   s    z&ChunkTupleAsyncStreamIterator.__init__)r   c             C   s   | S )Nr   )r    r   r   r   r"   /   s    z'ChunkTupleAsyncStreamIterator.__aiter__c                s    | j  I d H }|dkrt|S )N)r#   F)r)   	readchunkr$   )r    r%   r   r   r   r&   2   s    z'ChunkTupleAsyncStreamIterator.__anext__)	r   r   r   r!   r"   r   bytesboolr&   r   r   r   r   r'   +   s   r'   c               @   sR   e Zd Zee dddZeee dddZee dddZe	dd	d
Z
dS )AsyncStreamReaderMixin)r   c             C   s
   t | jS )N)r   readline)r    r   r   r   r"   :   s    z AsyncStreamReaderMixin.__aiter__)nr   c                s   t  fddS )zzReturns an asynchronous iterator that yields chunks of size n.

        Python-3.5 available for Python 3.5+ only
        c                  s
     S )N)readr   )r/   r    r   r   <lambda>C   r#   z5AsyncStreamReaderMixin.iter_chunked.<locals>.<lambda>)r   )r    r/   r   )r/   r    r   iter_chunked=   s    z#AsyncStreamReaderMixin.iter_chunkedc             C   s
   t | jS )zoYield all available data as soon as it is received.

        Python-3.5 available for Python 3.5+ only
        )r   readany)r    r   r   r   iter_anyF   s    zAsyncStreamReaderMixin.iter_anyc             C   s   t | S )zYield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.

        Python-3.5 available for Python 3.5+ only
        )r'   )r    r   r   r   iter_chunksM   s    z"AsyncStreamReaderMixin.iter_chunksN)r   r   r   r   r+   r"   intr2   r4   r'   r5   r   r   r   r   r-   9   s   	r-   c               @   s  e Zd ZdZdZdddeeee ee	j
 ddddZedd	d
Zeeef dddZee dddZeddddZeg df ddddZddddZedddZedddZddddZeddddZd@eedd d!d"Zddd#d$Zddd%d&Zedd'd(d)Zedd*d+Z dAeed-d.d/Z!dBeed1d2d3Z"edd4d5Z#eeef dd6d7Z$eed1d8d9Z%dCeed1d:d;Z&eed1d<d=Z'eed1d>d?Z(dS )Dr   a*  An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    r   N)timerloop)protocollimitr7   r8   r   c            C   sv   || _ || _|d | _|d kr&t }|| _d| _d| _d | _t	
 | _d| _d| _d | _d | _d | _|| _g | _d S )N   r   F)	_protocol
_low_water_high_waterasyncioZget_event_loop_loop_size_cursor_http_chunk_splitscollectionsdeque_buffer_buffer_offset_eof_waiter_eof_waiter
_exception_timer_eof_callbacks)r    r9   r:   r7   r8   r   r   r   r!   h   s"    

zStreamReader.__init__)r   c             C   s   | j jg}| jr |d| j  | jr0|d | jdkrP|d| j| jf  | jrf|d| j  | jr||d| j  dd	| S )	Nz%d byteseofi   zlow=%d high=%dzw=%rze=%rz<%s> )
	__class__r   rA   appendrH   r=   r>   rI   rK   join)r    infor   r   r   __repr__   s    


zStreamReader.__repr__c             C   s   | j | jfS )N)r=   r>   )r    r   r   r   get_read_buffer_limits   s    z#StreamReader.get_read_buffer_limitsc             C   s   | j S )N)rK   )r    r   r   r   	exception   s    zStreamReader.exception)excr   c             C   sP   || _ | j  | j}|d k	r.d | _t|| | j}|d k	rLd | _t|| d S )N)rK   rM   clearrI   r   rJ   )r    rW   waiterr   r   r   r      s    

zStreamReader.set_exception)callbackr   c             C   sB   | j r2y
|  W q> tk
r.   td Y q>X n| j| d S )NzException in eof callback)rH   	Exceptionr   rV   rM   rQ   )r    rZ   r   r   r   on_eof   s    
zStreamReader.on_eofc          	   C   s   d| _ | j}|d k	r$d | _t|d  | j}|d k	rBd | _t|d  x8| jD ].}y
|  W qJ tk
rv   td Y qJX qJW | j  d S )NTzException in eof callback)	rH   rI   r   rJ   rM   r[   r   rV   rX   )r    rY   cbr   r   r   feed_eof   s    


zStreamReader.feed_eofc             C   s   | j S )z&Return True if  'feed_eof' was called.)rH   )r    r   r   r   is_eof   s    zStreamReader.is_eofc             C   s   | j o| j S )z=Return True if the buffer is empty and 'feed_eof' was called.)rH   rF   )r    r   r   r   at_eof   s    zStreamReader.at_eofc                sB   | j r
d S | jd kst| j | _z| jI d H  W d d | _X d S )N)rH   rJ   AssertionErrorr@   create_future)r    r   r   r   wait_eof   s    zStreamReader.wait_eof)datar   c             C   sx   t jdtdd |sdS | jr>| jd | jd | jd< d| _|  jt|7  _|  jt|8  _| j| d| _	dS )zDrollback reading some data from stream, inserting it to buffer head.zJunread_data() is deprecated and will be removed in future releases (#3260)r;   )
stacklevelNr   )
warningswarnDeprecationWarningrG   rF   rA   lenrB   
appendleft_eof_counter)r    rd   r   r   r   unread_data   s    zStreamReader.unread_data)rd   sizer   c             C   s   | j rtd|sd S |  jt|7  _| j| |  jt|7  _| j}|d k	rdd | _t|d  | j| j	kr| j
js| j
  d S )Nzfeed_data after feed_eof)rH   ra   rA   ri   rF   rQ   total_bytesrI   r   r>   r<   _reading_pausedpause_reading)r    rd   rm   rY   r   r   r   	feed_data   s    
zStreamReader.feed_datac             C   s"   | j d kr| jrtdg | _ d S )Nz?Called begin_http_chunk_receiving whensome data was already fed)rC   rn   RuntimeError)r    r   r   r   begin_http_chunk_receiving   s
    
z'StreamReader.begin_http_chunk_receivingc             C   sd   | j d krtd| j r"| j d nd}| j|kr4d S | j | j | j}|d k	r`d | _t|d  d S )NzFCalled end_chunk_receiving without calling begin_chunk_receiving firstr   )rC   rr   rn   rQ   rI   r   )r    posrY   r   r   r   end_http_chunk_receiving  s    
	
z%StreamReader.end_http_chunk_receiving)	func_namer   c          	      sf   | j d k	rtd| | j  }| _ z2| jrL| j |I d H  W d Q R X n
|I d H  W d d | _ X d S )NzH%s() called while another coroutine is already waiting for incoming data)rI   rr   r@   rb   rL   )r    rw   rY   r   r   r   _wait!  s    
zStreamReader._waitc                s   |   I d H S )N)	readuntil)r    r   r   r   r.   6  s    zStreamReader.readline   
)	separatorr   c       	         s   t |}|dkrtd| jd k	r(| jd}d}d}x|rxn| jr|r| j}| jd ||d }| |rr|| nd}||7 }|t |7 }|rd}|| jkr<tdq<W | jrP |r6| 	d	I d H  q6W |S )
Nr   z,Separator should be at least one-byte stringr#   Tr
   rt   FzChunk too bigry   )
ri   
ValueErrorrK   rF   rG   find_read_nowait_chunkr>   rH   rx   )	r    r{   Zseplenchunk
chunk_sizeZ
not_enoughoffsetZicharrd   r   r   r   ry   9  s0    

zStreamReader.readuntilrt   )r/   r   c                s   | j d k	r| j | jrF| jsFt| ddd | _| jdkrFtjddd |sNdS |dk rg }x"|  I d H }|spP || q\W d	|S x | js| js| 
d	I d H  qW | |S )
Nrk   r   r
      zEMultiple access to StreamReader in eof state, might be infinite loop.T)
stack_infor#   r0   )rK   rH   rF   getattrrk   r   warningr3   rQ   rR   rx   _read_nowait)r    r/   blocksblockr   r   r   r0   [  s*    


zStreamReader.readc                s<   | j d k	r| j x | js0| js0| dI d H  qW | dS )Nr3   rt   )rK   rF   rH   rx   r   )r    r   r   r   r3     s
    
zStreamReader.readanyc                s   x| j dk	r| j xL| jr^| jd}|| jkr4dS || jkrR| || j dfS td qW | jrt| ddfS | j	r~dS | 
d	I dH  qW dS )
a  Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        Nr   )r#   TTzESkipping HTTP chunk end due to data consumption beyond chunk boundaryrt   F)r#   Fr*   )rK   rC   poprB   r   r   r   rF   r~   rH   rx   )r    ru   r   r   r   r*     s     



zStreamReader.readchunkc                st   | j d k	r| j g }xT|dkrh| |I d H }|sPd|}t|t|| || |t|8 }qW d|S )Nr   r#   )rK   r0   rR   r?   IncompleteReadErrorri   rQ   )r    r/   r   r   partialr   r   r   readexactly  s    



zStreamReader.readexactlyc             C   s2   | j d k	r| j | jr(| j s(td| |S )Nz9Called while some coroutine is waiting for incoming data.)rK   rI   donerr   r   )r    r/   r   r   r   read_nowait  s    
zStreamReader.read_nowaitc             C   s   | j d }| j}|dkrHt|| |krH||||  }|  j|7  _n,|rj| j   ||d  }d| _n
| j  }|  jt|8  _|  jt|7  _| j}x |r|d | jk r|d qW | j| jk r| j	j
r| j	  |S )Nr   rt   )rF   rG   ri   popleftrA   rB   rC   r   r=   r<   ro   resume_reading)r    r/   Zfirst_bufferr   rd   Zchunk_splitsr   r   r   r~     s$    



zStreamReader._read_nowait_chunkc             C   sT   g }x<| j r@| |}|| |dkr|t|8 }|dkrP qW |rPd|S dS )z6Read not more than n bytes, or whole buffer if n == -1rt   r   r#   )rF   r~   rQ   ri   rR   )r    r/   chunksr   r   r   r   r     s    

zStreamReader._read_nowait)r   )rz   )rt   )rt   ))r   r   r   r   rn   r   r6   r   r   r?   AbstractEventLoopr!   strrT   r   rU   BaseExceptionrV   r   r   r\   r^   r,   r_   r`   rc   r+   rl   rq   rs   rv   rx   r.   ry   r0   r3   r*   r   r   r~   r   r   r   r   r   r   X   s8   	"*"r   c               @   s  e Zd ZddddZee dddZedddd	Zeg df dd
ddZ	ddddZ
edddZedddZddddZd'eeddddZedddZd(eedddZeddd Zeeef dd!d"Zeedd#d$Zd)eedd%d&ZdS )*EmptyStreamReaderN)r   c             C   s   d S )Nr   )r    r   r   r   r!     s    zEmptyStreamReader.__init__c             C   s   d S )Nr   )r    r   r   r   rV     s    zEmptyStreamReader.exception)rW   r   c             C   s   d S )Nr   )r    rW   r   r   r   r     s    zEmptyStreamReader.set_exception)rZ   r   c             C   s.   y
|  W n t k
r(   td Y nX d S )NzException in eof callback)r[   r   rV   )r    rZ   r   r   r   r\     s    
zEmptyStreamReader.on_eofc             C   s   d S )Nr   )r    r   r   r   r^     s    zEmptyStreamReader.feed_eofc             C   s   dS )NTr   )r    r   r   r   r_     s    zEmptyStreamReader.is_eofc             C   s   dS )NTr   )r    r   r   r   r`     s    zEmptyStreamReader.at_eofc                s   d S )Nr   )r    r   r   r   rc     s    zEmptyStreamReader.wait_eofr   )rd   r/   r   c             C   s   d S )Nr   )r    rd   r/   r   r   r   rq     s    zEmptyStreamReader.feed_datac                s   dS )Nr#   r   )r    r   r   r   r.     s    zEmptyStreamReader.readlinert   )r/   r   c                s   dS )Nr#   r   )r    r/   r   r   r   r0     s    zEmptyStreamReader.readc                s   dS )Nr#   r   )r    r   r   r   r3   "  s    zEmptyStreamReader.readanyc                s   dS )N)r#   Tr   )r    r   r   r   r*   %  s    zEmptyStreamReader.readchunkc                s   t d|d S )Nr#   )r?   r   )r    r/   r   r   r   r   (  s    zEmptyStreamReader.readexactlyc             C   s   dS )Nr#   r   )r    r/   r   r   r   r   +  s    zEmptyStreamReader.read_nowait)r   )rt   )rt   )r   r   r   r!   r   r   rV   r   r   r\   r^   r,   r_   r`   rc   r+   r6   rq   r.   r0   r3   r   r*   r   r   r   r   r   r   r     s   r   r   c               @   s   e Zd ZdZejddddZedddZe	dd	d
Z
e	dddZee dddZeddddZdeeddddZddddZedddZee dddZdS )r   z>DataQueue is a general-purpose blocking queue with one reader.N)r8   r   c             C   s,   || _ d| _d | _d | _d| _t | _d S )NFr   )r@   rH   rI   rK   rA   rD   rE   rF   )r    r8   r   r   r   r!   5  s    zDataQueue.__init__)r   c             C   s
   t | jS )N)ri   rF   )r    r   r   r   __len__=  s    zDataQueue.__len__c             C   s   | j S )N)rH   )r    r   r   r   r_   @  s    zDataQueue.is_eofc             C   s   | j o| j S )N)rH   rF   )r    r   r   r   r`   C  s    zDataQueue.at_eofc             C   s   | j S )N)rK   )r    r   r   r   rV   F  s    zDataQueue.exception)rW   r   c             C   s.   d| _ || _| j}|d k	r*d | _t|| d S )NT)rH   rK   rI   r   )r    rW   rY   r   r   r   r   I  s    zDataQueue.set_exceptionr   )rd   rm   r   c             C   s@   |  j |7  _ | j||f | j}|d k	r<d | _t|d  d S )N)rA   rF   rQ   rI   r   )r    rd   rm   rY   r   r   r   rq   R  s    zDataQueue.feed_datac             C   s(   d| _ | j}|d k	r$d | _t|d  d S )NT)rH   rI   r   )r    rY   r   r   r   r^   [  s
    zDataQueue.feed_eofc          	      s   | j sX| jsX| jrt| j | _y| jI d H  W n$ tjtjfk
rV   d | _ Y nX | j r~| j 	 \}}|  j
|8  _
|S | jd k	r| jntd S )N)rF   rH   rI   ra   r@   rb   r?   ZCancelledErrorTimeoutErrorr   rA   rK   r   )r    rd   rm   r   r   r   r0   c  s    

zDataQueue.readc             C   s
   t | jS )N)r   r0   )r    r   r   r   r"   w  s    zDataQueue.__aiter__)r   )r   r   r   r   r?   r   r!   r6   r   r,   r_   r`   r   r   rV   r   r   rq   r^   r0   r   r"   r   r   r   r   r   2  s   		r   c                   sX   e Zd ZdZeeejdd fddZde	edd fdd	Z
e	d
 fddZ  ZS )r   zlFlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.
    N)r9   r:   r8   r   c               s"   t  j|d || _|d | _d S )N)r8   r;   )superr!   r<   _limit)r    r9   r:   r8   )rP   r   r   r!     s    zFlowControlDataQueue.__init__r   )rd   rm   r   c                s0   t  || | j| jkr,| jjs,| j  d S )N)r   rq   rA   r   r<   ro   rp   )r    rd   rm   )rP   r   r   rq     s    zFlowControlDataQueue.feed_data)r   c                s6   zt   I d H S | j| jk r0| jjr0| j  X d S )N)r   r0   rA   r   r<   ro   r   )r    )rP   r   r   r0     s    zFlowControlDataQueue.read)r   )r   r   r   r   r   r6   r?   r   r!   r   rq   r0   __classcell__r   r   )rP   r   r   {  s
   r   )#r?   rD   rf   typingr   r   r   r   r   r   r   r	   Zbase_protocolr   Zhelpersr   r   r   logr   Ztypedefsr   __all__r   r[   r   r   r'   r-   r   r   r   __annotations__r   r   r   r   r   r   <module>   s,    (   &4I