youtube-dl/youtube_dl/compat.py

503 lines
16 KiB
Python
Raw Normal View History

2014-11-02 04:36:29 -06:00
from __future__ import unicode_literals
import collections
import getpass
import optparse
import os
import re
import shutil
import socket
import subprocess
import sys
import itertools
try:
import urllib.request as compat_urllib_request
2014-11-23 13:41:03 -06:00
except ImportError: # Python 2
import urllib2 as compat_urllib_request
try:
import urllib.error as compat_urllib_error
2014-11-23 13:41:03 -06:00
except ImportError: # Python 2
import urllib2 as compat_urllib_error
try:
import urllib.parse as compat_urllib_parse
2014-11-23 13:41:03 -06:00
except ImportError: # Python 2
import urllib as compat_urllib_parse
try:
from urllib.parse import urlparse as compat_urllib_parse_urlparse
2014-11-23 13:41:03 -06:00
except ImportError: # Python 2
from urlparse import urlparse as compat_urllib_parse_urlparse
try:
import urllib.parse as compat_urlparse
2014-11-23 13:41:03 -06:00
except ImportError: # Python 2
import urlparse as compat_urlparse
try:
import http.cookiejar as compat_cookiejar
2014-11-23 13:41:03 -06:00
except ImportError: # Python 2
import cookielib as compat_cookiejar
try:
import html.entities as compat_html_entities
2014-11-23 13:41:03 -06:00
except ImportError: # Python 2
import htmlentitydefs as compat_html_entities
try:
import http.client as compat_http_client
2014-11-23 13:41:03 -06:00
except ImportError: # Python 2
import httplib as compat_http_client
try:
from urllib.error import HTTPError as compat_HTTPError
except ImportError: # Python 2
from urllib2 import HTTPError as compat_HTTPError
try:
from urllib.request import urlretrieve as compat_urlretrieve
except ImportError: # Python 2
from urllib import urlretrieve as compat_urlretrieve
try:
from subprocess import DEVNULL
compat_subprocess_get_DEVNULL = lambda: DEVNULL
except ImportError:
compat_subprocess_get_DEVNULL = lambda: open(os.path.devnull, 'w')
2015-01-29 19:57:37 -06:00
try:
import http.server as compat_http_server
except ImportError:
import BaseHTTPServer as compat_http_server
try:
from urllib.parse import unquote as compat_urllib_parse_unquote
except ImportError:
def compat_urllib_parse_unquote_to_bytes(string):
"""unquote_to_bytes('abc%20def') -> b'abc def'."""
# Note: strings are encoded as UTF-8. This is only an issue if it contains
# unescaped non-ASCII characters, which URIs should not.
if not string:
# Is it a string-like object?
string.split
return b''
if isinstance(string, str):
string = string.encode('utf-8')
# string = encode('utf-8')
# python3 -> 2: must implicitly convert to bits
bits = bytes(string).split(b'%')
if len(bits) == 1:
return string
res = [bits[0]]
append = res.append
for item in bits[1:]:
try:
append(item[:2].decode('hex'))
append(item[2:])
except:
append(b'%')
append(item)
return b''.join(res)
compat_urllib_parse_asciire = re.compile('([\x00-\x7f]+)')
def new_compat_urllib_parse_unquote(string, encoding='utf-8', errors='replace'):
"""Replace %xx escapes by their single-character equivalent. The optional
encoding and errors parameters specify how to decode percent-encoded
sequences into Unicode characters, as accepted by the bytes.decode()
method.
By default, percent-encoded sequences are decoded with UTF-8, and invalid
sequences are replaced by a placeholder character.
unquote('abc%20def') -> 'abc def'.
"""
if '%' not in string:
string.split
return string
if encoding is None:
encoding = 'utf-8'
if errors is None:
errors = 'replace'
bits = compat_urllib_parse_asciire.split(string)
res = [bits[0]]
append = res.append
for i in range(1, len(bits), 2):
foo = compat_urllib_parse_unquote_to_bytes(bits[i])
foo = foo.decode(encoding, errors)
append(foo)
if bits[i + 1]:
bar = bits[i + 1]
if not isinstance(bar, unicode):
bar = bar.decode('utf-8')
append(bar)
return ''.join(res)
def old_compat_urllib_parse_unquote(string, encoding='utf-8', errors='replace'):
if string == '':
return string
res = string.split('%')
if len(res) == 1:
return string
if encoding is None:
encoding = 'utf-8'
if errors is None:
errors = 'replace'
# pct_sequence: contiguous sequence of percent-encoded bytes, decoded
pct_sequence = b''
string = res[0]
for item in res[1:]:
try:
if not item:
raise ValueError
if not re.match('[0-9a-fA-F][0-9a-fA-F]',item[:2]):
raise ValueError
pct_sequence += item[:2].decode('hex')
rest = item[2:]
if not rest:
# This segment was just a single percent-encoded character.
# May be part of a sequence of code units, so delay decoding.
# (Stored in pct_sequence).
continue
except ValueError:
rest = '%' + item
# Encountered non-percent-encoded characters. Flush the current
# pct_sequence.
string += pct_sequence.decode(encoding, errors) + rest
pct_sequence = b''
if pct_sequence:
# Flush the final pct_sequence
string += pct_sequence.decode(encoding, errors)
return string
compat_urllib_parse_unquote = new_compat_urllib_parse_unquote
2015-02-01 04:30:56 -06:00
try:
compat_str = unicode # Python 2
except NameError:
compat_str = str
try:
compat_basestring = basestring # Python 2
2015-02-01 04:30:56 -06:00
except NameError:
compat_basestring = str
2015-02-01 04:30:56 -06:00
try:
compat_chr = unichr # Python 2
except NameError:
compat_chr = chr
try:
from xml.etree.ElementTree import ParseError as compat_xml_parse_error
except ImportError: # Python 2.6
from xml.parsers.expat import ExpatError as compat_xml_parse_error
try:
from urllib.parse import parse_qs as compat_parse_qs
2014-11-23 13:41:03 -06:00
except ImportError: # Python 2
# HACK: The following is the correct parse_qs implementation from cpython 3's stdlib.
# Python 2's version is apparently totally broken
def _parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
2014-11-23 14:39:15 -06:00
encoding='utf-8', errors='replace'):
2015-02-01 04:30:56 -06:00
qs, _coerce_result = qs, compat_str
pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
r = []
for name_value in pairs:
if not name_value and not strict_parsing:
continue
nv = name_value.split('=', 1)
if len(nv) != 2:
if strict_parsing:
raise ValueError("bad query field: %r" % (name_value,))
# Handle case of a control-name with no equal sign
if keep_blank_values:
nv.append('')
else:
continue
if len(nv[1]) or keep_blank_values:
name = nv[0].replace('+', ' ')
name = compat_urllib_parse_unquote(
name, encoding=encoding, errors=errors)
name = _coerce_result(name)
value = nv[1].replace('+', ' ')
value = compat_urllib_parse_unquote(
value, encoding=encoding, errors=errors)
value = _coerce_result(value)
r.append((name, value))
return r
def compat_parse_qs(qs, keep_blank_values=False, strict_parsing=False,
2014-11-23 14:39:15 -06:00
encoding='utf-8', errors='replace'):
parsed_result = {}
pairs = _parse_qsl(qs, keep_blank_values, strict_parsing,
2014-11-23 14:39:15 -06:00
encoding=encoding, errors=errors)
for name, value in pairs:
if name in parsed_result:
parsed_result[name].append(value)
else:
parsed_result[name] = [value]
return parsed_result
try:
from shlex import quote as shlex_quote
except ImportError: # Python < 3.3
def shlex_quote(s):
if re.match(r'^[-_\w./]+$', s):
return s
else:
return "'" + s.replace("'", "'\"'\"'") + "'"
def compat_ord(c):
2014-11-23 13:41:03 -06:00
if type(c) is int:
return c
else:
return ord(c)
if sys.version_info >= (3, 0):
compat_getenv = os.getenv
compat_expanduser = os.path.expanduser
else:
# Environment variables should be decoded with filesystem encoding.
# Otherwise it will fail if any non-ASCII characters present (see #3854 #3217 #2918)
def compat_getenv(key, default=None):
from .utils import get_filesystem_encoding
env = os.getenv(key, default)
if env:
env = env.decode(get_filesystem_encoding())
return env
# HACK: The default implementations of os.path.expanduser from cpython do not decode
# environment variables with filesystem encoding. We will work around this by
# providing adjusted implementations.
# The following are os.path.expanduser implementations from cpython 2.7.8 stdlib
# for different platforms with correct environment variables decoding.
if os.name == 'posix':
def compat_expanduser(path):
"""Expand ~ and ~user constructions. If user or $HOME is unknown,
do nothing."""
if not path.startswith('~'):
return path
i = path.find('/', 1)
if i < 0:
i = len(path)
if i == 1:
if 'HOME' not in os.environ:
import pwd
userhome = pwd.getpwuid(os.getuid()).pw_dir
else:
userhome = compat_getenv('HOME')
else:
import pwd
try:
pwent = pwd.getpwnam(path[1:i])
except KeyError:
return path
userhome = pwent.pw_dir
userhome = userhome.rstrip('/')
return (userhome + path[i:]) or '/'
elif os.name == 'nt' or os.name == 'ce':
def compat_expanduser(path):
"""Expand ~ and ~user constructs.
If user or $HOME is unknown, do nothing."""
if path[:1] != '~':
return path
i, n = 1, len(path)
while i < n and path[i] not in '/\\':
i = i + 1
if 'HOME' in os.environ:
userhome = compat_getenv('HOME')
elif 'USERPROFILE' in os.environ:
userhome = compat_getenv('USERPROFILE')
2014-12-09 16:11:26 -06:00
elif 'HOMEPATH' not in os.environ:
return path
else:
try:
drive = compat_getenv('HOMEDRIVE')
except KeyError:
drive = ''
userhome = os.path.join(drive, compat_getenv('HOMEPATH'))
2014-11-23 13:41:03 -06:00
if i != 1: # ~user
userhome = os.path.join(os.path.dirname(userhome), path[1:i])
return userhome + path[i:]
else:
compat_expanduser = os.path.expanduser
if sys.version_info < (3, 0):
def compat_print(s):
from .utils import preferredencoding
print(s.encode(preferredencoding(), 'xmlcharrefreplace'))
else:
def compat_print(s):
2014-11-26 04:48:09 -06:00
assert isinstance(s, compat_str)
print(s)
try:
subprocess_check_output = subprocess.check_output
except AttributeError:
def subprocess_check_output(*args, **kwargs):
assert 'input' not in kwargs
p = subprocess.Popen(*args, stdout=subprocess.PIPE, **kwargs)
output, _ = p.communicate()
ret = p.poll()
if ret:
raise subprocess.CalledProcessError(ret, p.args, output=output)
return output
if sys.version_info < (3, 0) and sys.platform == 'win32':
def compat_getpass(prompt, *args, **kwargs):
if isinstance(prompt, compat_str):
2014-11-02 04:26:40 -06:00
from .utils import preferredencoding
prompt = prompt.encode(preferredencoding())
return getpass.getpass(prompt, *args, **kwargs)
else:
compat_getpass = getpass.getpass
# Old 2.6 and 2.7 releases require kwargs to be bytes
try:
def _testfunc(x):
pass
_testfunc(**{'x': 0})
except TypeError:
def compat_kwargs(kwargs):
return dict((bytes(k), v) for k, v in kwargs.items())
else:
compat_kwargs = lambda kwargs: kwargs
if sys.version_info < (2, 7):
def compat_socket_create_connection(address, timeout, source_address=None):
host, port = address
err = None
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
sock = None
try:
sock = socket.socket(af, socktype, proto)
sock.settimeout(timeout)
if source_address:
sock.bind(source_address)
sock.connect(sa)
return sock
except socket.error as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
else:
raise socket.error("getaddrinfo returns an empty list")
else:
compat_socket_create_connection = socket.create_connection
# Fix https://github.com/rg3/youtube-dl/issues/4223
# See http://bugs.python.org/issue9161 for what is broken
def workaround_optparse_bug9161():
op = optparse.OptionParser()
og = optparse.OptionGroup(op, 'foo')
try:
og.add_option('-t')
2014-11-20 09:35:55 -06:00
except TypeError:
real_add_option = optparse.OptionGroup.add_option
def _compat_add_option(self, *args, **kwargs):
enc = lambda v: (
v.encode('ascii', 'replace') if isinstance(v, compat_str)
else v)
bargs = [enc(a) for a in args]
bkwargs = dict(
(k, enc(v)) for k, v in kwargs.items())
return real_add_option(self, *bargs, **bkwargs)
optparse.OptionGroup.add_option = _compat_add_option
if hasattr(shutil, 'get_terminal_size'): # Python >= 3.3
compat_get_terminal_size = shutil.get_terminal_size
else:
_terminal_size = collections.namedtuple('terminal_size', ['columns', 'lines'])
def compat_get_terminal_size():
columns = compat_getenv('COLUMNS', None)
if columns:
columns = int(columns)
else:
columns = None
lines = compat_getenv('LINES', None)
if lines:
lines = int(lines)
else:
lines = None
try:
sp = subprocess.Popen(
['stty', 'size'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = sp.communicate()
lines, columns = map(int, out.split())
except Exception:
pass
return _terminal_size(columns, lines)
try:
itertools.count(start=0, step=1)
compat_itertools_count = itertools.count
except TypeError: # Python 2.6
def compat_itertools_count(start=0, step=1):
n = start
while True:
yield n
n += step
__all__ = [
'compat_HTTPError',
'compat_basestring',
'compat_chr',
'compat_cookiejar',
'compat_expanduser',
'compat_get_terminal_size',
'compat_getenv',
'compat_getpass',
'compat_html_entities',
'compat_http_client',
2015-01-29 19:57:37 -06:00
'compat_http_server',
'compat_itertools_count',
'compat_kwargs',
'compat_ord',
'compat_parse_qs',
'compat_print',
'compat_socket_create_connection',
'compat_str',
'compat_subprocess_get_DEVNULL',
'compat_urllib_error',
'compat_urllib_parse',
'compat_urllib_parse_unquote',
'compat_urllib_parse_urlparse',
'compat_urllib_request',
'compat_urlparse',
'compat_urlretrieve',
'compat_xml_parse_error',
'shlex_quote',
'subprocess_check_output',
'workaround_optparse_bug9161',
]