#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
urlfetch
~~~~~~~~~~
An easy to use HTTP client based on httplib.
:copyright: (c) 2011-2020 by Yue Du.
:license: BSD 2-clause License, see LICENSE for more details.
"""
__version__ = "2.0.1"
__author__ = "Yue Du <ifduyue@gmail.com>"
__url__ = "https://github.com/ifduyue/urlfetch"
__license__ = "BSD 2-Clause License"
import zlib
import os, sys, base64, codecs, uuid, stat, time, socket
import ssl
from os.path import basename, dirname, abspath, join as pathjoin
from functools import partial
from io import BytesIO
import re
try:
import simplejson as json
except ImportError:
import json
from http.client import HTTPConnection, HTTPSConnection
from urllib.parse import parse_qs, urlencode, urlsplit, urljoin
import http.cookies as Cookie
basestring = (str, bytes)
b = lambda s: s.encode("latin-1")
u = lambda s: s
__all__ = (
"request",
"fetch",
"Session",
"get",
"head",
"put",
"post",
"delete",
"options",
"trace",
"patch",
"UrlfetchException",
"ContentLimitExceeded",
"URLError",
"ContentDecodingError",
"TooManyRedirects",
)
GET = "GET"
POST = "POST"
PUT = "PUT"
DELETE = "DELETE"
HEAD = "HEAD"
OPTIONS = "OPTIONS"
TRACE = "TRACE"
PATCH = "PATCH"
[docs]class UrlfetchException(IOError):
"Base exception. All exceptions and errors will subclass from this."
[docs]class ContentLimitExceeded(UrlfetchException):
"Content length is beyond the limit."
[docs]class URLError(UrlfetchException, ValueError):
"Error parsing or handling the URL."
[docs]class ContentDecodingError(UrlfetchException):
"Failed to decode the content."
[docs]class TooManyRedirects(UrlfetchException):
"""Too many redirects."""
[docs]class Timeout(UrlfetchException):
"""Request timed out."""
class cached_property(object):
"""Cached property.
A property that is only computed once per instance and then replaces
itself with an ordinary attribute. Deleting the attribute resets the
property.
"""
def __init__(self, fget, fset=None, fdel=None, doc=None):
self.__get = fget
self.__set = fset
self.__del = fdel
self.__doc__ = doc or fget.__doc__
self.__name__ = fget.__name__
self.__module__ = fget.__module__
def __get__(self, instance, owner):
if instance is None:
# attribute is accessed through the owner class
return self
try:
return instance.__dict__[self.__name__]
except KeyError:
value = instance.__dict__[self.__name__] = self.__get(instance)
return value
def __set__(self, instance, value):
if instance is None:
return self
if self.__set is not None:
value = self.__set(instance, value)
instance.__dict__[self.__name__] = value
def __delete__(self, instance):
if instance is None:
return self
try:
value = instance.__dict__.pop(self.__name__)
except KeyError:
pass
else:
if self.__del is not None:
self.__del(instance, value)
def setter(self, fset):
return self.__class__(self.__get, fset, self.__del)
def deleter(self, fdel):
return self.__class__(self.__get, self.__set, fdel)
##############################################################################
# Core Methods and Classes ####################################################
##############################################################################
[docs]class Response(object):
"""A Response object.
>>> import urlfetch
>>> response = urlfetch.get("http://docs.python.org/")
>>> response.total_time
0.033042049407959
>>> response.status, response.reason, response.version
(200, 'OK', 10)
>>> type(response.body), len(response.body)
(<type 'str'>, 8719)
>>> type(response.text), len(response.text)
(<type 'unicode'>, 8719)
>>> response.getheader('server')
'Apache/2.2.16 (Debian)'
>>> response.getheaders()
[
('content-length', '8719'),
('x-cache', 'MISS from localhost'),
('accept-ranges', 'bytes'),
('vary', 'Accept-Encoding'),
('server', 'Apache/2.2.16 (Debian)'),
('last-modified', 'Tue, 26 Jun 2012 19:23:18 GMT'),
('connection', 'close'),
('etag', '"13cc5e4-220f-4c36507ded580"'),
('date', 'Wed, 27 Jun 2012 06:50:30 GMT'),
('content-type', 'text/html'),
('x-cache-lookup', 'MISS from localhost:8080')
]
>>> response.headers
{
'content-length': '8719',
'x-cache': 'MISS from localhost',
'accept-ranges': 'bytes',
'vary': 'Accept-Encoding',
'server': 'Apache/2.2.16 (Debian)',
'last-modified': 'Tue, 26 Jun 2012 19:23:18 GMT',
'connection': 'close',
'etag': '"13cc5e4-220f-4c36507ded580"',
'date': 'Wed, 27 Jun 2012 06:50:30 GMT',
'content-type': 'text/html',
'x-cache-lookup': 'MISS from localhost:8080'
}
:raises: :class:`ContentLimitExceeded`
"""
def __init__(self, r, **kwargs):
for k in kwargs:
setattr(self, k, kwargs[k])
self._r = r # httplib.HTTPResponse
self.msg = r.msg
#: Status code returned by server.
self.status = r.status
# compatible with requests
#: An alias of :attr:`status`.
self.status_code = r.status
#: Reason phrase returned by server.
self.reason = r.reason
#: HTTP protocol version used by server.
#: 10 for HTTP/1.0, 11 for HTTP/1.1.
self.version = r.version
#: total time
self.total_time = kwargs.pop("total_time", None)
self.getheader = r.getheader
self.getheaders = r.getheaders
self._content_encoding = self.getheader("content-encoding", None)
self._decoder = None
try:
self.length_limit = int(kwargs.get("length_limit", 0)) or None
except:
self.length_limit = None
# if content (length) size is more than length_limit, skip
content_length = int(self.getheader("Content-Length", 0))
if self.length_limit and content_length > self.length_limit:
self.close()
raise ContentLimitExceeded(
"Content length is more than %d bytes" % self.length_limit
)
[docs] def read(self, chunk_size=65536):
"""Read content (for streaming and large files)
:arg int chunk_size: size of chunk, default is 65536, i.e. 64KiB.
"""
return self._r.read(chunk_size)
def __iter__(self):
return self
def __next__(self):
chunk = self.read()
if not chunk:
if self._decoder:
chunk = self._decoder.flush()
self._decoder = None
return chunk
else:
raise StopIteration
else:
ce = self._content_encoding
if ce in ("gzip", "deflate"):
if not self._decoder:
if ce == "gzip":
self._decoder = zlib.decompressobj(16 + zlib.MAX_WBITS)
else:
self._decoder = zlib.decompressobj()
try:
return self._decoder.decompress(chunk)
except zlib.error:
self._decoder = zlib.decompressobj(-zlib.MAX_WBITS)
try:
return self._decoder.decompress(chunk)
except (IOError, zlib.error) as e:
self.close()
raise ContentDecodingError(e)
if ce:
self.close()
raise ContentDecodingError("Unknown encoding: %s" % ce)
return chunk
next = __next__
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
[docs] @classmethod
def from_httplib(cls, connection, **kwargs):
"""Make an :class:`~urlfetch.Response` object from a httplib response
object."""
return cls(connection, **kwargs)
[docs] @cached_property
def body(self):
"""Response body.
:raises: :class:`ContentLimitExceeded`, :class:`ContentDecodingError`
"""
content = []
length = 0
for chunk in self:
content.append(chunk)
length += len(chunk)
if self.length_limit and length > self.length_limit:
self.close()
raise ContentLimitExceeded(
"Content length is more than %d " "bytes" % self.length_limit
)
return b("").join(content)
# compatible with requests
#: An alias of :attr:`body`.
@property
def content(self):
return self.body
[docs] @cached_property
def text(self):
"""Response body in str."""
return mb_code(self.content)
[docs] @cached_property
def json(self):
"""Load response body as json.
:raises: :class:`ContentDecodingError`
"""
try:
return json.loads(self.text)
except Exception as e:
raise ContentDecodingError(e)
[docs] @cached_property
def cookies(self):
"""Cookies in dict"""
c = Cookie.SimpleCookie(self.getheader("set-cookie"))
return dict((i.key, i.value) for i in c.values())
[docs] @cached_property
def cookiestring(self):
"""Cookie string"""
return "; ".join("%s=%s" % (k, v) for k, v in self.cookies.items())
[docs] @cached_property
def links(self):
"""Links parsed from HTTP Link header"""
ret = []
linkheader = self.getheader("link")
if not linkheader:
return ret
for i in linkheader.split(","):
try:
url, params = i.split(";", 1)
except ValueError:
url, params = i, ""
link = {}
link["url"] = url.strip('''<> '"''')
for param in params.split(";"):
try:
k, v = param.split("=")
except ValueError:
break
link[k.strip(''' '"''')] = v.strip(''' '"''')
ret.append(link)
return ret
[docs] def close(self):
"""Close the connection."""
self._r.close()
def __del__(self):
self.close()
[docs]class Session(object):
"""A session object.
:class:`urlfetch.Session` can hold common headers and cookies.
Every request issued by a :class:`urlfetch.Session` object will bring u
these headers and cookies.
:class:`urlfetch.Session` plays a role in handling cookies, just like a
cookiejar.
:arg dict headers: Init headers.
:arg dict cookies: Init cookies.
:arg tuple auth: (username, password) for basic authentication.
"""
def __init__(self, headers={}, cookies={}, auth=None):
"""Init a :class:`~urlfetch.Session` object"""
#: headers
self.headers = headers.copy()
#: cookies
self.cookies = cookies.copy()
if auth and isinstance(auth, (list, tuple)):
auth = "%s:%s" % tuple(auth)
auth = base64.b64encode(auth.encode("utf-8"))
self.headers["Authorization"] = "Basic " + auth.decode("utf-8")
[docs] def putcookie(self, key, value=""):
"""Add an cookie to default cookies."""
self.cookies[key] = value
[docs] def popcookie(self, key):
"""Remove an cookie from default cookies."""
return self.cookies.pop(key)
@property
def cookiestring(self):
"""Cookie string.
It's assignalbe, and will change :attr:`~.Session.cookies`
correspondingly.
>>> s = Session()
>>> s.cookiestring = 'foo=bar; 1=2'
>>> s.cookies
{'1': '2', 'foo': 'bar'}
"""
return "; ".join("%s=%s" % (k, v) for k, v in self.cookies.items())
@cookiestring.setter
def cookiestring(self, value):
""""Cookie string setter"""
c = Cookie.SimpleCookie(value)
sc = [(i.key, i.value) for i in c.values()]
self.cookies = dict(sc)
[docs] def snapshot(self):
session = {"headers": self.headers.copy(), "cookies": self.cookies.copy()}
return session
[docs] def request(self, *args, **kwargs):
"""Issue a request."""
headers = self.headers.copy()
if self.cookiestring:
headers["Cookie"] = self.cookiestring
headers.update(kwargs.get("headers", {}))
kwargs["headers"] = headers
r = request(*args, **kwargs)
self.cookies.update(r.cookies)
return r
[docs] def fetch(self, *args, **kwargs):
"""Fetch an URL"""
data = kwargs.get("data", None)
files = kwargs.get("files", {})
if data and isinstance(data, (basestring, dict)) or files:
return self.post(*args, **kwargs)
return self.get(*args, **kwargs)
[docs] def get(self, *args, **kwargs):
"""Issue a get request."""
kwargs["method"] = "GET"
return self.request(*args, **kwargs)
[docs] def post(self, *args, **kwargs):
"""Issue a post request."""
kwargs["method"] = POST
return self.request(*args, **kwargs)
[docs] def put(self, *args, **kwargs):
"""Issue a put request."""
kwargs["method"] = PUT
return self.request(*args, **kwargs)
[docs] def delete(self, *args, **kwargs):
"""Issue a delete request."""
kwargs["method"] = DELETE
return self.request(*args, **kwargs)
[docs] def head(self, *args, **kwargs):
"""Issue a head request."""
kwargs["method"] = HEAD
return self.request(*args, **kwargs)
[docs] def options(self, *args, **kwargs):
"""Issue a options request."""
kwargs["method"] = OPTIONS
return self.request(*args, **kwargs)
[docs] def trace(self, *args, **kwargs):
"""Issue a trace request."""
kwargs["method"] = TRACE
return self.request(*args, **kwargs)
[docs] def patch(self, *args, **kwargs):
"""Issue a patch request."""
kwargs["method"] = PATCH
return self.request(*args, **kwargs)
[docs]def fetch(*args, **kwargs):
"""fetch an URL.
:func:`~urlfetch.fetch` is a wrapper of :func:`~urlfetch.request`.
It calls :func:`~urlfetch.get` by default. If one of parameter ``data``
or parameter ``files`` is supplied, :func:`~urlfetch.post` is called.
"""
data = kwargs.get("data", None)
files = kwargs.get("files", {})
if data and isinstance(data, (basestring, dict)) or files:
return post(*args, **kwargs)
return get(*args, **kwargs)
def match_no_proxy(host, no_proxy):
ip_regex = r"(\d{1,3}).(\d{1,3}).(\d{1,3}).(\d{1,3})"
no_proxy_ip_regex = r"(\d{1,3}).(\d{1,3}).(\d{1,3}).(\d{1,3})(?=/(\d+))?"
ip_match = re.match(ip_regex, host)
no_proxy_ip_match = re.match(no_proxy_ip_regex, no_proxy)
if no_proxy_ip_match and ip_match:
host_bits = "".join(
"{:08b}".format(int(section)) for section in ip_match.group(1, 2, 3, 4)
)
no_proxy_bits = "".join(
"{:08b}".format(int(section))
for section in no_proxy_ip_match.group(1, 2, 3, 4)
)
if no_proxy_ip_match.group(5) is not None:
bit_match_count = int(no_proxy_ip_match.group(5))
return host_bits[:bit_match_count] == no_proxy_bits[:bit_match_count]
else:
return host_bits == no_proxy_bits
else:
return host.endswith(no_proxy)
[docs]def request(
url,
method="GET",
params=None,
data=None,
headers={},
timeout=None,
files={},
randua=False,
auth=None,
length_limit=None,
proxies=None,
trust_env=True,
max_redirects=0,
source_address=None,
validate_certificate=None,
**kwargs
):
"""request an URL
:arg string url: URL to be fetched.
:arg string method: (optional) HTTP method, one of ``GET``, ``DELETE``,
``HEAD``, ``OPTIONS``, ``PUT``, ``POST``, ``TRACE``,
``PATCH``. ``GET`` is the default.
:arg dict/string params: (optional) Dict or string to attach to url as
querystring.
:arg dict headers: (optional) HTTP request headers.
:arg float timeout: (optional) Timeout in seconds
:arg files: (optional) Files to be sended
:arg randua: (optional) If ``True`` or ``path string``, use a random
user-agent in headers, instead of
``'urlfetch/' + __version__``
:arg tuple auth: (optional) (username, password) for basic authentication
:arg int length_limit: (optional) If ``None``, no limits on content length,
if the limit reached raised exception 'Content length
is more than ...'
:arg dict proxies: (optional) HTTP proxy, like {'http': '127.0.0.1:8888',
'https': '127.0.0.1:563'}
:arg bool trust_env: (optional) If ``True``, urlfetch will get infomations
from env, such as HTTP_PROXY, HTTPS_PROXY
:arg int max_redirects: (integer, optional) Max redirects allowed within a
request. Default is 0, which means redirects are
not allowed.
:arg tuple source_address: (optional) A tuple of (host, port) to
specify the source_address to bind to. This
argument is ignored if you're using Python prior
to 2.7/3.2.
:arg bool validate_certificate: (optional) If ``False``, urlfetch skips
all the necessary certificate and hostname checks
:returns: A :class:`~urlfetch.Response` object
:raises: :class:`URLError`, :class:`UrlfetchException`,
:class:`TooManyRedirects`,
"""
def make_connection(conn_type, host, port, timeout, source_address):
"""Return HTTP or HTTPS connection."""
kwargs = {"timeout": timeout, "source_address": source_address}
ssl_context = None
if validate_certificate is False:
ssl_context = ssl._create_unverified_context()
if conn_type == "http":
conn = HTTPConnection(host, port, **kwargs)
elif conn_type == "https":
conn = HTTPSConnection(host, port, context=ssl_context, **kwargs)
else:
raise URLError("Unknown Connection Type: %s" % conn_type)
return conn
via_proxy = False
method = method.upper()
if method not in ALLOWED_METHODS:
raise UrlfetchException("Method should be one of " + ", ".join(ALLOWED_METHODS))
if params:
if isinstance(params, dict):
url = url_concat(url, params)
elif isinstance(params, basestring):
if url[-1] not in ("?", "&"):
url += "&" if ("?" in url) else "?"
url += params
parsed_url = parse_url(url)
reqheaders = {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate, compress, identity, *",
"User-Agent": random_useragent(randua),
"Host": parsed_url["http_host"],
}
# Proxy support
scheme = parsed_url["scheme"]
if proxies is None and trust_env:
proxies = PROXIES
if not proxies:
proxies = {}
ignore_hosts = PROXY_IGNORE_HOSTS
if trust_env:
no_proxy = os.getenv("no_proxy") or os.getenv("NO_PROXY")
if no_proxy:
ignore_hosts = no_proxy.split(",")
proxy = proxies.get(scheme)
if proxy and not any(
match_no_proxy(parsed_url["host"], host) for host in ignore_hosts
):
via_proxy = True
if "://" not in proxy:
proxy = "%s://%s" % ('http', proxy)
parsed_proxy = parse_url(proxy)
# Proxy-Authorization
if parsed_proxy["username"] and parsed_proxy["password"]:
proxyauth = "%s:%s" % (parsed_proxy["username"], parsed_proxy["password"])
proxyauth = base64.b64encode(proxyauth.encode("utf-8"))
reqheaders["Proxy-Authorization"] = "Basic " + proxyauth.decode("utf-8")
conn = make_connection(
parsed_proxy["scheme"], parsed_proxy["host"], parsed_proxy["port"], timeout, source_address
)
else:
conn = make_connection(
scheme, parsed_url["host"], parsed_url["port"], timeout, source_address
)
if not auth and parsed_url["username"] and parsed_url["password"]:
auth = (parsed_url["username"], parsed_url["password"])
if auth:
if isinstance(auth, (list, tuple)):
auth = "%s:%s" % tuple(auth)
auth = base64.b64encode(auth.encode("utf-8"))
reqheaders["Authorization"] = "Basic " + auth.decode("utf-8")
if files:
content_type, data = encode_multipart(data, files)
reqheaders["Content-Type"] = content_type
elif isinstance(data, dict):
data = urlencode(data, True)
if isinstance(data, basestring) and not files:
# httplib will set 'Content-Length', also you can set it by yourself
reqheaders["Content-Type"] = "application/x-www-form-urlencoded"
# what if the method is GET, HEAD or DELETE
# just do not make so much decisions for users
reqheaders.update(headers)
start_time = time.time()
try:
request_url = url if via_proxy else parsed_url["uri"]
conn.request(method, request_url, data, reqheaders)
resp = conn.getresponse()
except socket.timeout as e:
raise Timeout(e)
except Exception as e:
raise UrlfetchException(e)
end_time = time.time()
total_time = end_time - start_time
history = []
response = Response.from_httplib(
resp,
reqheaders=reqheaders,
length_limit=length_limit,
history=history[:],
url=url,
total_time=total_time,
start_time=start_time,
)
while (
response.status in (301, 302, 303, 307)
and "location" in response.headers
and max_redirects
):
response.body, response.close(), history.append(response)
if len(history) > max_redirects:
raise TooManyRedirects("max_redirects exceeded")
method = method if response.status == 307 else "GET"
location = response.headers["location"]
if location[:2] == "//":
url = parsed_url["scheme"] + ":" + location
else:
url = urljoin(url, location)
parsed_url = parse_url(url)
reqheaders["Host"] = parsed_url["http_host"]
reqheaders["Referer"] = response.url
# Proxy
scheme = parsed_url["scheme"]
proxy = proxies.get(scheme)
if proxy and parsed_url["host"] not in PROXY_IGNORE_HOSTS:
via_proxy = True
if "://" not in proxy:
proxy = "%s://%s" % ('http', proxy)
parsed_proxy = parse_url(proxy)
# Proxy-Authorization
if parsed_proxy["username"] and parsed_proxy["password"]:
proxyauth = "%s:%s" % (
parsed_proxy["username"],
parsed_proxy["username"],
)
proxyauth = base64.b64encode(proxyauth.encode("utf-8"))
reqheaders["Proxy-Authorization"] = "Basic " + proxyauth.decode("utf-8")
conn = make_connection(
parsed_proxy["scheme"],
parsed_proxy["host"],
parsed_proxy["port"],
timeout,
source_address,
)
else:
via_proxy = False
reqheaders.pop("Proxy-Authorization", None)
conn = make_connection(
scheme, parsed_url["host"], parsed_url["port"], timeout, source_address
)
try:
request_url = url if via_proxy else parsed_url["uri"]
conn.request(method, request_url, data, reqheaders)
resp = conn.getresponse()
except socket.timeout as e:
raise Timeout(e)
except Exception as e:
raise UrlfetchException(e)
response = Response.from_httplib(
resp,
reqheaders=reqheaders,
length_limit=length_limit,
history=history[:],
url=url,
total_time=total_time,
start_time=start_time,
)
return response
##############################################################################
# Shortcuts and Helpers #######################################################
##############################################################################
def _partial_method(method):
func = partial(request, method=method)
func.__doc__ = "Issue a %s request" % method.lower()
func.__name__ = method.lower()
func.__module__ = request.__module__
return func
get = _partial_method(GET)
post = _partial_method(POST)
put = _partial_method(PUT)
delete = _partial_method(DELETE)
head = _partial_method(HEAD)
options = _partial_method(OPTIONS)
trace = _partial_method(TRACE)
patch = _partial_method(PATCH)
del _partial_method
class ObjectDict(dict):
"""Makes a dictionary behave like an object."""
def __getattr__(self, name):
try:
return self[name]
except KeyError:
raise AttributeError(name)
def __setattr__(self, name, value):
self[name] = value
[docs]def parse_url(url):
"""Return a dictionary of parsed url
Including scheme, netloc, path, params, query, fragment, uri, username,
password, host, port and http_host
"""
try:
url = str(url)
except UnicodeDecodeError:
pass
if "://" in url:
scheme, url = url.split("://", 1)
else:
scheme = "http"
url = "http://" + url
parsed = urlsplit(url)
r = ObjectDict()
r["scheme"] = scheme
r["netloc"] = parsed.netloc
r["path"] = parsed.path
r["query"] = parsed.query
r["fragment"] = parsed.fragment
r["uri"] = parsed.path
if parsed.query:
r["uri"] += "?" + parsed.query
r["username"] = parsed.username
r["password"] = parsed.password
host = parsed.hostname.encode("idna").decode("utf-8")
r["host"] = r["hostname"] = host
try:
r["port"] = parsed.port
except ValueError:
r["port"] = None
if r["port"]:
r["http_host"] = "%s:%d" % (r["host"], r["port"])
else:
r["http_host"] = r["host"]
return r
[docs]def get_proxies_from_environ():
"""Get proxies from os.environ."""
proxies = {}
http_proxy = os.getenv("http_proxy") or os.getenv("HTTP_PROXY")
https_proxy = os.getenv("https_proxy") or os.getenv("HTTPS_PROXY")
if http_proxy:
proxies["http"] = http_proxy
if https_proxy:
proxies["https"] = https_proxy
return proxies
[docs]def mb_code(s, coding=None, errors="replace"):
"""encoding/decoding helper."""
if isinstance(s, str):
return s if coding is None else s.encode(coding, errors=errors)
for c in ("utf-8", "gb2312", "gbk", "gb18030", "big5"):
try:
s = s.decode(c)
return s if coding is None else s.encode(coding, errors=errors)
except:
pass
return str(s, errors=errors)
[docs]def random_useragent(filename=True):
"""Returns a User-Agent string randomly from file.
:arg string filename: (Optional) Path to the file from which a random
useragent is generated. By default it's ``True``, a file shipped
with this module will be used.
:returns: An user-agent string.
"""
import random
default_ua = "urlfetch/%s" % __version__
if isinstance(filename, basestring):
filenames = [filename]
else:
filenames = []
if filename and UAFILE:
filenames.append(UAFILE)
for filename in filenames:
try:
st = os.stat(filename)
if stat.S_ISREG(st.st_mode) and os.access(filename, os.R_OK):
break
except:
pass
else:
return default_ua
with open(filename, "rb") as f:
filesize = st.st_size
pos = 0
r = random.Random()
# try getting a valid line for no more than 3 times
for i in range(3):
pos += r.randint(0, filesize)
pos %= filesize
f.seek(pos)
# in case we are in middle of a line
f.readline()
line = f.readline()
if not line:
if f.tell() == filesize:
# end of file
f.seek(0)
line = f.readline()
line = line.strip()
if line and line[0] != "#":
return line
return default_ua
[docs]def url_concat(url, args, keep_existing=True):
"""Concatenate url and argument dictionary
>>> url_concat("http://example.com/foo?a=b", dict(c="d"))
'http://example.com/foo?a=b&c=d'
:arg string url: URL being concat to.
:arg dict args: Args being concat.
:arg bool keep_existing: (Optional) Whether to keep the args which are
alreay in url, default is ``True``.
"""
if not args:
return url
if keep_existing:
if url[-1] not in ("?", "&"):
url += "&" if ("?" in url) else "?"
return url + urlencode(args, True)
else:
url, seq, query = url.partition("?")
query = parse_qs(query, True)
query.update(args)
return url + "?" + urlencode(query, True)
[docs]def choose_boundary():
"""Generate a multipart boundry.
:returns: A boundary string
"""
global BOUNDARY_PREFIX
if BOUNDARY_PREFIX is None:
BOUNDARY_PREFIX = "urlfetch"
try:
uid = repr(os.getuid())
BOUNDARY_PREFIX += "." + uid
except AttributeError:
pass
try:
pid = repr(os.getpid())
BOUNDARY_PREFIX += "." + pid
except AttributeError:
pass
return "%s.%s" % (BOUNDARY_PREFIX, uuid.uuid4().hex)
[docs]def encode_multipart(data, files):
"""Encode multipart.
:arg dict data: Data to be encoded
:arg dict files: Files to be encoded
:returns: Encoded binary string
:raises: :class:`UrlfetchException`
"""
body = BytesIO()
boundary = choose_boundary()
part_boundary = b("--%s\r\n" % boundary)
writer = codecs.lookup("utf-8")[3]
if isinstance(data, dict):
for name, values in data.items():
if not isinstance(values, (list, tuple, set)):
# behave like urllib.urlencode(dict, True)
values = (values,)
for value in values:
body.write(part_boundary)
writer(body).write(
"Content-Disposition: form-data; " 'name="%s"\r\n' % name
)
body.write(b"Content-Type: text/plain\r\n\r\n")
if isinstance(value, int):
value = str(value)
if isinstance(value, str):
writer(body).write(value)
else:
body.write(value)
body.write(b"\r\n")
for fieldname, f in files.items():
if isinstance(f, tuple):
filename, f = f
elif hasattr(f, "name"):
filename = basename(f.name)
else:
filename = None
raise UrlfetchException("file must has filename")
if hasattr(f, "read"):
value = f.read()
elif isinstance(f, basestring):
value = f
else:
value = str(f)
body.write(part_boundary)
if filename:
writer(body).write(
'Content-Disposition: form-data; name="%s"; '
'filename="%s"\r\n' % (fieldname, filename)
)
body.write(b"Content-Type: application/octet-stream\r\n\r\n")
else:
writer(body).write(
'Content-Disposition: form-data; name="%s"' "\r\n" % fieldname
)
body.write(b"Content-Type: text/plain\r\n\r\n")
if isinstance(value, str):
writer(body).write(value)
else:
body.write(value)
body.write(b"\r\n")
body.write(b("--" + boundary + "--\r\n"))
content_type = "multipart/form-data; boundary=%s" % boundary
return content_type, body.getvalue()
##############################################################################
# Constants and Globals #######################################################
##############################################################################
ALLOWED_METHODS = (GET, DELETE, HEAD, OPTIONS, PUT, POST, TRACE, PATCH)
PROXY_IGNORE_HOSTS = ("127.0.0.1", "localhost")
PROXIES = get_proxies_from_environ()
BOUNDARY_PREFIX = None
UAFILENAME = "urlfetch.useragents.list"
UAFILE = next(
(
i
for i in set(
(
pathjoin(sys.prefix, UAFILENAME),
pathjoin(sys.prefix, "local", UAFILENAME),
pathjoin(dirname(abspath(__file__)), UAFILENAME),
)
)
if os.path.isfile(i)
),
None,
)