From 014d048ae4ee148ac76fd0bd628bd62c981f2fc9 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Fri, 5 Apr 2019 13:23:48 -0700 Subject: [PATCH] Implement incremental YTDL updater with DynamoDB storage --- cmd/updater/requirements.txt | 1 + cmd/updater/updater.py | 194 ++++++++++++++++++++++++++++++----- cmd/updater/updater_test.py | 51 +++++---- 3 files changed, 201 insertions(+), 45 deletions(-) diff --git a/cmd/updater/requirements.txt b/cmd/updater/requirements.txt index 930b5e4..aa8309d 100644 --- a/cmd/updater/requirements.txt +++ b/cmd/updater/requirements.txt @@ -1 +1,2 @@ +boto3==1.9.129 youtube_dl==2019.03.09 diff --git a/cmd/updater/updater.py b/cmd/updater/updater.py index 395dd88..82db10b 100644 --- a/cmd/updater/updater.py +++ b/cmd/updater/updater.py @@ -1,13 +1,15 @@ import youtube_dl +import boto3 +import os +import time +from datetime import datetime BEST_FORMAT = "bestvideo+bestaudio/best" +DEFAULT_PAGE_SIZE = 50 -FORMATS = { - 'video_high': 'best[ext=mp4]', - 'video_low': 'worst[ext=mp4]', - 'audio_high': 'bestaudio', - 'audio_low': 'worstaudio', -} +dynamodb = boto3.resource('dynamodb') + +feeds_table = dynamodb.Table(os.getenv('UPDATER_DYNAMO_FEEDS_TABLE', 'Feeds')) def handler(event, context): @@ -16,7 +18,7 @@ def handler(event, context): raise ValueError('Invalid resource URL %s' % url) start = event.get('start', 1) - count = event.get('count', 50) + count = event.get('count', DEFAULT_PAGE_SIZE) kind = event.get('kind', 'video_high') last_id = event.get('last_id', None) @@ -25,7 +27,151 @@ def handler(event, context): return _get_updates(start, count, url, kind, last_id) -def _get_updates(start, count, url, kind, last_id=None): +def _update_feed(hash_id): + print('Updating feed {}'.format(hash_id)) + feed = _query_feed(hash_id) + + page_size = int(feed.get('PageSize', DEFAULT_PAGE_SIZE)) + last_id = feed.get('LastID', None) + episodes = feed.get('Episodes', []) + + # Rebuild episode list from scratch + if not last_id: + episodes = [] + + 
start = time.time() + _, items, new_last_id = _get_updates(1, page_size, _get_url(feed), _get_format(feed), last_id) + end = time.time() + + print('Got feed update: new {}, current {}. Update took: {}'.format(len(items), len(episodes), end-start)) + + # Update feed and submit back to Dynamo + + unix_time = int(datetime.utcnow().timestamp()) + feed['UpdatedAt'] = unix_time + + if len(items) > 0: + episodes = items + episodes # Prepend new episodes + del episodes[page_size:] # Truncate list + feed['Episodes'] = episodes + + # Update last seen video ID + feed['LastID'] = new_last_id + + _update_feed_episodes(hash_id, feed) + else: + # Update last access field only + _update_feed_updated_at(hash_id, unix_time) + + +def _query_feed(hash_id): + response = feeds_table.get_item( + Key={'HashID': hash_id}, + ProjectionExpression='#prov,#type,#size,#fmt,#quality,#level,#id,#last_id,#episodes,#updated_at', + ExpressionAttributeNames={ + '#prov': 'Provider', + '#type': 'LinkType', + '#size': 'PageSize', + '#fmt': 'Format', + '#quality': 'Quality', + '#level': 'FeatureLevel', + '#id': 'ItemID', + '#last_id': 'LastID', + '#episodes': 'Episodes', + '#updated_at': 'UpdatedAt', + }, + ) + + item = response['Item'] + return item + + +def _update_feed_episodes(hash_id, feed): + feeds_table.update_item( + Key={ + 'HashID': hash_id, + }, + UpdateExpression='SET #updated_at = :updated_at, #episodes = :episodes, #last_id = :last_id', + ExpressionAttributeNames={ + '#updated_at': 'UpdatedAt', + '#episodes': 'Episodes', + '#last_id': 'LastID', + }, + ExpressionAttributeValues={ + ':updated_at': feed['UpdatedAt'], + ':episodes': feed['Episodes'], + ':last_id': feed['LastID'], + }, + ReturnValues='NONE', + ) + + +def _update_feed_updated_at(hash_id, updated_at): + feeds_table.update_item( + Key={ + 'HashID': hash_id, + }, + UpdateExpression='SET #updated_at = :updated_at', + ExpressionAttributeNames={ + '#updated_at': 'UpdatedAt', + }, + ExpressionAttributeValues={ + ':updated_at': 
updated_at, + }, + ReturnValues='NONE', + ) + + +def _get_format(feed): + fmt = feed.get('Format', 'video') + quality = feed.get('Quality', 'high') + + if fmt == 'video': + # Video + if quality == 'high': + return 'best[ext=mp4]' + else: + return 'worst[ext=mp4]' + else: + # Audio + if quality == 'high': + return 'bestaudio' + else: + return 'worstaudio' + + +def _get_url(feed): + provider = feed['Provider'] + link_type = feed['LinkType'] + item_id = feed['ItemID'] + + if provider == 'youtube': + + if link_type == 'playlist': + return 'https://www.youtube.com/playlist?list={}'.format(item_id) + elif link_type == 'channel': + return 'https://www.youtube.com/channel/{}'.format(item_id) + elif link_type == 'user': + return 'https://www.youtube.com/user/{}'.format(item_id) + else: + raise ValueError('Unsupported link type') + + elif provider == 'vimeo': + + if link_type == 'channel': + return 'https://vimeo.com/channels/{}'.format(item_id) + elif link_type == 'group': + return 'http://vimeo.com/groups/{}'.format(item_id) + elif link_type == 'user': + return 'https://vimeo.com/{}'.format(item_id) + else: + raise ValueError('Unsupported link type') + + else: + raise ValueError('Unsupported provider') + + +def _get_updates(start, count, url, fmt, last_id=None): if start < 1: raise ValueError('Invalid start value') @@ -45,14 +191,14 @@ def _get_updates(start, count, url, kind, last_id=None): } with youtube_dl.YoutubeDL(opts) as ytdl: - selector = ytdl.build_format_selector(FORMATS[kind]) + selector = ytdl.build_format_selector(fmt) feed_info = ytdl.extract_info(url, download=False) # Record basic feed metadata feed = { - 'id': feed_info.get('id'), - 'title': feed_info.get('uploader'), - 'page_url': feed_info.get('webpage_url'), + 'ID': feed_info.get('id'), + 'Title': feed_info.get('uploader'), + 'PageURL': feed_info.get('webpage_url'), } videos = [] @@ -74,22 +220,18 @@ def _get_updates(start, count, url, kind, last_id=None): result = ytdl.process_ie_result(entry, 
download=False) videos.append({ - 'id': video_id, - 'title': result.get('title'), - 'description': result.get('description'), - 'thumbnail': result.get('thumbnail'), - 'duration': result.get('duration'), - 'video_url': result.get('webpage_url'), - 'upload_date': result.get('upload_date'), - 'ext': result.get('ext'), - 'size': _get_size(result, selector), + 'ID': video_id, + 'Title': result.get('title'), + 'Description': result.get('description'), + 'Thumbnail': result.get('thumbnail'), + 'Duration': result.get('duration'), + 'VideoURL': result.get('webpage_url'), + 'UploadDate': result.get('upload_date'), + 'Ext': result.get('ext'), + 'Size': _get_size(result, selector), }) - return { - 'feed': feed, - 'items': videos, - 'last_id': new_last_id, - } + return feed, videos, new_last_id def _get_size(video, selector): diff --git a/cmd/updater/updater_test.py b/cmd/updater/updater_test.py index c0222a6..3eff9b9 100644 --- a/cmd/updater/updater_test.py +++ b/cmd/updater/updater_test.py @@ -1,38 +1,51 @@ import updater import unittest +TEST_URL = 'https://www.youtube.com/user/CNN/videos' + class TestUpdater(unittest.TestCase): def test_get_updates(self): - kinds = ['video_high', 'video_low', 'audio_high', 'audio_low'] + kinds = [ + updater._get_format({'Format': 'video', 'Quality': 'high'}), + updater._get_format({'Format': 'video', 'Quality': 'low'}), + updater._get_format({'Format': 'audio', 'Quality': 'high'}), + updater._get_format({'Format': 'audio', 'Quality': 'low'}), + ] for kind in kinds: with self.subTest(kind): - result = updater._get_updates(1, 1, 'https://www.youtube.com/user/CNN/videos', kind) - self.assertIsNotNone(result['feed']) - self.assertIsNotNone(result['items']) + feed, items, _ = updater._get_updates(1, 1, TEST_URL, kind) + self.assertIsNotNone(feed) + self.assertIsNotNone(items) def test_get_change_list(self): - result = updater._get_updates(1, 5, 'https://www.youtube.com/user/CNN/videos', 'video_low') - self.assertEqual(len(result['items']), 5) - 
self.assertEqual(result['items'][0]['id'], result['last_id']) - last_id = result['items'][2]['id'] - self.assertIsNotNone(last_id) - result = updater._get_updates(1, 5, 'https://www.youtube.com/user/CNN/videos', 'video_low', last_id) - self.assertEqual(len(result['items']), 2) - self.assertEqual(result['items'][0]['id'], result['last_id']) + feed, items, last_id = updater._get_updates(1, 5, TEST_URL, 'worst[ext=mp4]') + + self.assertEqual(len(items), 5) + self.assertEqual(items[0]['ID'], last_id) + test_last_id = items[2]['ID'] + self.assertIsNotNone(test_last_id) + + feed, items, last_id = updater._get_updates(1, 5, TEST_URL, 'worst[ext=mp4]', test_last_id) + self.assertEqual(len(items), 2) + self.assertEqual(items[0]['ID'], last_id) def test_last_id(self): - result = updater._get_updates(1, 1, 'https://www.youtube.com/user/CNN/videos', 'audio_low') - self.assertEqual(len(result['items']), 1) - self.assertEqual(result['items'][0]['id'], result['last_id']) + feed, items, last_id = updater._get_updates(1, 1, TEST_URL, 'worstaudio') + self.assertEqual(len(items), 1) + self.assertEqual(items[0]['ID'], last_id) @unittest.skip('heavy test, run manually') def test_get_50(self): - result = updater.handler({ + _, items, last_id = updater.handler({ 'url': 'https://www.youtube.com/channel/UCd6MoB9NC6uYN2grvUNT-Zg', 'start': 1, 'count': 50, - 'kind': 'video_low', + 'kind': 'best[ext=mp4]', }, None) - self.assertEqual(len(result['items']), 50) - self.assertEqual(result['items'][0]['id'], result['last_id']) + self.assertEqual(len(items), 50) + self.assertEqual(items[0]['ID'], last_id) + + @unittest.skip + def test_update_feed(self): + updater._update_feed('86qZ')