Compare commits


8 Commits

Author SHA1 Message Date
Zenon Mousmoulas
e6b6d9640f
Merge 96a0ad4778 into 37cea84f77 2024-07-02 18:20:20 +03:00
dirkf
37cea84f77 [core,utils] Support unpublicised --no-check-extensions 2024-07-02 15:38:50 +01:00
dirkf
4652109643 [core,utils] Implement unsafe file extension mitigation
* from https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-79w7-vh3h-8g4j, thx grub4k
2024-07-02 15:38:50 +01:00
dirkf
3c466186a8 [utils] Back-port Namespace and MEDIA_EXTENSIONS from yt-dlp
Thx pukkandan
* Namespace: https://github.com/yt-dlp/yt-dlp/commit/591bb9d355
* MEDIA_EXTENSIONS: https://github.com/yt-dlp/yt-dlp/commit/8dc5930511
2024-07-02 15:38:50 +01:00
Zenon Mousmoulas
96a0ad4778 MegaTVComEmbedIE: Make canonical URL extraction more robust 2021-11-13 11:50:22 +02:00
Zenon Mousmoulas
28fddc1758 Fix copy/paste typo 2021-11-13 11:39:29 +02:00
Zenon Mousmoulas
a5ec30e106 Address PR comments about escapes 2021-11-13 08:42:15 +02:00
Zenon Mousmoulas
34c3b06402 Add MegaTVCom IEs
* Add new IEs
  * MegaTVComBaseIE: Base IE class
  * MegaTVComIE: Extract from TV VOD pages and news articles, i.e. all
    sorts of pages showing videos on megatv.com
  * MegaTVComEmbedIE: Extract iframe-embeddable megatv.com videos
* When video_id is not matched in the URL, namely for news articles,
  extract it (article_id) from a particular element on the web page
* Derive metadata and sources directly from the web page, from data
  attributes of the player placeholder element and other commonly used
  elements
* Let MegaTVComEmbedIE defer to MegaTVComIE for extraction, as the
  metadata on the embeddable page are sometimes slightly different for
  the same video
2021-11-11 15:40:14 +02:00
8 changed files with 433 additions and 25 deletions

View File: test/test_utils.py

@@ -14,9 +14,11 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import io
import itertools
import json
import types
import xml.etree.ElementTree
from youtube_dl.utils import (
_UnsafeExtensionError,
age_restricted,
args_to_str,
base_url,
@@ -270,6 +272,27 @@ class TestUtil(unittest.TestCase):
expand_path('~/%s' % env('YOUTUBE_DL_EXPATH_PATH')),
'%s/expanded' % compat_getenv('HOME'))
_uncommon_extensions = [
('exe', 'abc.exe.ext'),
('de', 'abc.de.ext'),
('../.mp4', None),
('..\\.mp4', None),
]
def assertUnsafeExtension(self, ext=None):
assert_raises = self.assertRaises(_UnsafeExtensionError)
assert_raises.ext = ext
orig_exit = assert_raises.__exit__
def my_exit(self_, exc_type, exc_val, exc_tb):
did_raise = orig_exit(exc_type, exc_val, exc_tb)
if did_raise and assert_raises.ext is not None:
self.assertEqual(assert_raises.ext, assert_raises.exception.extension, 'Unsafe extension not as expected')
return did_raise
assert_raises.__exit__ = types.MethodType(my_exit, assert_raises)
return assert_raises
def test_prepend_extension(self):
self.assertEqual(prepend_extension('abc.ext', 'temp'), 'abc.temp.ext')
self.assertEqual(prepend_extension('abc.ext', 'temp', 'ext'), 'abc.temp.ext')
@@ -278,6 +301,19 @@ class TestUtil(unittest.TestCase):
self.assertEqual(prepend_extension('.abc', 'temp'), '.abc.temp')
self.assertEqual(prepend_extension('.abc.ext', 'temp'), '.abc.temp.ext')
# Test uncommon extensions
self.assertEqual(prepend_extension('abc.ext', 'bin'), 'abc.bin.ext')
for ext, result in self._uncommon_extensions:
with self.assertUnsafeExtension(ext):
prepend_extension('abc', ext)
if result:
self.assertEqual(prepend_extension('abc.ext', ext, 'ext'), result)
else:
with self.assertUnsafeExtension(ext):
prepend_extension('abc.ext', ext, 'ext')
with self.assertUnsafeExtension(ext):
prepend_extension('abc.unexpected_ext', ext, 'ext')
def test_replace_extension(self):
self.assertEqual(replace_extension('abc.ext', 'temp'), 'abc.temp')
self.assertEqual(replace_extension('abc.ext', 'temp', 'ext'), 'abc.temp')
@@ -286,6 +322,16 @@ class TestUtil(unittest.TestCase):
self.assertEqual(replace_extension('.abc', 'temp'), '.abc.temp')
self.assertEqual(replace_extension('.abc.ext', 'temp'), '.abc.temp')
# Test uncommon extensions
self.assertEqual(replace_extension('abc.ext', 'bin'), 'abc.unknown_video')
for ext, _ in self._uncommon_extensions:
with self.assertUnsafeExtension(ext):
replace_extension('abc', ext)
with self.assertUnsafeExtension(ext):
replace_extension('abc.ext', ext, 'ext')
with self.assertUnsafeExtension(ext):
replace_extension('abc.unexpected_ext', ext, 'ext')
def test_subtitles_filename(self):
self.assertEqual(subtitles_filename('abc.ext', 'en', 'vtt'), 'abc.en.vtt')
self.assertEqual(subtitles_filename('abc.ext', 'en', 'vtt', 'ext'), 'abc.en.vtt')
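
For orientation, here is a minimal standalone sketch of what the new uncommon-extension cases above assert, assuming the patched youtube_dl.utils is importable; the filenames are illustrative:

import unittest

from youtube_dl.utils import _UnsafeExtensionError, prepend_extension


class UncommonExtensionSketch(unittest.TestCase):
    def test_uncommon_extension(self):
        # Tolerated when prepended before a recognised real extension:
        self.assertEqual(prepend_extension('abc.ext', 'exe', 'ext'), 'abc.exe.ext')
        # Rejected when there is no real extension left to anchor on:
        with self.assertRaises(_UnsafeExtensionError):
            prepend_extension('abc', 'exe')
        # Path traversal attempts are always rejected:
        with self.assertRaises(_UnsafeExtensionError):
            prepend_extension('abc.ext', '../.mp4', 'ext')


if __name__ == '__main__':
    unittest.main()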

View File: youtube_dl/YoutubeDL.py

@@ -7,6 +7,7 @@ import collections
import copy
import datetime
import errno
import functools
import io
import itertools
import json
@@ -53,6 +54,7 @@ from .compat import (
compat_urllib_request_DataHandler,
)
from .utils import (
_UnsafeExtensionError,
age_restricted,
args_to_str,
bug_reports_message,
@@ -129,6 +131,20 @@ if compat_os_name == 'nt':
import ctypes
def _catch_unsafe_file_extension(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except _UnsafeExtensionError as error:
self.report_error(
'{0} found; to avoid damaging your system, this value is disallowed.'
' If you believe this is an error{1}'.format(
error, bug_reports_message(',')))
return wrapper
class YoutubeDL(object):
"""YoutubeDL class.
@@ -1925,6 +1941,7 @@ class YoutubeDL(object):
if self.params.get('forcejson', False):
self.to_stdout(json.dumps(self.sanitize_info(info_dict)))
@_catch_unsafe_file_extension
def process_info(self, info_dict):
"""Process a single resolved IE result."""

View File: youtube_dl/__init__.py

@@ -21,6 +21,7 @@ from .compat import (
workaround_optparse_bug9161,
)
from .utils import (
_UnsafeExtensionError,
DateRange,
decodeOption,
DEFAULT_OUTTMPL,
@@ -173,6 +174,9 @@ def _real_main(argv=None):
if opts.ap_mso and opts.ap_mso not in MSO_INFO:
parser.error('Unsupported TV Provider, use --ap-list-mso to get a list of supported TV Providers')
if opts.no_check_extensions:
_UnsafeExtensionError.lenient = True
def parse_retries(retries):
if retries in ('inf', 'infinite'):
parsed_retries = float('inf')
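
The option handler above simply flips a class attribute. Roughly, the observable effect, in a sketch that assumes the patched youtube_dl.utils imports cleanly ('custom' is an arbitrary extension that is not on the allow-list):

from youtube_dl.utils import _UnsafeExtensionError, replace_extension

try:
    replace_extension('clip.mp4', 'custom')        # not on the allow-list
except _UnsafeExtensionError as error:
    print(error)                                   # unsafe file extension: 'custom'

_UnsafeExtensionError.lenient = True               # what --no-check-extensions sets
print(replace_extension('clip.mp4', 'custom'))     # clip.custom

Extensions containing path separators are still rejected even in lenient mode, since that check happens before the allow-list is consulted.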

View File: youtube_dl/extractor/extractors.py

@@ -1078,6 +1078,10 @@ from .rutube import (
RutubePersonIE,
RutubePlaylistIE,
)
from .megatvcom import (
MegaTVComIE,
MegaTVComEmbedIE,
)
from .rutv import RUTVIE
from .ruutu import RuutuIE
from .ruv import RuvIE

View File: youtube_dl/extractor/generic.py

@@ -102,6 +102,7 @@ from .ustream import UstreamIE
from .arte import ArteTVEmbedIE
from .videopress import VideoPressIE
from .rutube import RutubeIE
from .megatvcom import MegaTVComEmbedIE
from .limelight import LimelightBaseIE
from .anvato import AnvatoIE
from .washingtonpost import WashingtonPostIE
@@ -3400,6 +3401,12 @@ class GenericIE(InfoExtractor):
return self.playlist_from_matches(
rutube_urls, video_id, video_title, ie=RutubeIE.ie_key())
# Look for megatv.com embeds
megatvcom_urls = list(MegaTVComEmbedIE._extract_urls(webpage, url))
if megatvcom_urls:
return self.playlist_from_matches(
megatvcom_urls, video_id, video_title, ie=MegaTVComEmbedIE.ie_key())
# Look for WashingtonPost embeds
wapo_urls = WashingtonPostIE._extract_urls(webpage)
if wapo_urls:
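
A quick hedged check of the discovery helper used here, assuming the new extractor module is importable; the iframe markup reuses an embed URL from the new extractor's tests:

from youtube_dl.extractor.megatvcom import MegaTVComEmbedIE

page = '<iframe src="https://www.megatv.com/embed/?p=2020520979" width="640"></iframe>'
print(list(MegaTVComEmbedIE._extract_urls(page)))
# expected: ['https://www.megatv.com/embed/?p=2020520979']

Protocol-relative src values are completed with the scheme of the embedding page, or https when no origin URL is passed.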

View File: youtube_dl/extractor/megatvcom.py

@@ -0,0 +1,203 @@
# coding: utf-8
from __future__ import unicode_literals
import hashlib
import re
from .common import InfoExtractor
from ..compat import (
compat_str,
compat_parse_qs,
compat_urllib_parse_urlparse,
)
from ..utils import (
HEADRequest,
ExtractorError,
determine_ext,
get_element_by_class,
unified_timestamp,
extract_attributes,
clean_html,
unescapeHTML,
)
class MegaTVComBaseIE(InfoExtractor):
_PLAYER_DIV_ID = 'player_div_id'
def _extract_player_attrs(self, webpage):
PLAYER_DIV_RE = r'''(?x)
<div(?:
id=(?P<_q1>["'])(?P<%(pdi)s>%(pdi)s)(?P=_q1)|
[^>]*?
)+>
''' % {'pdi': self._PLAYER_DIV_ID}
for mobj in re.finditer(PLAYER_DIV_RE, webpage):
if mobj.group(self._PLAYER_DIV_ID):
player_el = mobj.group(0)
break
else:
raise ExtractorError('no <div id="%s"> element found in webpage' %
self._PLAYER_DIV_ID)
return {
re.sub(r'^data-(?:kwik_)?', '', k): v
for k, v in extract_attributes(player_el).items()
if k not in ('id',)
}
class MegaTVComIE(MegaTVComBaseIE):
IE_NAME = 'megatvcom'
IE_DESC = 'megatv.com videos'
_VALID_URL = r'https?://(?:www\.)?megatv\.com/(?:(?!\d{4})[^/]+/(?P<id>\d+)/[^/]+|\d{4}/\d{2}/\d{2}/.+)'
_TESTS = [{
'url': 'https://www.megatv.com/2021/10/23/egkainia-gia-ti-nea-skini-omega-tou-dimotikou-theatrou-peiraia/',
'md5': '2ebe96661cb81854889053cebb661068',
'info_dict': {
'id': '520979',
'ext': 'mp4',
'title': 'md5:70eef71a9cd2c1ecff7ee428354dded2',
'description': 'md5:0209fa8d318128569c0d256a5c404db1',
'timestamp': 1634975747,
'upload_date': '20211023',
},
}, {
'url': 'https://www.megatv.com/tvshows/527800/epeisodio-65-12/',
'md5': '8ab0c9d664cea11678670202b87bb2b1',
'info_dict': {
'id': '527800',
'ext': 'mp4',
'title': 'md5:fc322cb51f682eecfe2f54cd5ab3a157',
'description': 'md5:b2b7ed3690a78f2a0156eb790fdc00df',
'timestamp': 1636048859,
'upload_date': '20211104',
},
}]
def _match_article_id(self, webpage):
ART_RE = r'''(?x)
<article(?:
id=(?P<_q2>["'])Article_(?P<article>\d+)(?P=_q2)|
[^>]*?
)+>
'''
return compat_str(self._search_regex(ART_RE, webpage, 'article_id',
group='article'))
def _real_extract(self, url):
video_id = self._match_id(url)
_is_article = video_id == 'None'
webpage = self._download_webpage(url,
'N/A' if _is_article else
video_id)
if _is_article:
video_id = self._match_article_id(webpage)
player_attrs = self._extract_player_attrs(webpage)
title = player_attrs.get('label') or self._og_search_title(webpage)
description = clean_html(get_element_by_class(
'article-wrapper' if _is_article else 'story_content',
webpage))
if not description:
description = self._og_search_description(webpage)
thumbnail = player_attrs.get('image') or \
self._og_search_thumbnail(webpage)
timestamp = unified_timestamp(self._html_search_meta(
'article:published_time', webpage))
try:
source = player_attrs['source']
except KeyError:
raise ExtractorError('no source found for %s' % video_id)
formats = self._extract_m3u8_formats(source, video_id, 'mp4') \
if determine_ext(source) == 'm3u8' else [{'url': source}]
self._sort_formats(formats)
return {
'id': video_id,
'title': title,
'description': description,
'thumbnail': thumbnail,
'timestamp': timestamp,
'formats': formats,
}
class MegaTVComEmbedIE(MegaTVComBaseIE):
IE_NAME = 'megatvcom:embed'
IE_DESC = 'megatv.com embedded videos'
_VALID_URL = r'https?://(?:www\.)?megatv\.com/embed/?\?p=\d+'
_TESTS = [{
'url': 'https://www.megatv.com/embed/?p=2020520979',
'md5': '2ebe96661cb81854889053cebb661068',
'info_dict': {
'id': '520979',
'ext': 'mp4',
'title': 'md5:70eef71a9cd2c1ecff7ee428354dded2',
'description': 'md5:0209fa8d318128569c0d256a5c404db1',
'timestamp': 1634975747,
'upload_date': '20211023',
},
}, {
'url': 'https://www.megatv.com/embed/?p=2020534081',
'md5': 'f9a15e315acbf01b128e8efa3f75aab3',
'info_dict': {
'id': '534081',
'ext': 'mp4',
'title': 'md5:062e9d5976ef854d8bdc1f5724d9b2d0',
'description': 'md5:36dbe4c3762d2ede9513eea8d07f6d52',
'timestamp': 1636376351,
'upload_date': '20211108',
},
}]
@classmethod
def _extract_urls(cls, webpage, origin_url=None):
# make the scheme in _VALID_URL optional
_URL_RE = r'(?:https?:)?//' + cls._VALID_URL.split('://', 1)[1]
EMBED_RE = r'''(?x)
<iframe[^>]+?src=(?P<_q1>%(quot_re)s)
(?P<url>%(url_re)s)(?P=_q1)
''' % {'quot_re': r'["\']', 'url_re': _URL_RE}
for mobj in re.finditer(EMBED_RE, webpage):
url = unescapeHTML(mobj.group('url'))
if url.startswith('//'):
scheme = compat_urllib_parse_urlparse(origin_url).scheme \
if origin_url else 'https'
url = '%s:%s' % (scheme, url)
yield url
def _match_canonical_url(self, webpage):
LINK_RE = r'''(?x)
<link(?:
rel=(?P<_q1>%(quot_re)s)(?P<canonical>canonical)(?P=_q1)|
href=(?P<_q2>%(quot_re)s)(?P<href>(?:(?!(?P=_q2)).)+)(?P=_q2)|
[^>]*?
)+>
''' % {'quot_re': r'["\']'}
for mobj in re.finditer(LINK_RE, webpage):
canonical, href = mobj.group('canonical', 'href')
if canonical and href:
return unescapeHTML(href)
def _real_extract(self, url):
webpage = self._download_webpage(url, 'N/A')
player_attrs = self._extract_player_attrs(webpage)
canonical_url = player_attrs.get('share_url') or \
self._match_canonical_url(webpage)
if not canonical_url:
raise ExtractorError('canonical URL not found')
video_id = compat_parse_qs(compat_urllib_parse_urlparse(
canonical_url).query)['p'][0]
# Resolve the canonical URL, following redirects, and defer to
# megatvcom, as the metadata extracted from the embeddable page are
# sometimes slightly different for the same video
canonical_url = self._request_webpage(
HEADRequest(canonical_url), video_id,
note='Resolve canonical URL',
errnote='Could not resolve canonical URL').geturl()
return self.url_result(
canonical_url,
MegaTVComIE.ie_key(),
video_id
)
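
For illustration, the data-attribute scraping that both IEs rely on can be exercised directly. The markup and URL below are made up, and instantiating the IE without a downloader is only for this sketch:

from youtube_dl.extractor.megatvcom import MegaTVComBaseIE

page = ('<div id="player_div_id" data-kwik_source="https://example.invalid/vid.m3u8"'
        ' data-label="Sample title"></div>')
print(MegaTVComBaseIE()._extract_player_attrs(page))
# expected: {'source': 'https://example.invalid/vid.m3u8', 'label': 'Sample title'}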

View File: youtube_dl/options.py

@@ -533,6 +533,10 @@ def parseOpts(overrideArguments=None):
'--no-check-certificate',
action='store_true', dest='no_check_certificate', default=False,
help='Suppress HTTPS certificate validation')
workarounds.add_option(
'--no-check-extensions',
action='store_true', dest='no_check_extensions', default=False,
help='Suppress file extension validation')
workarounds.add_option(
'--prefer-insecure',
'--prefer-unsecure', action='store_true', dest='prefer_insecure',
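
The new switch lands in the workarounds option group next to --no-check-certificate. A small sketch of how it surfaces through parseOpts, assuming parseOpts keeps its usual (parser, opts, args) return value:

from youtube_dl.options import parseOpts

parser, opts, args = parseOpts([
    '--no-check-extensions',
    'https://www.megatv.com/embed/?p=2020520979',
])
print(opts.no_check_extensions)   # True
print(args)                       # ['https://www.megatv.com/embed/?p=2020520979']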

View File: youtube_dl/utils.py

@@ -1717,21 +1717,6 @@ TIMEZONE_NAMES = {
'PST': -8, 'PDT': -7 # Pacific
}
KNOWN_EXTENSIONS = (
'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'aac',
'flv', 'f4v', 'f4a', 'f4b',
'webm', 'ogg', 'ogv', 'oga', 'ogx', 'spx', 'opus',
'mkv', 'mka', 'mk3d',
'avi', 'divx',
'mov',
'asf', 'wmv', 'wma',
'3gp', '3g2',
'mp3',
'flac',
'ape',
'wav',
'f4f', 'f4m', 'm3u8', 'smil')
# needed for sanitizing filenames in restricted mode
ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ',
itertools.chain('AAAAAA', ['AE'], 'CEEEEIIIIDNOOOOOOO', ['OE'], 'UUUUUY', ['TH', 'ss'],
@@ -3959,19 +3944,22 @@ def parse_duration(s):
return duration
def prepend_extension(filename, ext, expected_real_ext=None):
def _change_extension(prepend, filename, ext, expected_real_ext=None):
name, real_ext = os.path.splitext(filename)
return (
'{0}.{1}{2}'.format(name, ext, real_ext)
if not expected_real_ext or real_ext[1:] == expected_real_ext
else '{0}.{1}'.format(filename, ext))
sanitize_extension = _UnsafeExtensionError.sanitize_extension
if not expected_real_ext or real_ext.partition('.')[0::2] == ('', expected_real_ext):
filename = name
if prepend and real_ext:
sanitize_extension(ext, prepend=prepend)
return ''.join((filename, '.', ext, real_ext))
# Mitigate path traversal and file impersonation attacks
return '.'.join((filename, sanitize_extension(ext)))
def replace_extension(filename, ext, expected_real_ext=None):
name, real_ext = os.path.splitext(filename)
return '{0}.{1}'.format(
name if not expected_real_ext or real_ext[1:] == expected_real_ext else filename,
ext)
prepend_extension = functools.partial(_change_extension, True)
replace_extension = functools.partial(_change_extension, False)
def check_executable(exe, args=[]):
@@ -6561,3 +6549,138 @@ def join_nonempty(*values, **kwargs):
if from_dict is not None:
values = (traverse_obj(from_dict, variadic(v)) for v in values)
return delim.join(map(compat_str, filter(None, values)))
class Namespace(object):
"""Immutable namespace"""
def __init__(self, **kw_attr):
self.__dict__.update(kw_attr)
def __iter__(self):
return iter(self.__dict__.values())
@property
def items_(self):
return self.__dict__.items()
MEDIA_EXTENSIONS = Namespace(
common_video=('avi', 'flv', 'mkv', 'mov', 'mp4', 'webm'),
video=('3g2', '3gp', 'f4v', 'mk3d', 'divx', 'mpg', 'ogv', 'm4v', 'wmv'),
common_audio=('aiff', 'alac', 'flac', 'm4a', 'mka', 'mp3', 'ogg', 'opus', 'wav'),
audio=('aac', 'ape', 'asf', 'f4a', 'f4b', 'm4b', 'm4p', 'm4r', 'oga', 'ogx', 'spx', 'vorbis', 'wma', 'weba'),
thumbnails=('jpg', 'png', 'webp'),
# storyboards=('mhtml', ),
subtitles=('srt', 'vtt', 'ass', 'lrc', 'ttml'),
manifests=('f4f', 'f4m', 'm3u8', 'smil', 'mpd'),
)
MEDIA_EXTENSIONS.video = MEDIA_EXTENSIONS.common_video + MEDIA_EXTENSIONS.video
MEDIA_EXTENSIONS.audio = MEDIA_EXTENSIONS.common_audio + MEDIA_EXTENSIONS.audio
KNOWN_EXTENSIONS = (
MEDIA_EXTENSIONS.video + MEDIA_EXTENSIONS.audio
+ MEDIA_EXTENSIONS.manifests
)
class _UnsafeExtensionError(Exception):
"""
Mitigation exception for unwanted file overwrite/path traversal
Ref: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-79w7-vh3h-8g4j
"""
_ALLOWED_EXTENSIONS = frozenset(itertools.chain(
( # internal
'description',
'json',
'meta',
'orig',
'part',
'temp',
'uncut',
'unknown_video',
'ytdl',
),
# video
MEDIA_EXTENSIONS.video, (
'avif',
'ismv',
'm2ts',
'm4s',
'mng',
'mpeg',
'qt',
'swf',
'ts',
'vp9',
'wvm',
),
# audio
MEDIA_EXTENSIONS.audio, (
'isma',
'mid',
'mpga',
'ra',
),
# image
MEDIA_EXTENSIONS.thumbnails, (
'bmp',
'gif',
'ico',
'heic',
'jng',
'jpeg',
'jxl',
'svg',
'tif',
'wbmp',
),
# subtitle
MEDIA_EXTENSIONS.subtitles, (
'dfxp',
'fs',
'ismt',
'sami',
'scc',
'ssa',
'tt',
),
# others
MEDIA_EXTENSIONS.manifests,
(
# not used in yt-dl
# *MEDIA_EXTENSIONS.storyboards,
# 'desktop',
# 'ism',
# 'm3u',
# 'sbv',
# 'swp',
# 'url',
# 'webloc',
# 'xml',
)))
def __init__(self, extension):
super(_UnsafeExtensionError, self).__init__('unsafe file extension: {0!r}'.format(extension))
self.extension = extension
# support --no-check-extensions
lenient = False
@classmethod
def sanitize_extension(cls, extension, **kwargs):
# ... /, *, prepend=False
prepend = kwargs.get('prepend', False)
if '/' in extension or '\\' in extension:
raise cls(extension)
if not prepend:
last = extension.rpartition('.')[-1]
if last == 'bin':
extension = last = 'unknown_video'
if not (cls.lenient or last.lower() in cls._ALLOWED_EXTENSIONS):
raise cls(extension)
return extension
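
Putting the back-ported pieces together, a short sketch of the helpers in use, assuming the patched module imports cleanly:

from youtube_dl.utils import (
    KNOWN_EXTENSIONS,
    MEDIA_EXTENSIONS,
    prepend_extension,
    replace_extension,
)

# Each Namespace field is a plain tuple, reachable as an attribute:
print('mp4' in MEDIA_EXTENSIONS.common_video)    # True
print('m3u8' in KNOWN_EXTENSIONS)                # True, as before the refactor

# The sanitizing helpers:
print(prepend_extension('abc.ext', 'temp'))      # abc.temp.ext
print(replace_extension('abc.ext', 'bin'))       # abc.unknown_video ('bin' is remapped)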