Compare commits

...

13 Commits

Author SHA1 Message Date
edutel
637190cfe9
Merge ea4948068d into 37cea84f77 2024-07-02 11:17:19 -06:00
dirkf
37cea84f77 [core,utils] Support unpublicised --no-check-extensions 2024-07-02 15:38:50 +01:00
dirkf
4652109643 [core,utils] Implement unsafe file extension mitigation
* from https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-79w7-vh3h-8g4j, thx grub4k
2024-07-02 15:38:50 +01:00
dirkf
3c466186a8 [utils] Back-port Namespace and MEDIA_EXTENSIONS from yt-dlp
Thx pukkandan
* Namespace: https://github.com/yt-dlp/yt-dlp/commit/591bb9d355
* MEDIA_EXTENSIONS: https://github.com/yt-dlp/yt-dlp/commit/8dc5930511
2024-07-02 15:38:50 +01:00
dirkf
ea4948068d
Linted 2022-10-29 16:10:56 +00:00
dirkf
eff6cd4c24
Improve course/category extraction 2022-10-29 15:57:14 +00:00
dirkf
0a99e9f59d
Linted 2022-10-29 07:49:38 +01:00
dirkf
04a7c7a849
Fix test 2022-10-29 07:32:52 +01:00
dirkf
dc80f50f7e
Outdent for linter 2022-10-29 01:44:44 +00:00
dirkf
1ce8590329
Further improve extraction
* detect when login required
* extract further metadata
2022-10-28 22:43:54 +00:00
dirkf
0235e627b9
Improve extraction
* use User-Agent Mozilla/5.0
* use Referer for manifests and downloads
* finalise review comments
2022-10-28 21:55:44 +00:00
EduTel
3ee378c099 refactor and fix 2022-01-23 14:36:52 -06:00
EduTel
6abc344f22 fix platzi 2022-01-21 18:01:06 -06:00
6 changed files with 397 additions and 72 deletions

View File

@ -14,9 +14,11 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import io import io
import itertools import itertools
import json import json
import types
import xml.etree.ElementTree import xml.etree.ElementTree
from youtube_dl.utils import ( from youtube_dl.utils import (
_UnsafeExtensionError,
age_restricted, age_restricted,
args_to_str, args_to_str,
base_url, base_url,
@ -270,6 +272,27 @@ class TestUtil(unittest.TestCase):
expand_path('~/%s' % env('YOUTUBE_DL_EXPATH_PATH')), expand_path('~/%s' % env('YOUTUBE_DL_EXPATH_PATH')),
'%s/expanded' % compat_getenv('HOME')) '%s/expanded' % compat_getenv('HOME'))
# (extension, expected result) pairs used by the extension tests:
# the second item is the value that
# prepend_extension('abc.ext', extension, 'ext') is expected to return,
# or None when that call is expected to raise _UnsafeExtensionError
_uncommon_extensions = [
('exe', 'abc.exe.ext'),
('de', 'abc.de.ext'),
('../.mp4', None),
('..\\.mp4', None),
]
def assertUnsafeExtension(self, ext=None):
    """Return a context manager asserting that its body raises _UnsafeExtensionError.

    If `ext` is not None, the `extension` attribute of the raised exception
    must also be equal to `ext`.
    The value bound by `with ... as` is the underlying assertRaises() context.
    """
    # The `with` statement looks up __enter__/__exit__ on the type of the
    # context manager, so an __exit__ patched onto the assertRaises()
    # instance is never called and the extension would go unchecked;
    # wrap the whole check in a context manager of its own instead
    import contextlib

    @contextlib.contextmanager
    def assert_unsafe():
        with self.assertRaises(_UnsafeExtensionError) as assert_raises:
            yield assert_raises
        # only reached when the expected exception was raised
        if ext is not None:
            self.assertEqual(
                ext, assert_raises.exception.extension,
                'Unsafe extension not as expected')

    return assert_unsafe()
def test_prepend_extension(self): def test_prepend_extension(self):
self.assertEqual(prepend_extension('abc.ext', 'temp'), 'abc.temp.ext') self.assertEqual(prepend_extension('abc.ext', 'temp'), 'abc.temp.ext')
self.assertEqual(prepend_extension('abc.ext', 'temp', 'ext'), 'abc.temp.ext') self.assertEqual(prepend_extension('abc.ext', 'temp', 'ext'), 'abc.temp.ext')
@ -278,6 +301,19 @@ class TestUtil(unittest.TestCase):
self.assertEqual(prepend_extension('.abc', 'temp'), '.abc.temp') self.assertEqual(prepend_extension('.abc', 'temp'), '.abc.temp')
self.assertEqual(prepend_extension('.abc.ext', 'temp'), '.abc.temp.ext') self.assertEqual(prepend_extension('.abc.ext', 'temp'), '.abc.temp.ext')
# Test uncommon extensions
self.assertEqual(prepend_extension('abc.ext', 'bin'), 'abc.bin.ext')
for ext, result in self._uncommon_extensions:
with self.assertUnsafeExtension(ext):
prepend_extension('abc', ext)
if result:
self.assertEqual(prepend_extension('abc.ext', ext, 'ext'), result)
else:
with self.assertUnsafeExtension(ext):
prepend_extension('abc.ext', ext, 'ext')
with self.assertUnsafeExtension(ext):
prepend_extension('abc.unexpected_ext', ext, 'ext')
def test_replace_extension(self): def test_replace_extension(self):
self.assertEqual(replace_extension('abc.ext', 'temp'), 'abc.temp') self.assertEqual(replace_extension('abc.ext', 'temp'), 'abc.temp')
self.assertEqual(replace_extension('abc.ext', 'temp', 'ext'), 'abc.temp') self.assertEqual(replace_extension('abc.ext', 'temp', 'ext'), 'abc.temp')
@ -286,6 +322,16 @@ class TestUtil(unittest.TestCase):
self.assertEqual(replace_extension('.abc', 'temp'), '.abc.temp') self.assertEqual(replace_extension('.abc', 'temp'), '.abc.temp')
self.assertEqual(replace_extension('.abc.ext', 'temp'), '.abc.temp') self.assertEqual(replace_extension('.abc.ext', 'temp'), '.abc.temp')
# Test uncommon extensions
self.assertEqual(replace_extension('abc.ext', 'bin'), 'abc.unknown_video')
for ext, _ in self._uncommon_extensions:
with self.assertUnsafeExtension(ext):
replace_extension('abc', ext)
with self.assertUnsafeExtension(ext):
replace_extension('abc.ext', ext, 'ext')
with self.assertUnsafeExtension(ext):
replace_extension('abc.unexpected_ext', ext, 'ext')
def test_subtitles_filename(self): def test_subtitles_filename(self):
self.assertEqual(subtitles_filename('abc.ext', 'en', 'vtt'), 'abc.en.vtt') self.assertEqual(subtitles_filename('abc.ext', 'en', 'vtt'), 'abc.en.vtt')
self.assertEqual(subtitles_filename('abc.ext', 'en', 'vtt', 'ext'), 'abc.en.vtt') self.assertEqual(subtitles_filename('abc.ext', 'en', 'vtt', 'ext'), 'abc.en.vtt')

View File

@ -7,6 +7,7 @@ import collections
import copy import copy
import datetime import datetime
import errno import errno
import functools
import io import io
import itertools import itertools
import json import json
@ -53,6 +54,7 @@ from .compat import (
compat_urllib_request_DataHandler, compat_urllib_request_DataHandler,
) )
from .utils import ( from .utils import (
_UnsafeExtensionError,
age_restricted, age_restricted,
args_to_str, args_to_str,
bug_reports_message, bug_reports_message,
@ -129,6 +131,20 @@ if compat_os_name == 'nt':
import ctypes import ctypes
def _catch_unsafe_file_extension(func):
    """Decorate a method so that _UnsafeExtensionError is reported instead of raised.

    The wrapped method is called with unchanged arguments and its result is
    returned. If it raises _UnsafeExtensionError, a message built from the
    error is passed to `self.report_error()`; if that call returns, the
    wrapper returns None.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except _UnsafeExtensionError as error:
            # The message has to be formatted before it is passed to
            # report_error(); the exception itself is formatted since
            # Python 3 exceptions have no `message` attribute
            self.report_error(
                '{0} found; to avoid damaging your system, this value is disallowed.'
                ' If you believe this is an error{1}'.format(
                    error, bug_reports_message(',')))

    return wrapper
class YoutubeDL(object): class YoutubeDL(object):
"""YoutubeDL class. """YoutubeDL class.
@ -1925,6 +1941,7 @@ class YoutubeDL(object):
if self.params.get('forcejson', False): if self.params.get('forcejson', False):
self.to_stdout(json.dumps(self.sanitize_info(info_dict))) self.to_stdout(json.dumps(self.sanitize_info(info_dict)))
@_catch_unsafe_file_extension
def process_info(self, info_dict): def process_info(self, info_dict):
"""Process a single resolved IE result.""" """Process a single resolved IE result."""

View File

@ -21,6 +21,7 @@ from .compat import (
workaround_optparse_bug9161, workaround_optparse_bug9161,
) )
from .utils import ( from .utils import (
_UnsafeExtensionError,
DateRange, DateRange,
decodeOption, decodeOption,
DEFAULT_OUTTMPL, DEFAULT_OUTTMPL,
@ -173,6 +174,9 @@ def _real_main(argv=None):
if opts.ap_mso and opts.ap_mso not in MSO_INFO: if opts.ap_mso and opts.ap_mso not in MSO_INFO:
parser.error('Unsupported TV Provider, use --ap-list-mso to get a list of supported TV Providers') parser.error('Unsupported TV Provider, use --ap-list-mso to get a list of supported TV Providers')
if opts.no_check_extensions:
_UnsafeExtensionError.lenient = True
def parse_retries(retries): def parse_retries(retries):
if retries in ('inf', 'infinite'): if retries in ('inf', 'infinite'):
parsed_retries = float('inf') parsed_retries = float('inf')

View File

@ -1,16 +1,23 @@
# coding: utf-8 # coding: utf-8
from __future__ import unicode_literals from __future__ import unicode_literals
import re
from .common import InfoExtractor from .common import InfoExtractor
from ..compat import ( from ..compat import (
compat_b64decode, compat_kwargs,
compat_str, compat_str,
compat_urllib_parse_urlparse,
) )
from ..utils import ( from ..utils import (
clean_html, clean_html,
dict_get,
ExtractorError, ExtractorError,
get_element_by_class,
int_or_none, int_or_none,
parse_iso8601,
str_or_none, str_or_none,
strip_or_none,
try_get, try_get,
url_or_none, url_or_none,
urlencode_postdata, urlencode_postdata,
@ -22,6 +29,42 @@ class PlatziBaseIE(InfoExtractor):
_LOGIN_URL = 'https://platzi.com/login/' _LOGIN_URL = 'https://platzi.com/login/'
_NETRC_MACHINE = 'platzi' _NETRC_MACHINE = 'platzi'
def _raise_extractor_error(self, video_id, reason, expected=True):
    """Raise ExtractorError with a message naming this extractor, `video_id` and `reason`."""
    message = '[%s] %s: %s' % (self.IE_NAME, video_id, reason)
    raise ExtractorError(message, expected=expected)
def _download_webpage(self, url_or_request, video_id, *args, **kwargs):
    """Download a webpage, working around the site's CF protection.

    The request is sent with a browser-like User-Agent (unless the caller
    supplied one) and is retried once if the first response has status 403.

    Returns the page content, or False if the underlying
    _download_webpage_handle() returned False.
    Raises through _raise_extractor_error() if the request was redirected
    to the home page or the page has an 'Errorpage-text' element, and calls
    raise_login_required() if it was redirected to the login page.
    """
    # CF likes Connection: keep-alive and so disfavours Py2
    # retry on 403 may get in
    kwargs['expected_status'] = 403
    # header parameters required for Py3 to breach site's CF fence w/o 403
    # (work on a copy so that the caller's dict is left untouched)
    headers = dict(kwargs.get('headers') or {})
    if 'User-Agent' not in headers:
        headers['User-Agent'] = 'Mozilla/5.0'  # (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.0.0 Safari/537.36'
        kwargs['headers'] = headers
        kwargs = compat_kwargs(kwargs)
    for _ in range(2):
        x = super(PlatziBaseIE, self)._download_webpage_handle(url_or_request, video_id, *args, **kwargs)
        if x is False:
            return x
        if x[1].getcode() != 403:
            break
        kwargs.pop('expected_status', None)
        if args:
            # the note was passed positionally: also passing it as a keyword
            # would give the parameter two values, so replace it in place
            # (assumes the first extra positional argument is the note, as
            # in InfoExtractor._download_webpage_handle() -- TODO confirm)
            args = ((args[0] or 'Downloading webpage') + ' - retrying', ) + args[1:]
        else:
            note = kwargs.pop('note', '')
            kwargs['note'] = (note or 'Downloading webpage') + ' - retrying'
        kwargs = compat_kwargs(kwargs)

    def url_path(u):
        # path component only, with any trailing '/' ignored
        return compat_urllib_parse_urlparse(u).path.rstrip('/') or '/'

    get_full_url = getattr(url_or_request, 'get_full_url', None)
    requested_url = get_full_url() if callable(get_full_url) else url_or_request
    path = url_path(x[1].geturl())
    # compare the path (not the whole parse result) of the final URL; a page
    # that was itself requested, eg the login page, is not a redirect
    redirected = path != url_path(requested_url)
    if redirected and path == '/':
        self._raise_extractor_error(video_id, 'Redirected to home page: content expired?')
    elif redirected and path == '/login':
        self.raise_login_required()
    else:
        errs = clean_html(get_element_by_class('Errorpage-text', x[0]))
        if errs:
            self._raise_extractor_error(video_id, errs)
    return x[0]
def _real_initialize(self): def _real_initialize(self):
self._login() self._login()
@ -75,6 +118,26 @@ class PlatziIE(PlatziBaseIE):
''' '''
_TESTS = [{ _TESTS = [{
'url': 'https://platzi.com/clases/1927-intro-selenium/29383-bienvenida-al-curso',
'md5': '0af120f1ffd18a2246f19099d52b83e2',
'info_dict': {
'id': '29383',
'ext': 'mp4',
'title': 'Por qué aprender Selenium y qué verás',
'description': 'md5:bbe91d2760052ca4054a3149a6580436',
'timestamp': 1627400390,
'upload_date': '20210727',
'creator': 'Héctor Vega',
'series': 'Curso de Introducción a Selenium con Python',
'duration': 11700,
'categories': list,
},
'params': {
'format': 'bestvideo',
# 'skip_download': True,
},
'expected_warnings': ['HTTP Error 401']
}, {
'url': 'https://platzi.com/clases/1311-next-js/12074-creando-nuestra-primera-pagina/', 'url': 'https://platzi.com/clases/1311-next-js/12074-creando-nuestra-primera-pagina/',
'md5': '8f56448241005b561c10f11a595b37e3', 'md5': '8f56448241005b561c10f11a595b37e3',
'info_dict': { 'info_dict': {
@ -84,7 +147,7 @@ class PlatziIE(PlatziBaseIE):
'description': 'md5:4c866e45034fc76412fbf6e60ae008bc', 'description': 'md5:4c866e45034fc76412fbf6e60ae008bc',
'duration': 420, 'duration': 420,
}, },
'skip': 'Requires platzi account credentials', 'skip': 'Content expired',
}, { }, {
'url': 'https://courses.platzi.com/classes/1367-communication-codestream/13430-background/', 'url': 'https://courses.platzi.com/classes/1367-communication-codestream/13430-background/',
'info_dict': { 'info_dict': {
@ -94,10 +157,7 @@ class PlatziIE(PlatziBaseIE):
'description': 'md5:49c83c09404b15e6e71defaf87f6b305', 'description': 'md5:49c83c09404b15e6e71defaf87f6b305',
'duration': 360, 'duration': 360,
}, },
'skip': 'Requires platzi account credentials', 'skip': 'Content expired',
'params': {
'skip_download': True,
},
}] }]
def _real_extract(self, url): def _real_extract(self, url):
@ -105,50 +165,60 @@ class PlatziIE(PlatziBaseIE):
webpage = self._download_webpage(url, lecture_id) webpage = self._download_webpage(url, lecture_id)
data = self._parse_json( data_preloaded_state = self._parse_json(
self._search_regex( self._search_regex(
# client_data may contain "};" so that we have to try more (r'window\s*.\s*__PRELOADED_STATE__\s*=\s*({.*?});?\s*</script'), webpage, 'client data'),
# strict regex first
(r'client_data\s*=\s*({.+?})\s*;\s*\n',
r'client_data\s*=\s*({.+?})\s*;'),
webpage, 'client data'),
lecture_id) lecture_id)
material = data['initialState']['material'] video_player = try_get(data_preloaded_state, lambda x: x['videoPlayer'], dict) or {}
desc = material['description'] title = strip_or_none(video_player.get('name')) or self._og_search_title(webpage)
title = desc['title'] servers = try_get(video_player, lambda x: x['video']['servers'], dict) or {}
if not servers and try_get(video_player, lambda x: x['blockedInfo']['blocked']):
why = video_player['blockedInfo'].get('type') or 'unspecified'
if why == 'unlogged':
self.raise_login_required()
self._raise_extractor_error(lecture_id, 'All video formats blocked because ' + why)
formats = [] formats = []
for server_id, server in material['videos'].items(): headers = {'Referer': url}
if not isinstance(server, dict): extractions = {
'hls': lambda x: formats.extend(self._extract_m3u8_formats(
server_json[x], lecture_id, 'mp4',
entry_protocol='m3u8_native', m3u8_id='hls',
note='Downloading %s m3u8 information' % (server_json.get('id', x), ),
headers=headers, fatal=False)),
'dash': lambda x: formats.extend(self._extract_mpd_formats(
server_json[x], lecture_id, mpd_id='dash',
note='Downloading %s MPD manifest' % (server_json.get('id', x), ),
headers=headers, fatal=False)),
}
for server, server_json in servers.items():
if not isinstance(server_json, dict):
continue continue
for format_id in ('hls', 'dash'): for fmt in server_json.keys():
format_url = url_or_none(server.get(format_id)) extraction = extractions.get(fmt)
if not format_url: if callable(extraction):
continue extraction(fmt)
if format_id == 'hls':
formats.extend(self._extract_m3u8_formats(
format_url, lecture_id, 'mp4',
entry_protocol='m3u8_native', m3u8_id=format_id,
note='Downloading %s m3u8 information' % server_id,
fatal=False))
elif format_id == 'dash':
formats.extend(self._extract_mpd_formats(
format_url, lecture_id, mpd_id=format_id,
note='Downloading %s MPD manifest' % server_id,
fatal=False))
self._sort_formats(formats) self._sort_formats(formats)
for f in formats:
f.setdefault('http_headers', {})['Referer'] = headers['Referer']
content = str_or_none(desc.get('content')) def categories():
description = (clean_html(compat_b64decode(content).decode('utf-8')) cat = strip_or_none(video_player.get('courseCategory'))
if content else None) if cat:
duration = int_or_none(material.get('duration'), invscale=60) return [cat]
return { return {
'id': lecture_id, 'id': lecture_id,
'title': title, 'title': title,
'description': description, 'description': clean_html(video_player.get('courseDescription')) or self._og_search_description(webpage),
'duration': duration, 'duration': int_or_none(video_player.get('duration'), invscale=60),
'thumbnail': url_or_none(video_player.get('thumbnail')) or self._og_search_thumbnail(webpage),
'timestamp': parse_iso8601(dict_get(video_player, ('dateModified', 'datePublished'))),
'creator': strip_or_none(video_player.get('teacherName')) or clean_html(get_element_by_class('TeacherDetails-name', webpage)),
'comment_count': int_or_none(video_player.get('commentsNumber')),
'categories': categories(),
'series': strip_or_none(video_player.get('courseTitle')) or None,
'formats': formats, 'formats': formats,
} }
@ -157,17 +227,35 @@ class PlatziCourseIE(PlatziBaseIE):
_VALID_URL = r'''(?x) _VALID_URL = r'''(?x)
https?:// https?://
(?: (?:
(?P<clas>
platzi\.com/clases| # es version platzi\.com/clases| # es version
courses\.platzi\.com/classes # en version courses\.platzi\.com/classes # en version
)|
platzi\.com(?:/(?P<curs>cursos))?
)/(?P<id>[^/?\#&]+) )/(?P<id>[^/?\#&]+)
''' '''
_TESTS = [{ _TESTS = [{
'url': 'https://platzi.com/web-angular/',
'info_dict': {
'id': 'web-angular',
'title': 'Frontend con Angular',
},
'playlist_count': 9,
}, {
'url': 'https://platzi.com/cursos/angular/',
'info_dict': {
'id': '2478',
'title': 'Curso de Fundamentos de Angular',
},
'playlist_count': 21,
}, {
'url': 'https://platzi.com/clases/next-js/', 'url': 'https://platzi.com/clases/next-js/',
'info_dict': { 'info_dict': {
'id': '1311', 'id': '1311',
'title': 'Curso de Next.js', 'title': 'Curso de Next.js',
}, },
'playlist_count': 22, 'playlist_count': 22,
'skip': 'Oops (updating page)',
}, { }, {
'url': 'https://courses.platzi.com/classes/communication-codestream/', 'url': 'https://courses.platzi.com/classes/communication-codestream/',
'info_dict': { 'info_dict': {
@ -175,23 +263,62 @@ class PlatziCourseIE(PlatziBaseIE):
'title': 'Codestream Course', 'title': 'Codestream Course',
}, },
'playlist_count': 14, 'playlist_count': 14,
'skip': 'Content expired',
}] }]
@classmethod
def _match_valid_url(cls, url):
    """Match `url` against this extractor's _VALID_URL.

    Returns the match object, or None if `url` does not match.
    """
    pattern = re.compile(cls._VALID_URL)
    return pattern.match(url)
@classmethod @classmethod
def suitable(cls, url): def suitable(cls, url):
return False if PlatziIE.suitable(url) else super(PlatziCourseIE, cls).suitable(url) return False if PlatziIE.suitable(url) else super(PlatziCourseIE, cls).suitable(url)
def __extract_things(self, webpage, thing_id, thing_pattern):
    """Build a playlist from all matches of `thing_pattern` in `webpage`.

    `thing_pattern` must have a group named 'path'; each matched path is
    joined to 'https://platzi.com' to give an entry URL. The playlist id is
    `thing_id` and its title is the page's og title, if any.
    """
    def match_to_url(mobj):
        return urljoin('https://platzi.com', mobj.group('path'))

    matches = re.finditer(thing_pattern, webpage)
    og_title = self._og_search_title(webpage, default=None)
    return self.playlist_from_matches(
        matches, playlist_id=thing_id, playlist_title=og_title,
        getter=match_to_url)
def _extract_classes(self, webpage, course_id):
    """Return a playlist of the class pages linked from a course page.

    `course_id` is the course name taken from the URL; the playlist id is
    the numeric courseId found in `webpage`, or else that name.
    """
    display_id = course_id
    course_id = self._search_regex(
        r'''(["'])courseId\1\s*:\s*(?P<id>\d+)''',
        webpage, 'course id', group='id', fatal=False) or course_id
    # display_id comes from the URL and may contain regex metacharacters:
    # escape it so that it is only matched literally
    return self.__extract_things(
        webpage, course_id,
        r'''<a\b[^>]+\bhref\s*=\s*['"]?(?P<path>/clases/\d+-%s/[^/]+)'''
        % (re.escape(display_id), ))
def _extract_categories(self, webpage, cat_id):
    """Return a playlist, with id `cat_id`, of the /cursos/ pages linked from `webpage`."""
    course_link_re = r'''<a\b[^>]+\bhref\s*=\s*['"]?(?P<path>/cursos/[^/]+)'''
    return self.__extract_things(webpage, cat_id, course_link_re)
def _real_extract(self, url): def _real_extract(self, url):
course_name = self._match_id(url)
webpage = self._download_webpage(url, course_name) m = self._match_valid_url(url)
classes, courses, this_id = m.group('clas', 'curs', 'id')
props = self._parse_json( webpage = self._download_webpage(url, this_id)
self._search_regex(r'data\s*=\s*({.+?})\s*;', webpage, 'data'),
course_name)['initialProps']
if courses:
return self._extract_classes(webpage, this_id)
if not classes:
return self._extract_categories(webpage, this_id)
# this branch now seems always to give "Oops" pages
course_name = this_id
initialData = self._search_regex(
(r'window.initialData\s*=\s*({.+?})\s*;\s*\n', r'window.initialData\s*=\s*({.+?})\s*;'),
webpage, 'initialData')
props = self._parse_json(initialData, course_name, default={})
props = try_get(props, lambda x: x['initialProps'], dict) or {}
entries = [] entries = []
for chapter_num, chapter in enumerate(props['concepts'], 1): for chapter_num, chapter in enumerate(props.get('concepts') or [], 1):
if not isinstance(chapter, dict): if not isinstance(chapter, dict):
continue continue
materials = chapter.get('materials') materials = chapter.get('materials')
@ -221,4 +348,8 @@ class PlatziCourseIE(PlatziBaseIE):
course_id = compat_str(try_get(props, lambda x: x['course']['id'])) course_id = compat_str(try_get(props, lambda x: x['course']['id']))
course_title = try_get(props, lambda x: x['course']['name'], compat_str) course_title = try_get(props, lambda x: x['course']['name'], compat_str)
return self.playlist_result(entries, course_id, course_title) result = self.playlist_result(entries, course_id, course_title)
desc = clean_html(get_element_by_class('RouteDescription-content', webpage))
if desc:
result['description'] = desc
return result

View File

@ -533,6 +533,10 @@ def parseOpts(overrideArguments=None):
'--no-check-certificate', '--no-check-certificate',
action='store_true', dest='no_check_certificate', default=False, action='store_true', dest='no_check_certificate', default=False,
help='Suppress HTTPS certificate validation') help='Suppress HTTPS certificate validation')
workarounds.add_option(
'--no-check-extensions',
action='store_true', dest='no_check_extensions', default=False,
help='Suppress file extension validation')
workarounds.add_option( workarounds.add_option(
'--prefer-insecure', '--prefer-insecure',
'--prefer-unsecure', action='store_true', dest='prefer_insecure', '--prefer-unsecure', action='store_true', dest='prefer_insecure',

View File

@ -1717,21 +1717,6 @@ TIMEZONE_NAMES = {
'PST': -8, 'PDT': -7 # Pacific 'PST': -8, 'PDT': -7 # Pacific
} }
KNOWN_EXTENSIONS = (
'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'aac',
'flv', 'f4v', 'f4a', 'f4b',
'webm', 'ogg', 'ogv', 'oga', 'ogx', 'spx', 'opus',
'mkv', 'mka', 'mk3d',
'avi', 'divx',
'mov',
'asf', 'wmv', 'wma',
'3gp', '3g2',
'mp3',
'flac',
'ape',
'wav',
'f4f', 'f4m', 'm3u8', 'smil')
# needed for sanitizing filenames in restricted mode # needed for sanitizing filenames in restricted mode
ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ', ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ',
itertools.chain('AAAAAA', ['AE'], 'CEEEEIIIIDNOOOOOOO', ['OE'], 'UUUUUY', ['TH', 'ss'], itertools.chain('AAAAAA', ['AE'], 'CEEEEIIIIDNOOOOOOO', ['OE'], 'UUUUUY', ['TH', 'ss'],
@ -3959,19 +3944,22 @@ def parse_duration(s):
return duration return duration
def prepend_extension(filename, ext, expected_real_ext=None): def _change_extension(prepend, filename, ext, expected_real_ext=None):
name, real_ext = os.path.splitext(filename) name, real_ext = os.path.splitext(filename)
return ( sanitize_extension = _UnsafeExtensionError.sanitize_extension
'{0}.{1}{2}'.format(name, ext, real_ext)
if not expected_real_ext or real_ext[1:] == expected_real_ext if not expected_real_ext or real_ext.partition('.')[0::2] == ('', expected_real_ext):
else '{0}.{1}'.format(filename, ext)) filename = name
if prepend and real_ext:
sanitize_extension(ext, prepend=prepend)
return ''.join((filename, '.', ext, real_ext))
# Mitigate path traversal and file impersonation attacks
return '.'.join((filename, sanitize_extension(ext)))
def replace_extension(filename, ext, expected_real_ext=None): prepend_extension = functools.partial(_change_extension, True)
name, real_ext = os.path.splitext(filename) replace_extension = functools.partial(_change_extension, False)
return '{0}.{1}'.format(
name if not expected_real_ext or real_ext[1:] == expected_real_ext else filename,
ext)
def check_executable(exe, args=[]): def check_executable(exe, args=[]):
@ -6561,3 +6549,138 @@ def join_nonempty(*values, **kwargs):
if from_dict is not None: if from_dict is not None:
values = (traverse_obj(from_dict, variadic(v)) for v in values) values = (traverse_obj(from_dict, variadic(v)) for v in values)
return delim.join(map(compat_str, filter(None, values))) return delim.join(map(compat_str, filter(None, values)))
class Namespace(object):
    """Simple namespace whose attributes are set from keyword arguments

    Iterating over an instance gives the attribute values;
    `items_` gives the (name, value) pairs
    """

    def __init__(self, **kw_attr):
        vars(self).update(kw_attr)

    def __iter__(self):
        return iter(vars(self).values())

    @property
    def items_(self):
        return vars(self).items()
# Known media file extensions, grouped by type of content
MEDIA_EXTENSIONS = Namespace(
    common_video=('avi', 'flv', 'mkv', 'mov', 'mp4', 'webm'),
    video=('3g2', '3gp', 'f4v', 'mk3d', 'divx', 'mpg', 'ogv', 'm4v', 'wmv'),
    common_audio=('aiff', 'alac', 'flac', 'm4a', 'mka', 'mp3', 'ogg', 'opus', 'wav'),
    audio=('aac', 'ape', 'asf', 'f4a', 'f4b', 'm4b', 'm4p', 'm4r', 'oga', 'ogx', 'spx', 'vorbis', 'wma', 'weba'),
    thumbnails=('jpg', 'png', 'webp'),
    # storyboards=('mhtml', ),
    subtitles=('srt', 'vtt', 'ass', 'lrc', 'ttml'),
    manifests=('f4f', 'f4m', 'm3u8', 'smil', 'mpd'),
)

# `video` and `audio` become the full lists, common extensions first
MEDIA_EXTENSIONS.video = MEDIA_EXTENSIONS.common_video + MEDIA_EXTENSIONS.video
MEDIA_EXTENSIONS.audio = MEDIA_EXTENSIONS.common_audio + MEDIA_EXTENSIONS.audio

# all video, audio and manifest extensions, in that order
KNOWN_EXTENSIONS = tuple(itertools.chain(
    MEDIA_EXTENSIONS.video,
    MEDIA_EXTENSIONS.audio,
    MEDIA_EXTENSIONS.manifests,
))
class _UnsafeExtensionError(Exception):
    """
    Raised when a file extension is rejected as unsafe

    Mitigation for unwanted file overwrite/path traversal
    Ref: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-79w7-vh3h-8g4j
    """

    # support --no-check-extensions: if true, extensions are not checked
    # against _ALLOWED_EXTENSIONS (path separators are rejected regardless)
    lenient = False

    _ALLOWED_EXTENSIONS = frozenset(itertools.chain(
        # internal
        ('description', 'json', 'meta', 'orig', 'part', 'temp', 'uncut', 'unknown_video', 'ytdl'),
        # video
        MEDIA_EXTENSIONS.video,
        ('avif', 'ismv', 'm2ts', 'm4s', 'mng', 'mpeg', 'qt', 'swf', 'ts', 'vp9', 'wvm'),
        # audio
        MEDIA_EXTENSIONS.audio,
        ('isma', 'mid', 'mpga', 'ra'),
        # image
        MEDIA_EXTENSIONS.thumbnails,
        ('bmp', 'gif', 'ico', 'heic', 'jng', 'jpeg', 'jxl', 'svg', 'tif', 'wbmp'),
        # subtitle
        MEDIA_EXTENSIONS.subtitles,
        ('dfxp', 'fs', 'ismt', 'sami', 'scc', 'ssa', 'tt'),
        # others
        MEDIA_EXTENSIONS.manifests,
        # not used in yt-dl:
        # *MEDIA_EXTENSIONS.storyboards,
        # 'desktop', 'ism', 'm3u', 'sbv', 'swp', 'url', 'webloc', 'xml',
    ))

    def __init__(self, extension):
        message = 'unsafe file extension: {0!r}'.format(extension)
        super(_UnsafeExtensionError, self).__init__(message)
        # the offending extension, as passed in
        self.extension = extension

    @classmethod
    def sanitize_extension(cls, extension, **kwargs):
        """Return `extension` if it is acceptable, or else raise this exception

        Keyword argument (... /, *, prepend=False):
        prepend -- if true, only path separators are checked for

        An extension containing '/' or '\\' is always rejected. Otherwise,
        unless `prepend` is true, a final component 'bin' makes the result
        'unknown_video', and any other final component has to be one of
        _ALLOWED_EXTENSIONS (ignoring case) unless `lenient` is set.
        """
        # Mitigate path traversal
        if '/' in extension or '\\' in extension:
            raise cls(extension)

        if kwargs.get('prepend', False):
            return extension

        # only the part after the last '.' is significant
        last = extension.rpartition('.')[-1]
        if last == 'bin':
            extension = last = 'unknown_video'
        if cls.lenient or last.lower() in cls._ALLOWED_EXTENSIONS:
            return extension
        raise cls(extension)