import asyncio
import json
import urllib
import urllib.parse
import zlib
from httptools import HttpResponseParser, parse_url
from uvhttp import pool
from uvhttp.utils import HeaderDict
class EOFError(Exception):
    """
    Raised when the connection reaches end-of-file before the HTTP response
    has been completely read.

    Note that this deliberately shadows the builtin :class:`EOFError` inside
    this module; it derives directly from :class:`Exception`, so catching the
    builtin ``EOFError`` elsewhere will NOT catch this one.
    """
class Session:
    """
    A Session is an HTTP request pool. It keeps one connection pool per
    scheme/host/port combination and hands ``conn_limit`` to each of those
    pools when they are created::

        from uvhttp.utils import start_loop
        import uvhttp.http

        NUM_CONNS_PER_HOST = 10

        @start_loop
        async def main(loop):
            session = uvhttp.http.Session(NUM_CONNS_PER_HOST, loop)

            for _ in range(6):
                response = await session.get(b'http://www.google.com/', headers={
                    b'User-Agent': b'fast-af'
                })
                print(response.text)

        if __name__ == '__main__':
            main()

    The module is designed to send HTTP requests very quickly, so all methods
    require ``bytes`` objects instead of strings.
    """

    def __init__(self, conn_limit, loop, resolver=None):
        self.conn_limit = conn_limit
        self.loop = loop
        self.resolver = resolver
        # Maps b"<scheme>:<host>:<port>" to the pool serving that origin.
        self.hosts = {}

    async def head(self, *args, **kwargs):
        """
        Make an HTTP HEAD request to url, see :meth:`.request`.
        """
        return await self.request(b'HEAD', *args, **kwargs)

    async def get(self, *args, **kwargs):
        """
        Make an HTTP GET request to url, see :meth:`.request`.
        """
        return await self.request(b'GET', *args, **kwargs)

    async def post(self, *args, **kwargs):
        """
        Make an HTTP POST request to url, see :meth:`.request`.
        """
        return await self.request(b'POST', *args, **kwargs)

    async def put(self, *args, **kwargs):
        """
        Make an HTTP PUT request to url, see :meth:`.request`.
        """
        return await self.request(b'PUT', *args, **kwargs)

    async def delete(self, *args, **kwargs):
        """
        Make an HTTP DELETE request to url, see :meth:`.request`.
        """
        return await self.request(b'DELETE', *args, **kwargs)

    async def request(self, method, url, headers=None, data=None, ssl=None):
        """
        Make a new HTTP request in the pool and return the sent
        :class:`HTTPRequest`.

        ``method`` and ``url`` are :class:`bytes`.

        ``headers`` can be passed as a dictionary of :class:`bytes` (not :class:`str`).

        ``data`` is a byte array of data to include in the request.

        ``ssl`` can be a :class:`ssl.SSLContext` or True and must match
        the schema in the URL. It is ignored for non-``https`` URLs.
        """
        # Parse the URL for the hostname, port, and query string.
        parsed_url = parse_url(url)

        use_ssl = parsed_url.schema == b'https'
        # Only https URLs use TLS; default to ``True`` when no context is given.
        ssl = (ssl or True) if use_ssl else None

        port = parsed_url.port or (443 if use_ssl else 80)
        host = parsed_url.host

        # A URL such as b'http://host' has no path component; the request
        # line still needs a target, so fall back to the root path.
        path = parsed_url.path or b'/'
        if parsed_url.query:
            path += b'?' + parsed_url.query

        # Find or create a pool for this host/port/scheme combination.
        addr = parsed_url.schema + b':' + host + b':' + str(port).encode()
        session = self.hosts.get(addr)
        # Compare against None explicitly so that a pool object that happens
        # to be falsy is never silently replaced.
        if session is None:
            session = pool.Pool(host, port, self.conn_limit, self.loop,
                                resolver=self.resolver, ssl=ssl)
            self.hosts[addr] = session

        # Create and send the new HTTP request.
        # NOTE(review): only the bare host is sent as the Host header, even
        # for non-default ports -- confirm whether "host:port" is wanted.
        request = HTTPRequest(await session.connect())
        await request.send(method, host, path, headers, data)
        return request

    async def connections(self):
        """
        Return the sum of ``stats()`` over every pool created by this session.
        """
        total = 0
        for host_pool in self.hosts.values():
            total += await host_pool.stats()
        return total
class HTTPRequest:
    """
    An HTTP request instantiated from a :class:`.Session`. HTTP requests are returned by the HTTP
    session once they are sent and contain all information about the request and response.

    The instance doubles as the callback object for ``HttpResponseParser``
    (the ``on_*`` methods below).
    """

    def __init__(self, connection):
        self.connection = connection
        # Start from a clean slate so the properties are usable (and empty)
        # even before :meth:`send` has been called.
        self._reset_state()

    def _reset_state(self):
        """Reset every piece of per-response state to its initial value."""
        self.__keep_alive = None    # lazily computed by ``keep_alive``
        self.__gzipped = None       # lazily computed by ``gzipped``
        self.headers_complete = False
        self.contains_body = False
        # Stays True until the parser reports the start of a non-HEAD message.
        self.body_done = True
        self.__text = b''           # cache for the decoded body
        self.content = b''          # raw (possibly compressed) body bytes
        self.__headers = {}         # raw response headers as fed by the parser
        self.__header_dict = None   # HeaderDict built lazily from __headers

    async def send(self, method, host, path, headers=None, data=None):
        """
        Send the request (usually called by the Session object) and read the
        response.

        ``method``, ``host`` and ``path`` are :class:`bytes`. ``headers`` is an
        optional dictionary of :class:`bytes` that is merged over the default
        ``Host`` and ``User-Agent`` headers. ``data`` is the optional request
        body; when given, a ``Content-Length`` header is added.

        Raises :class:`EOFError` if the connection ends early while the
        response carries a ``transfer-encoding``, ``content-encoding`` or
        ``content-length`` header.
        """
        self._reset_state()
        self.parser = HttpResponseParser(self)
        self.method = method

        self.request_headers = {
            b"Host": host,
            b"User-Agent": b"uvloop http client",
        }
        if data:
            self.request_headers[b"Content-Length"] = str(len(data)).encode()
        if headers:
            self.request_headers.update(headers)

        # Request line, one line per header, then the blank line that ends
        # the header section.
        request = b"\r\n".join(
            [b" ".join([method, path, b"HTTP/1.1"])] +
            [b": ".join(header) for header in self.request_headers.items()] +
            [b"\r\n"]
        )
        if data:
            request += data

        await self.connection.send(request)

        try:
            await self.fetch()
        except EOFError:
            # EOF is tolerated only when none of these headers is present
            # (presumably a body that is terminated by closing the connection).
            if self.headers[b'transfer-encoding'] \
                    or self.headers[b'content-encoding'] or self.headers[b'content-length']:
                raise

        self.status_code = self.parser.get_status_code()

    async def fetch(self):
        """
        Read from the connection and feed the parser until the headers and the
        body are complete, then close the request.

        Raises :class:`EOFError` if the connection returns no data first.
        """
        # TODO: support streaming
        while not self.headers_complete or not self.body_done:
            data = await self.connection.read(65535)
            if not data:
                # The connection hit EOF, so it can never be re-used: force
                # close() to really close it instead of handing a dead
                # connection back to the pool.
                self.__keep_alive = False
                self.close()
                raise EOFError()
            self.parser.feed_data(data)
        self.close()

    def close(self):
        """
        Closes the request, signalling that we're done with the request. The
        connection is released back to the pool for re-use; it is closed first
        when the response is not keep-alive.
        """
        if not self.keep_alive:
            self.connection.close()
        self.connection.release()

    @property
    def keep_alive(self):
        """
        Return true unless the response ``Connection`` header is ``close``
        (compared case-insensitively) or the connection ended with EOF.
        """
        if self.__keep_alive is None:
            connection = self.headers[b'connection']
            self.__keep_alive = not connection or connection.lower() != b'close'
        return self.__keep_alive

    @property
    def gzipped(self):
        """
        Return true if the response is gzipped (or deflated).
        """
        if self.__gzipped is None:
            encoding = self.headers[b'content-encoding'] + self.headers[b'transfer-encoding']
            self.__gzipped = b'gzip' in encoding or b'deflate' in encoding
        return self.__gzipped

    def json(self):
        """
        Return the JSON decoded version of the body.
        """
        # TODO: Possibly should use a better library.
        return json.loads(self.text)

    @staticmethod
    def _decompress(body):
        """Decompress a gzip, zlib or raw-deflate encoded body."""
        try:
            # 32 + MAX_WBITS auto-detects a gzip or zlib header.
            return zlib.decompress(body, 32 + zlib.MAX_WBITS)
        except zlib.error as header_error:
            try:
                # "deflate" bodies are sometimes sent as a raw deflate stream
                # without any header.
                return zlib.decompress(body, -zlib.MAX_WBITS)
            except zlib.error:
                # Report the error of the primary (header-based) attempt.
                raise header_error

    @property
    def text(self):
        """
        The string representation of the response body. It will be
        decompressed if needed and decoded as a UTF-8 unicode string.
        """
        if self.__text:
            return self.__text

        body = self.content
        # An empty body (e.g. a HEAD response) has nothing to decompress.
        if body and self.gzipped:
            body = self._decompress(body)

        self.__text = body.decode('utf-8')
        return self.__text

    @property
    def headers(self):
        """
        Return the headers from the response in a case-insensitive dictionary.
        """
        # Only trust the cached dictionary once every header has arrived.
        if self.headers_complete and self.__header_dict:
            return self.__header_dict

        self.__header_dict = HeaderDict(self.__headers)
        return self.__header_dict

    # -- HttpResponseParser callbacks ------------------------------------

    def on_header(self, name, value):
        self.__headers[name] = value

    def on_body(self, body):
        self.content += body

    def on_headers_complete(self):
        self.headers_complete = True

    def on_chunk_complete(self):
        # A single chunk finishing does not mean the body is finished: more
        # chunks may follow in later reads. Completion is signalled solely by
        # on_message_complete.
        pass

    def on_message_complete(self):
        self.body_done = True

    def on_message_begin(self):
        # HEAD responses never carry a body, so body_done stays True for them.
        if self.method != b"HEAD":
            self.body_done = False