import os
import youtube_dl
import boto3
from datetime import datetime, time
from dateutil.relativedelta import relativedelta
class InvalidUsage(Exception):
    """Raised when a request is malformed (bad path, feed id, video id or URL)."""
class QuotaExceeded(Exception):
    """Raised when a feed has used up its daily resolve-request quota."""
# Shared DynamoDB resource; created once at import time so that every
# invocation served by this module reuses the same client.
dynamodb = boto3.resource('dynamodb')

# Table holding per-feed metadata. The table name can be overridden through
# the RESOLVER_DYNAMO_FEEDS_TABLE environment variable.
feeds_table = dynamodb.Table(
    os.environ.get('RESOLVER_DYNAMO_FEEDS_TABLE', 'Feeds')
)

# Table holding per-feed, per-day resolve counters. The table name can be
# overridden through RESOLVER_DYNAMO_RESOLVE_COUNTERS_TABLE.
counter_table = dynamodb.Table(
    os.environ.get('RESOLVER_DYNAMO_RESOLVE_COUNTERS_TABLE', 'ResolveCounters')
)
# Options passed to youtube_dl.YoutubeDL: stay quiet and only extract
# information, never download the media itself.
opts = {
    'quiet': True,
    'no_warnings': True,
    'forceurl': True,
    'simulate': True,
    'skip_download': True,
    'call_home': False,
    # NOTE(review): this disables TLS certificate verification for the
    # extractor's HTTP requests -- confirm that this is intentional.
    'nocheckcertificate': True,
}
# Provider name -> URL template; '{}' is replaced with the video id.
url_formats = {
    'youtube': 'https://youtube.com/watch?v={}',
    'vimeo': 'https://vimeo.com/{}',
}
def handler(event, lambda_context):
    """Resolve the requested video and answer with an HTTP redirect.

    The feed and video ids are parsed from ``event['path']`` and resolved
    to a direct media URL.

    NOTE(review): the response shape (``statusDescription``) suggests this
    runs behind a load balancer integration -- confirm.

    :param event: request event; only its ``'path'`` entry is read.
    :param lambda_context: invocation context (unused).
    :return: a ``302`` response whose ``Location`` header is the resolved
        URL, or a ``429`` plain-text response when the feed's quota is
        exhausted.
    :raises InvalidUsage: propagated unchanged for malformed requests.
    """
    try:
        feed_id, video_id = _get_ids(event.get('path'))
        redirect_url = download(feed_id, video_id)
    except QuotaExceeded:
        return {
            'statusCode': 429,
            'statusDescription': '429 Too Many Requests',
            'body': 'Too many requests. Daily limit is 1000. Consider upgrading account to get unlimited access',
            'headers': {'Content-Type': 'text/plain'}
        }

    return {
        'statusCode': 302,
        'statusDescription': '302 Found',
        'headers': {
            'Location': redirect_url,
        }
    }
def _get_ids(path):
if not path or not path.startswith('/download'):
raise InvalidUsage('Invalid path')
sections = path.split('/')
# >>> '/download/feed/video.xml'.split('/', 3)
# ['', 'download', 'feed', 'video.xml']
if len(sections) != 4:
raise InvalidUsage('Invalid path')
feed_id = sections[2]
video_id = sections[3]
if not feed_id or not video_id:
raise InvalidUsage('Invalid feed or video id')
# Trim extension
# >>> os.path.splitext('video.xml')[0]
# 'video'
video_id = os.path.splitext(video_id)[0]
return feed_id, video_id
def download(feed_id, video_id):
    """Resolve a feed's video to a direct media URL.

    Loads the feed metadata, bumps the feed's daily resolve counter,
    enforces the quota for feature level 0 feeds, builds the provider page
    URL and resolves it.

    NOTE(review): ``ANONYMOUS_FEED_REQUESTS_LIMIT`` is expected to be a
    module-level constant; it is not defined in the visible part of this
    file -- confirm it exists.

    :param feed_id: hash id of the feed.
    :param video_id: provider video id; a file extension, if any, is removed.
    :return: the resolved media URL.
    :raises InvalidUsage: on an empty feed/video id or an unknown provider.
    :raises QuotaExceeded: when a feature level 0 feed exceeds the daily
        request limit.
    """
    if not feed_id:
        raise InvalidUsage('Invalid feed id')

    # Remove extension and check if video id is ok (an empty/None id is
    # rejected here instead of crashing inside splitext)
    video_id = os.path.splitext(video_id or '')[0]
    if not video_id:
        raise InvalidUsage('Invalid video id')

    # Query feed metadata info from DynamoDB
    item = _get_metadata(feed_id)

    # Update resolve requests counter
    count = _update_resolve_counter(feed_id)
    level = int(item['featurelevel'])
    if count > ANONYMOUS_FEED_REQUESTS_LIMIT and level == 0:
        raise QuotaExceeded(
            'Too many requests. Daily limit is %d. Consider upgrading account to get unlimited '
            'access' % ANONYMOUS_FEED_REQUESTS_LIMIT
        )

    # Build URL. Use .get() so an unknown provider reaches the InvalidUsage
    # branch below instead of escaping as a KeyError.
    provider = item['provider']
    tpl = url_formats.get(provider)
    if not tpl:
        raise InvalidUsage('Invalid feed')

    url = tpl.format(video_id)
    return _resolve(url, item)
def _get_metadata(feed_id):
    """Fetch a feed's metadata from the feeds table.

    Only the Provider, Format, Quality and FeatureLevel attributes are
    requested.

    :param feed_id: value of the ``HashID`` key to look up.
    :return: dict of the requested attributes with lower-cased keys
        (``provider``, ``format``, ``quality``, ``featurelevel``).
    :raises InvalidUsage: if no item exists for ``feed_id``.
    """
    response = feeds_table.get_item(
        Key={'HashID': feed_id},
        # Every placeholder declared below must be used in an expression,
        # so all four are projected.
        ProjectionExpression='#P,#F,#Q,#L',
        ExpressionAttributeNames={
            '#P': 'Provider',
            '#F': 'Format',
            '#Q': 'Quality',
            '#L': 'FeatureLevel',
        },
    )

    # get_item omits 'Item' when the key does not exist.
    item = response.get('Item')
    if not item:
        raise InvalidUsage('Invalid feed')

    # Make dict keys lowercase
    return {k.lower(): v for k, v in item.items()}
def _update_resolve_counter(feed_id):
    """Increment today's resolve counter for a feed and return the new value.

    The counter item is keyed by ``FeedID`` and the current UTC day
    (``YYYYMMDD`` as an integer). On first write for a day the item also
    receives an ``Expires`` epoch timestamp three months in the future.

    :param feed_id: hash id of the feed.
    :return: the ``Count`` attribute after the increment.
    :raises InvalidUsage: if ``feed_id`` is empty.
    """
    # Local import: the module header only imports ``datetime`` and ``time``.
    from datetime import timezone

    if not feed_id:
        raise InvalidUsage('Invalid feed id')

    # Use an aware UTC datetime: timestamp() on a naive datetime would be
    # interpreted in the machine's local time zone.
    now = datetime.now(timezone.utc)
    day = now.strftime('%Y%m%d')
    expires = now + relativedelta(months=3)

    response = counter_table.update_item(
        Key={
            'FeedID': feed_id,
            'Day': int(day),
        },
        UpdateExpression='ADD #count :one SET #exp = if_not_exists(#exp, :ttl)',
        ExpressionAttributeNames={
            '#count': 'Count',
            '#exp': 'Expires',
        },
        ExpressionAttributeValues={
            ':one': 1,
            ':ttl': int(expires.timestamp()),
        },
        # Needed so that the response carries the updated 'Attributes'.
        ReturnValues='UPDATED_NEW',
    )

    attrs = response['Attributes']
    return attrs['Count']
def _resolve(url, metadata):
    """Resolve a provider page URL to a direct media URL using youtube-dl.

    :param url: page URL of the video.
    :param metadata: feed metadata; ``metadata['provider']`` selects the
        provider-specific format chooser.
    :return: the media URL picked by the provider-specific chooser.
    :raises InvalidUsage: if ``url`` is empty.
    :raises ValueError: if the provider is neither ``'youtube'`` nor
        ``'vimeo'``.

    Any exception raised during extraction is logged and re-raised
    unchanged.
    NOTE(review): the original handler body is not visible in this file;
    log-and-re-raise is the conservative reconstruction -- confirm.
    """
    if not url:
        raise InvalidUsage('Invalid URL')

    print('Resolving %s' % url)
    provider = metadata['provider']

    try:
        with youtube_dl.YoutubeDL(opts) as ytdl:
            info = ytdl.extract_info(url, download=False)
            if provider == 'youtube':
                return _yt_choose_url(ytdl, info, metadata)
            elif provider == 'vimeo':
                return _vimeo_choose_url(info, metadata)
            raise ValueError('undefined provider')
    except Exception as e:
        print('Failed to resolve %s: %s' % (url, e))
        raise
def _yt_choose_url(ytdl, info, metadata):
is_video = metadata['format'] == 'video'
is_high_quality = metadata['quality'] == 'high'
if not is_video:
fmt = 'bestaudio' if is_high_quality else 'worstaudio'
selector = ytdl.build_format_selector(fmt)
selected = next(selector(info))
if 'fragment_base_url' in selected:
return selected['fragment_base_url']
except KeyError:
# Filter formats by file extension
ext = 'mp4' if is_video else 'm4a'
fmt_list = [x for x in info['formats'] if x['ext'] == ext and 'acodec' in x and x['acodec'] != 'none']
if not len(fmt_list):
return info['url']
# Sort list by field (width for videos, file size for audio)
sort_field = 'width' if is_video else 'filesize'
# Sometime 'filesize' field can be None
if not all(x[sort_field] is not None for x in fmt_list):
sort_field = 'format_id'
ordered = sorted(fmt_list, key=lambda x: x[sort_field], reverse=True)
# Choose an item depending on quality, better at the beginning
item = ordered[0] if is_high_quality else ordered[-1]
return item['url']
def _vimeo_choose_url(info, metadata):
# Query formats with 'extension' = mp4 and 'format_id' = http-1080p/http-720p/../http-360p
fmt_list = [x for x in info['formats'] if x['ext'] == 'mp4' and x['format_id'].startswith('http-')]
ordered = sorted(fmt_list, key=lambda x: x['width'], reverse=True)
is_high_quality = metadata['quality'] == 'high'
item = ordered[0] if is_high_quality else ordered[-1]
return item['url']