Compare commits

...

7 Commits

Author SHA1 Message Date
Zenon Mousmoulas
e4e8f7c5b8
Merge 3872619ed5 into 37cea84f77 2024-07-02 18:20:20 +03:00
dirkf
37cea84f77 [core,utils] Support unpublicised --no-check-extensions 2024-07-02 15:38:50 +01:00
dirkf
4652109643 [core,utils] Implement unsafe file extension mitigation
* from https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-79w7-vh3h-8g4, thx grub4k
2024-07-02 15:38:50 +01:00
dirkf
3c466186a8 [utils] Back-port Namespace and MEDIA_EXTENSIONS from yt-dlp
Thx pukkandan
* Namespace: https://github.com/yt-dlp/yt-dlp/commit/591bb9d355
* MEDIA_EXTENSIONS: https://github.com/yt-dlp/yt-dlp/commit/8dc5930511
2024-07-02 15:38:50 +01:00
Zenon Mousmoulas
3872619ed5 Ant1NewsGrEmbedIE._extract_urls: Simplify redundant statement 2021-11-13 11:55:12 +02:00
Zenon Mousmoulas
264544f90e Remove unnecessary quote escape 2021-11-13 08:45:46 +02:00
Zenon Mousmoulas
9af0f299bf Add Ant1NewsGr IEs
* Add new IEs
  * Ant1NewsGrBaseIE: Base IE class
  * Ant1NewsGrWatchIE: Extract videos from TV VOD pages
  * Ant1NewsGrArticleIE: Extract videos from news articles
  * Ant1NewsGrEmbedIE: Extract iframe-embeddable ant1news.gr videos
* NB: There is a different platform at vod.antenna.gr, which is not
  covered here
* The Generic extractor can also be used to extract videos from news
  article pages (through Ant1NewsGrEmbed._extract_urls), however a
  specific IE is used to extract JSON-LD @type='NewsArticle' metadata
  * However that does not work currently, as _json_ld does not support
    @graph nesting
* Ant1NewsGrArticleIE defers to Ant1NewsGrEmbedIE, either as a playlist
  or a single video
* Ant1NewsGrWatchIE and Ant1NewsGrEmbedIE query an API endpoint to
  extract metadata, get the respective stream/source URLs and detect
  video formats
  * The endpoint HTTP path varies per IE
  * Ant1NewsGrEmbedIE first resolves any redirects for its' own URL, to
    derive the properl base URL for the API query
2021-11-11 15:47:41 +02:00
8 changed files with 420 additions and 25 deletions

View File

@ -14,9 +14,11 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import io import io
import itertools import itertools
import json import json
import types
import xml.etree.ElementTree import xml.etree.ElementTree
from youtube_dl.utils import ( from youtube_dl.utils import (
_UnsafeExtensionError,
age_restricted, age_restricted,
args_to_str, args_to_str,
base_url, base_url,
@ -270,6 +272,27 @@ class TestUtil(unittest.TestCase):
expand_path('~/%s' % env('YOUTUBE_DL_EXPATH_PATH')), expand_path('~/%s' % env('YOUTUBE_DL_EXPATH_PATH')),
'%s/expanded' % compat_getenv('HOME')) '%s/expanded' % compat_getenv('HOME'))
_uncommon_extensions = [
('exe', 'abc.exe.ext'),
('de', 'abc.de.ext'),
('../.mp4', None),
('..\\.mp4', None),
]
def assertUnsafeExtension(self, ext=None):
assert_raises = self.assertRaises(_UnsafeExtensionError)
assert_raises.ext = ext
orig_exit = assert_raises.__exit__
def my_exit(self_, exc_type, exc_val, exc_tb):
did_raise = orig_exit(exc_type, exc_val, exc_tb)
if did_raise and assert_raises.ext is not None:
self.assertEqual(assert_raises.ext, assert_raises.exception.extension, 'Unsafe extension not as unexpected')
return did_raise
assert_raises.__exit__ = types.MethodType(my_exit, assert_raises)
return assert_raises
def test_prepend_extension(self): def test_prepend_extension(self):
self.assertEqual(prepend_extension('abc.ext', 'temp'), 'abc.temp.ext') self.assertEqual(prepend_extension('abc.ext', 'temp'), 'abc.temp.ext')
self.assertEqual(prepend_extension('abc.ext', 'temp', 'ext'), 'abc.temp.ext') self.assertEqual(prepend_extension('abc.ext', 'temp', 'ext'), 'abc.temp.ext')
@ -278,6 +301,19 @@ class TestUtil(unittest.TestCase):
self.assertEqual(prepend_extension('.abc', 'temp'), '.abc.temp') self.assertEqual(prepend_extension('.abc', 'temp'), '.abc.temp')
self.assertEqual(prepend_extension('.abc.ext', 'temp'), '.abc.temp.ext') self.assertEqual(prepend_extension('.abc.ext', 'temp'), '.abc.temp.ext')
# Test uncommon extensions
self.assertEqual(prepend_extension('abc.ext', 'bin'), 'abc.bin.ext')
for ext, result in self._uncommon_extensions:
with self.assertUnsafeExtension(ext):
prepend_extension('abc', ext)
if result:
self.assertEqual(prepend_extension('abc.ext', ext, 'ext'), result)
else:
with self.assertUnsafeExtension(ext):
prepend_extension('abc.ext', ext, 'ext')
with self.assertUnsafeExtension(ext):
prepend_extension('abc.unexpected_ext', ext, 'ext')
def test_replace_extension(self): def test_replace_extension(self):
self.assertEqual(replace_extension('abc.ext', 'temp'), 'abc.temp') self.assertEqual(replace_extension('abc.ext', 'temp'), 'abc.temp')
self.assertEqual(replace_extension('abc.ext', 'temp', 'ext'), 'abc.temp') self.assertEqual(replace_extension('abc.ext', 'temp', 'ext'), 'abc.temp')
@ -286,6 +322,16 @@ class TestUtil(unittest.TestCase):
self.assertEqual(replace_extension('.abc', 'temp'), '.abc.temp') self.assertEqual(replace_extension('.abc', 'temp'), '.abc.temp')
self.assertEqual(replace_extension('.abc.ext', 'temp'), '.abc.temp') self.assertEqual(replace_extension('.abc.ext', 'temp'), '.abc.temp')
# Test uncommon extensions
self.assertEqual(replace_extension('abc.ext', 'bin'), 'abc.unknown_video')
for ext, _ in self._uncommon_extensions:
with self.assertUnsafeExtension(ext):
replace_extension('abc', ext)
with self.assertUnsafeExtension(ext):
replace_extension('abc.ext', ext, 'ext')
with self.assertUnsafeExtension(ext):
replace_extension('abc.unexpected_ext', ext, 'ext')
def test_subtitles_filename(self): def test_subtitles_filename(self):
self.assertEqual(subtitles_filename('abc.ext', 'en', 'vtt'), 'abc.en.vtt') self.assertEqual(subtitles_filename('abc.ext', 'en', 'vtt'), 'abc.en.vtt')
self.assertEqual(subtitles_filename('abc.ext', 'en', 'vtt', 'ext'), 'abc.en.vtt') self.assertEqual(subtitles_filename('abc.ext', 'en', 'vtt', 'ext'), 'abc.en.vtt')

View File

@ -7,6 +7,7 @@ import collections
import copy import copy
import datetime import datetime
import errno import errno
import functools
import io import io
import itertools import itertools
import json import json
@ -53,6 +54,7 @@ from .compat import (
compat_urllib_request_DataHandler, compat_urllib_request_DataHandler,
) )
from .utils import ( from .utils import (
_UnsafeExtensionError,
age_restricted, age_restricted,
args_to_str, args_to_str,
bug_reports_message, bug_reports_message,
@ -129,6 +131,20 @@ if compat_os_name == 'nt':
import ctypes import ctypes
def _catch_unsafe_file_extension(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except _UnsafeExtensionError as error:
self.report_error(
'{0} found; to avoid damaging your system, this value is disallowed.'
' If you believe this is an error{1}').format(
error.message, bug_reports_message(','))
return wrapper
class YoutubeDL(object): class YoutubeDL(object):
"""YoutubeDL class. """YoutubeDL class.
@ -1925,6 +1941,7 @@ class YoutubeDL(object):
if self.params.get('forcejson', False): if self.params.get('forcejson', False):
self.to_stdout(json.dumps(self.sanitize_info(info_dict))) self.to_stdout(json.dumps(self.sanitize_info(info_dict)))
@_catch_unsafe_file_extension
def process_info(self, info_dict): def process_info(self, info_dict):
"""Process a single resolved IE result.""" """Process a single resolved IE result."""

View File

@ -21,6 +21,7 @@ from .compat import (
workaround_optparse_bug9161, workaround_optparse_bug9161,
) )
from .utils import ( from .utils import (
_UnsafeExtensionError,
DateRange, DateRange,
decodeOption, decodeOption,
DEFAULT_OUTTMPL, DEFAULT_OUTTMPL,
@ -173,6 +174,9 @@ def _real_main(argv=None):
if opts.ap_mso and opts.ap_mso not in MSO_INFO: if opts.ap_mso and opts.ap_mso not in MSO_INFO:
parser.error('Unsupported TV Provider, use --ap-list-mso to get a list of supported TV Providers') parser.error('Unsupported TV Provider, use --ap-list-mso to get a list of supported TV Providers')
if opts.no_check_extensions:
_UnsafeExtensionError.lenient = True
def parse_retries(retries): def parse_retries(retries):
if retries in ('inf', 'infinite'): if retries in ('inf', 'infinite'):
parsed_retries = float('inf') parsed_retries = float('inf')

View File

@ -0,0 +1,188 @@
# coding: utf-8
from __future__ import unicode_literals
import re
from .common import InfoExtractor
from ..compat import (
compat_parse_qs,
compat_urllib_parse_urlparse,
compat_urlparse,
)
from ..utils import (
HEADRequest,
ExtractorError,
determine_ext,
smuggle_url,
unsmuggle_url,
unescapeHTML,
)
class Ant1NewsGrBaseIE(InfoExtractor):
@staticmethod
def _smuggle_parent_info(url, **info_dict):
return smuggle_url(url, {'parent_info': info_dict})
@staticmethod
def _unsmuggle_parent_info(url):
unsmuggled_url, data = unsmuggle_url(url, default={'parent_info': {}})
return unsmuggled_url, data['parent_info']
def _download_api_data(self, netloc, cid, scheme='https'):
url_parts = (scheme, netloc, self._API_PATH, None, None, None)
url = compat_urlparse.urlunparse(url_parts)
query = {'cid': cid}
return self._download_json(
url, cid,
'Downloading JSON',
'Unable to download JSON',
query=query)
def _download_and_extract_api_data(self, video_id, *args, **kwargs):
info = self._download_api_data(*args, **kwargs)
try:
source = info['url']
except KeyError:
raise ExtractorError('no source found for %s' % video_id)
formats = self._extract_m3u8_formats(source, video_id, 'mp4') \
if determine_ext(source) == 'm3u8' else [source]
self._sort_formats(formats)
return {
'id': video_id,
'title': info['title'],
'thumbnail': info['thumb'],
'formats': formats,
}
class Ant1NewsGrWatchIE(Ant1NewsGrBaseIE):
IE_NAME = 'ant1newsgr:watch'
IE_DESC = 'ant1news.gr videos'
_VALID_URL = r'https?://(?:www\.)?ant1news\.gr/watch/(?P<id>\d+)/'
_API_PATH = '/templates/data/player'
_TEST = {
'url': 'https://www.ant1news.gr/watch/1506168/ant1-news-09112021-stis-18-45',
'md5': '60a984da5ffc98c9924e6d9dd46c6f04',
'info_dict': {
'id': '1506168',
'ext': 'mp4',
'title': 'md5:0ad00fa66ecf8aa233d26ab0dba7514a',
'description': 'md5:18665af715a6dcfeac1d6153a44f16b0',
},
}
def _real_extract(self, url):
video_id = self._match_id(url)
webpage = self._download_webpage(url, video_id)
scheme, netloc, _, _, _, _ = compat_urllib_parse_urlparse(url)
info = self._download_and_extract_api_data(
video_id, netloc, video_id, scheme=scheme)
info['description'] = self._og_search_description(webpage)
return info
class Ant1NewsGrArticleIE(Ant1NewsGrBaseIE):
IE_NAME = 'ant1newsgr:article'
IE_DESC = 'ant1news.gr articles'
_VALID_URL = r'https?://(?:www\.)?ant1news\.gr/[^/]+/article/(?P<id>\d+)/'
_TESTS = [{
'url': 'https://www.ant1news.gr/afieromata/article/549468/o-tzeims-mpont-sta-meteora-oi-apeiles-kai-o-xesikomos-ton-kalogeron',
'md5': 'eb635a194c15272c2611a751766b0200',
'info_dict': {
'id': '_xvg/m_cmbatw=',
'ext': 'mp4',
'title': 'md5:a93e8ecf2e4073bfdffcb38f59945411',
},
'expected_warnings': [r'^[Uu]nable to extract JSON-LD'],
}, {
'url': 'https://ant1news.gr/Society/article/620286/symmoria-anilikon-dikigoros-thymaton-ithelan-na-toys-apoteleiosoyn',
'info_dict': {
'id': '620286',
'title': 'md5:91fe569e952e4d146485740ae927662b',
},
'expected_warnings': [r'^[Uu]nable to extract JSON-LD'],
'playlist_mincount': 2,
'params': {
'skip_download': True,
},
}]
def _real_extract(self, url):
video_id = self._match_id(url)
webpage = self._download_webpage(url, video_id)
info = self._search_json_ld(webpage, video_id,
expected_type='NewsArticle',
fatal=False)
# workaround as _json_ld does not recognize @graph nesting
if not info:
info['title'] = self._og_search_title(webpage)
embed_urls = list(Ant1NewsGrEmbedIE._extract_urls(webpage, url, **info))
if not embed_urls:
raise ExtractorError('no videos found for %s' % video_id)
if len(embed_urls) == 1:
return self.url_result(embed_urls[0], ie=Ant1NewsGrEmbedIE.ie_key(),
video_title=info['title'])
return self.playlist_from_matches(
embed_urls, video_id, info['title'], ie=Ant1NewsGrEmbedIE.ie_key())
class Ant1NewsGrEmbedIE(Ant1NewsGrBaseIE):
IE_NAME = 'ant1newsgr:embed'
IE_DESC = 'ant1news.gr embedded videos'
_VALID_URL = r'''(?x)https?://(?:[a-zA-Z0-9\-]+\.)?
(?:antenna|ant1news)\.gr/templates/pages/player
\?(?:(?:cid=(?P<id>[^&#]+)|[^&=#]+=[^&#]+)&?)+'''
_API_PATH = '/news/templates/data/jsonPlayer'
_TEST = {
'url': 'https://www.antenna.gr/templates/pages/player?cid=3f_li_c_az_jw_y_u=&w=670&h=377',
'md5': '12872b12af18b5dbf76528786728de8c',
'info_dict': {
'id': '3f_li_c_az_jw_y_u=',
'ext': 'mp4',
'title': 'md5:a30c93332455f53e1e84ae0724f0adf7',
},
}
@classmethod
def _extract_urls(cls, webpage, origin_url=None, **parent_info):
# make the scheme in _VALID_URL optional
_URL_RE = r'(?:https?:)?//' + cls._VALID_URL.split('://', 1)[1]
# simplify the query string part of _VALID_URL; after extracting iframe
# src, the URL will be matched again
_URL_RE = _URL_RE.split(r'\?', 1)[0] + r'\?(?:(?!(?P=_q1)).)+'
EMBED_RE = r'''(?x)
<iframe[^>]+?src=(?P<_q1>%(quot_re)s)(?P<url>%(url_re)s)(?P=_q1)
''' % {'quot_re': r'["\']', 'url_re': _URL_RE}
for mobj in re.finditer(EMBED_RE, webpage):
url = unescapeHTML(mobj.group('url'))
if url.startswith('//'):
scheme = compat_urllib_parse_urlparse(origin_url).scheme \
if origin_url else 'https'
url = '%s:%s' % (scheme, url)
if not cls.suitable(url):
continue
yield cls._smuggle_parent_info(url, **parent_info)
def _real_extract(self, url):
url, parent_info = type(self)._unsmuggle_parent_info(url)
video_id = self._match_id(url)
# resolve any redirects, to derive the proper base URL for the API query
canonical_url = self._request_webpage(
HEADRequest(url), video_id,
note='Resolve canonical player URL',
errnote='Could not resolve canonical player URL').geturl()
scheme, netloc, _, _, query, _ = compat_urllib_parse_urlparse(
canonical_url)
query = compat_parse_qs(query)
cid = query['cid'][0]
info = self._download_and_extract_api_data(
video_id, netloc, cid, scheme=scheme)
if 'timestamp' not in info and 'timestamp' in parent_info:
info['timestamp'] = parent_info['timestamp']
return info

View File

@ -1078,6 +1078,11 @@ from .rutube import (
RutubePersonIE, RutubePersonIE,
RutubePlaylistIE, RutubePlaylistIE,
) )
from .ant1newsgr import (
Ant1NewsGrWatchIE,
Ant1NewsGrArticleIE,
Ant1NewsGrEmbedIE,
)
from .rutv import RUTVIE from .rutv import RUTVIE
from .ruutu import RuutuIE from .ruutu import RuutuIE
from .ruv import RuvIE from .ruv import RuvIE

View File

@ -102,6 +102,7 @@ from .ustream import UstreamIE
from .arte import ArteTVEmbedIE from .arte import ArteTVEmbedIE
from .videopress import VideoPressIE from .videopress import VideoPressIE
from .rutube import RutubeIE from .rutube import RutubeIE
from .ant1newsgr import Ant1NewsGrEmbedIE
from .limelight import LimelightBaseIE from .limelight import LimelightBaseIE
from .anvato import AnvatoIE from .anvato import AnvatoIE
from .washingtonpost import WashingtonPostIE from .washingtonpost import WashingtonPostIE
@ -3400,6 +3401,13 @@ class GenericIE(InfoExtractor):
return self.playlist_from_matches( return self.playlist_from_matches(
rutube_urls, video_id, video_title, ie=RutubeIE.ie_key()) rutube_urls, video_id, video_title, ie=RutubeIE.ie_key())
# Look for ant1news.gr embeds
ant1newsgr_urls = list(Ant1NewsGrEmbedIE._extract_urls(webpage, url,
title=video_title))
if ant1newsgr_urls:
return self.playlist_from_matches(
ant1newsgr_urls, video_id, video_title, ie=Ant1NewsGrEmbedIE.ie_key())
# Look for WashingtonPost embeds # Look for WashingtonPost embeds
wapo_urls = WashingtonPostIE._extract_urls(webpage) wapo_urls = WashingtonPostIE._extract_urls(webpage)
if wapo_urls: if wapo_urls:

View File

@ -533,6 +533,10 @@ def parseOpts(overrideArguments=None):
'--no-check-certificate', '--no-check-certificate',
action='store_true', dest='no_check_certificate', default=False, action='store_true', dest='no_check_certificate', default=False,
help='Suppress HTTPS certificate validation') help='Suppress HTTPS certificate validation')
workarounds.add_option(
'--no-check-extensions',
action='store_true', dest='no_check_extensions', default=False,
help='Suppress file extension validation')
workarounds.add_option( workarounds.add_option(
'--prefer-insecure', '--prefer-insecure',
'--prefer-unsecure', action='store_true', dest='prefer_insecure', '--prefer-unsecure', action='store_true', dest='prefer_insecure',

View File

@ -1717,21 +1717,6 @@ TIMEZONE_NAMES = {
'PST': -8, 'PDT': -7 # Pacific 'PST': -8, 'PDT': -7 # Pacific
} }
KNOWN_EXTENSIONS = (
'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'aac',
'flv', 'f4v', 'f4a', 'f4b',
'webm', 'ogg', 'ogv', 'oga', 'ogx', 'spx', 'opus',
'mkv', 'mka', 'mk3d',
'avi', 'divx',
'mov',
'asf', 'wmv', 'wma',
'3gp', '3g2',
'mp3',
'flac',
'ape',
'wav',
'f4f', 'f4m', 'm3u8', 'smil')
# needed for sanitizing filenames in restricted mode # needed for sanitizing filenames in restricted mode
ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ', ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ',
itertools.chain('AAAAAA', ['AE'], 'CEEEEIIIIDNOOOOOOO', ['OE'], 'UUUUUY', ['TH', 'ss'], itertools.chain('AAAAAA', ['AE'], 'CEEEEIIIIDNOOOOOOO', ['OE'], 'UUUUUY', ['TH', 'ss'],
@ -3959,19 +3944,22 @@ def parse_duration(s):
return duration return duration
def prepend_extension(filename, ext, expected_real_ext=None): def _change_extension(prepend, filename, ext, expected_real_ext=None):
name, real_ext = os.path.splitext(filename) name, real_ext = os.path.splitext(filename)
return ( sanitize_extension = _UnsafeExtensionError.sanitize_extension
'{0}.{1}{2}'.format(name, ext, real_ext)
if not expected_real_ext or real_ext[1:] == expected_real_ext if not expected_real_ext or real_ext.partition('.')[0::2] == ('', expected_real_ext):
else '{0}.{1}'.format(filename, ext)) filename = name
if prepend and real_ext:
sanitize_extension(ext, prepend=prepend)
return ''.join((filename, '.', ext, real_ext))
# Mitigate path traversal and file impersonation attacks
return '.'.join((filename, sanitize_extension(ext)))
def replace_extension(filename, ext, expected_real_ext=None): prepend_extension = functools.partial(_change_extension, True)
name, real_ext = os.path.splitext(filename) replace_extension = functools.partial(_change_extension, False)
return '{0}.{1}'.format(
name if not expected_real_ext or real_ext[1:] == expected_real_ext else filename,
ext)
def check_executable(exe, args=[]): def check_executable(exe, args=[]):
@ -6561,3 +6549,138 @@ def join_nonempty(*values, **kwargs):
if from_dict is not None: if from_dict is not None:
values = (traverse_obj(from_dict, variadic(v)) for v in values) values = (traverse_obj(from_dict, variadic(v)) for v in values)
return delim.join(map(compat_str, filter(None, values))) return delim.join(map(compat_str, filter(None, values)))
class Namespace(object):
"""Immutable namespace"""
def __init__(self, **kw_attr):
self.__dict__.update(kw_attr)
def __iter__(self):
return iter(self.__dict__.values())
@property
def items_(self):
return self.__dict__.items()
MEDIA_EXTENSIONS = Namespace(
common_video=('avi', 'flv', 'mkv', 'mov', 'mp4', 'webm'),
video=('3g2', '3gp', 'f4v', 'mk3d', 'divx', 'mpg', 'ogv', 'm4v', 'wmv'),
common_audio=('aiff', 'alac', 'flac', 'm4a', 'mka', 'mp3', 'ogg', 'opus', 'wav'),
audio=('aac', 'ape', 'asf', 'f4a', 'f4b', 'm4b', 'm4p', 'm4r', 'oga', 'ogx', 'spx', 'vorbis', 'wma', 'weba'),
thumbnails=('jpg', 'png', 'webp'),
# storyboards=('mhtml', ),
subtitles=('srt', 'vtt', 'ass', 'lrc', 'ttml'),
manifests=('f4f', 'f4m', 'm3u8', 'smil', 'mpd'),
)
MEDIA_EXTENSIONS.video = MEDIA_EXTENSIONS.common_video + MEDIA_EXTENSIONS.video
MEDIA_EXTENSIONS.audio = MEDIA_EXTENSIONS.common_audio + MEDIA_EXTENSIONS.audio
KNOWN_EXTENSIONS = (
MEDIA_EXTENSIONS.video + MEDIA_EXTENSIONS.audio
+ MEDIA_EXTENSIONS.manifests
)
class _UnsafeExtensionError(Exception):
"""
Mitigation exception for unwanted file overwrite/path traversal
Ref: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-79w7-vh3h-8g4j
"""
_ALLOWED_EXTENSIONS = frozenset(itertools.chain(
( # internal
'description',
'json',
'meta',
'orig',
'part',
'temp',
'uncut',
'unknown_video',
'ytdl',
),
# video
MEDIA_EXTENSIONS.video, (
'avif',
'ismv',
'm2ts',
'm4s',
'mng',
'mpeg',
'qt',
'swf',
'ts',
'vp9',
'wvm',
),
# audio
MEDIA_EXTENSIONS.audio, (
'isma',
'mid',
'mpga',
'ra',
),
# image
MEDIA_EXTENSIONS.thumbnails, (
'bmp',
'gif',
'ico',
'heic',
'jng',
'jpeg',
'jxl',
'svg',
'tif',
'wbmp',
),
# subtitle
MEDIA_EXTENSIONS.subtitles, (
'dfxp',
'fs',
'ismt',
'sami',
'scc',
'ssa',
'tt',
),
# others
MEDIA_EXTENSIONS.manifests,
(
# not used in yt-dl
# *MEDIA_EXTENSIONS.storyboards,
# 'desktop',
# 'ism',
# 'm3u',
# 'sbv',
# 'swp',
# 'url',
# 'webloc',
# 'xml',
)))
def __init__(self, extension):
super(_UnsafeExtensionError, self).__init__('unsafe file extension: {0!r}'.format(extension))
self.extension = extension
# support --no-check-extensions
lenient = False
@classmethod
def sanitize_extension(cls, extension, **kwargs):
# ... /, *, prepend=False
prepend = kwargs.get('prepend', False)
if '/' in extension or '\\' in extension:
raise cls(extension)
if not prepend:
last = extension.rpartition('.')[-1]
if last == 'bin':
extension = last = 'unknown_video'
if not (cls.lenient or last.lower() in cls._ALLOWED_EXTENSIONS):
raise cls(extension)
return extension