
Experimental SQS-based updater

Maksym Pavlenko
2019-05-26 16:19:17 -07:00
parent 1b06c7da60
commit 8f5472dc2b
14 changed files with 345 additions and 300 deletions
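This commit replaces the synchronous, Redis-cached feed rebuild with an SQS pipeline: the Go API batches update requests onto an SQS queue, and a long-running Python worker (cmd/updater/main.py) consumes them, pulls new episodes via youtube-dl, and writes the gzip-compressed episode list back to DynamoDB. Below is a minimal sketch of the message contract between the two sides; the field names and JSON tags come from the queue.Item struct added in this commit, while the sample values are illustrative only:

package main

import (
	"encoding/json"
	"fmt"
)

// Item mirrors pkg/queue.Item; its JSON encoding is the SQS message body
// that the Python worker unpacks in _update().
type Item struct {
	ID      string `json:"id"`
	URL     string `json:"url"`
	Start   int    `json:"start"`
	Count   int    `json:"count"`
	LastID  string `json:"last_id"`
	Format  string `json:"format"`
	Quality string `json:"quality"`
}

func main() {
	body, err := json.Marshal(Item{
		ID:      "123",
		URL:     "https://www.youtube.com/user/CNN/videos",
		Start:   1,
		Count:   50,
		Format:  "audio",
		Quality: "low",
	})
	if err != nil {
		panic(err)
	}
	// Prints:
	// {"id":"123","url":"https://www.youtube.com/user/CNN/videos","start":1,"count":50,"last_id":"","format":"audio","quality":"low"}
	fmt.Println(string(body))
}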

View File

@@ -14,9 +14,9 @@ import (
"github.com/mxpv/podsync/pkg/api"
"github.com/mxpv/podsync/pkg/builders"
"github.com/mxpv/podsync/pkg/cache"
"github.com/mxpv/podsync/pkg/feeds"
"github.com/mxpv/podsync/pkg/handler"
"github.com/mxpv/podsync/pkg/queue"
"github.com/mxpv/podsync/pkg/storage"
"github.com/mxpv/podsync/pkg/support"
)
@@ -34,6 +34,7 @@ type Opts struct {
DynamoPledgesTableName string `long:"dynamo-pledges-table" env:"DYNAMO_PLEDGES_TABLE_NAME"`
RedisURL string `long:"redis-url" required:"true" env:"REDIS_CONNECTION_URL"`
UpdaterURL string `long:"updater-url" required:"true" env:"UPDATER_URL"`
UpdaterQueueURL string `long:"updater-queue-url" required:"true" env:"UPDATER_SQS_QUEUE_URL"`
Debug bool `long:"debug" env:"DEBUG"`
}
@@ -72,12 +73,9 @@ func main() {
patreon := support.NewPatreon(database)
// Cache
// Queue
redisCache, err := cache.NewRedisCache(opts.RedisURL)
if err != nil {
log.WithError(err).Fatal("failed to initialize Redis cache")
}
updateQueue := queue.New(ctx, opts.UpdaterQueueURL)
// Builders
@@ -93,7 +91,7 @@ func main() {
generic := builders.NewRemote(opts.UpdaterURL)
feed, err := feeds.NewFeedService(database, redisCache, map[api.Provider]feeds.Builder{
feed, err := feeds.NewFeedService(database, updateQueue, map[api.Provider]feeds.Builder{
api.ProviderYoutube: youtube,
api.ProviderVimeo: vimeo,
api.ProviderGeneric: generic,
@@ -131,9 +129,8 @@ func main() {
log.WithError(err).Error("server shutdown failed")
}
if err := redisCache.Close(); err != nil {
log.WithError(err).Error("failed to close redis cache")
}
// Close SQS
updateQueue.Close()
if err := database.Close(); err != nil {
log.WithError(err).Error("failed to close database")

View File

@@ -3,12 +3,10 @@ FROM python:alpine3.7
WORKDIR /app
COPY . .
RUN apk add --virtual deps --no-cache build-base && \
pip3 install --no-cache-dir --requirement requirements.txt --target /app && \
apk del deps && \
RUN pip3 install --no-cache-dir --requirement requirements.txt --target /app && \
addgroup -g 1000 -S app && adduser -u 1000 -G app -S app
USER app
ENTRYPOINT ["python3", "-m", "sanic", "server.app", "--host", "0.0.0.0", "--port", "8080"]
CMD ["--workers", "1"]
ENTRYPOINT ["python3"]
CMD ["-u", "main.py"]

View File

@@ -1,24 +0,0 @@
from updater import DEFAULT_PAGE_SIZE, get_updates, get_format
# AWS Lambda entry point
def handler(event, context):
url = event.get('url', None)
start = event.get('start', 1)
count = event.get('count', DEFAULT_PAGE_SIZE)
# Last seen video ID
last_id = event.get('last_id', None)
# Detect item format
fmt = event.get('format', 'video')
quality = event.get('quality', 'high')
ytdl_fmt = get_format(fmt, quality)
print('Getting updates for %s (start=%d, count=%d, fmt: %s, last id: %s)' % (url, start, count, ytdl_fmt, last_id))
_, episodes, new_last_id = get_updates(start, count, url, ytdl_fmt, last_id)
return {
'last_id': new_last_id,
'episodes': episodes,
}

View File

@@ -1,18 +0,0 @@
import function
import unittest
TEST_URL = 'https://www.youtube.com/user/CNN/videos'
class TestUpdater(unittest.TestCase):
@unittest.skip('heavy test, run manually')
def test_get_50(self):
resp = function.handler({
'url': 'https://www.youtube.com/channel/UCd6MoB9NC6uYN2grvUNT-Zg',
'start': 1,
'count': 50,
'format': 'audio',
'quality': 'low',
}, None)
self.assertEqual(len(resp['Episodes']), 50)
self.assertEqual(resp['Episodes'][0]['ID'], resp['LastID'])

cmd/updater/main.py Normal file
View File

@@ -0,0 +1,108 @@
import datetime
import gzip
import json
import os
import boto3
import updater
sqs = boto3.client('sqs')
sqs_url = os.getenv('UPDATER_SQS_QUEUE_URL')
print('Using SQS URL: {}'.format(sqs_url))
dynamodb = boto3.resource('dynamodb')
feeds_table_name = os.getenv('DYNAMO_FEEDS_TABLE_NAME', 'Feeds')
print('Using DynamoDB table: {}'.format(feeds_table_name))
feeds_table = dynamodb.Table(feeds_table_name)
def _update(item):
# Unpack fields
feed_id = item['id']
url = item['url']
last_id = item['last_id']
start = int(item['start'])
count = int(item['count'])
fmt = item.get('format', 'video')
quality = item.get('quality', 'high')
ytdl_fmt = updater.get_format(fmt, quality)
# Invoke youtube-dl and pull updates
print('Updating feed {} (last id: {}, start: {}, count: {}, fmt: {})'.format(
feed_id, last_id, start, count, ytdl_fmt))
_, new_episodes, new_last_id = updater.get_updates(start, count, url, ytdl_fmt, last_id)
if new_last_id == last_id:
print('No updates found for {}'.format(feed_id))
return
else:
print('Found {} new episode(s) (new last id: {})'.format(len(new_episodes), new_last_id))
# Get record from DynamoDB and decompress episodes
resp = feeds_table.get_item(
Key={'HashID': feed_id},
ProjectionExpression='#D',
ExpressionAttributeNames={'#D': 'EpisodesData'}
)
old_episodes = []
resp_item = resp['Item']
raw = resp_item.get('EpisodesData')
if raw:
print('Received episodes compressed data of size: {} bytes'.format(len(raw.value)))
old_content = gzip.decompress(raw.value).decode('utf-8') # Decompress from gzip
old_episodes = json.loads(old_content) # Deserialize from string to json
episodes = new_episodes + old_episodes  # Prepend new episodes to the list
if len(episodes) > count:
del episodes[count:] # Truncate list
# Compress episodes and submit update query
data = bytes(json.dumps(episodes), 'utf-8')
compressed = gzip.compress(data)
print('Sending new compressed data of size: {} bytes'.format(len(compressed)))
feeds_table.update_item(
Key={
'HashID': feed_id,
},
UpdateExpression='SET #episodes = :data, #last_id = :last_id, #updated_at = :now',
ExpressionAttributeNames={
'#episodes': 'EpisodesData',
'#last_id': 'LastID',
'#updated_at': 'UpdatedAt',
},
ExpressionAttributeValues={
':now': int(datetime.datetime.utcnow().timestamp()),
':last_id': new_last_id,
':data': compressed,
},
)
if __name__ == '__main__':
print('Running updater')
while True:
response = sqs.receive_message(QueueUrl=sqs_url, MaxNumberOfMessages=10)
messages = response.get('Messages')
if not messages:
continue
print('Got {} new message(s) to process'.format(len(messages)))
for msg in messages:
print('-' * 64)
body = msg.get('Body')
try:
# Run updater
_update(json.loads(body))
# Delete message from SQS
receipt_handle = msg.get('ReceiptHandle')
sqs.delete_message(QueueUrl=sqs_url, ReceiptHandle=receipt_handle)
print('Done')
except Exception as e:
print('ERROR: ' + str(e))

View File

@@ -1,3 +1,2 @@
boto3==1.9.129
youtube_dl==2019.04.24
sanic==18.12

View File

@@ -1,38 +0,0 @@
import updater
from sanic import Sanic, response
from sanic.exceptions import InvalidUsage
app = Sanic()
@app.get('/update')
async def update(req):
url = req.args.get('url', None)
start = int(req.args.get('start', 1))
count = int(req.args.get('count', updater.DEFAULT_PAGE_SIZE))
# Last seen video ID
last_id = req.args.get('last_id', None)
# Detect item format
fmt = req.args.get('format', 'video')
quality = req.args.get('quality', 'high')
ytdl_fmt = updater.get_format(fmt, quality)
try:
_, episodes, new_last_id = updater.get_updates(start, count, url, ytdl_fmt, last_id)
return response.json({
'last_id': new_last_id,
'episodes': episodes,
})
except ValueError:
raise InvalidUsage()
@app.get('/ping')
async def ping(req):
return response.text('pong')
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)

View File

@@ -60,14 +60,14 @@ def get_updates(start, count, url, fmt, last_id=None):
for idx, entry in enumerate(entries):
video_id = entry['id']
# If already seen this video previously, stop pulling updates
if last_id and video_id == last_id:
break
# Remember new last id
if idx == 0:
new_last_id = video_id
# If already seen this video previously, stop pulling updates
if last_id and video_id == last_id:
break
# Query video metadata from YouTube
result = ytdl.process_ie_result(entry, download=False)

View File

@@ -1,4 +1,4 @@
version: '2.1'
version: '2.2'
services:
api:
@@ -19,11 +19,10 @@ services:
- PATREON_WEBHOOKS_SECRET={PATREON_WEBHOOKS_SECRET}
- COOKIE_SECRET={COOKIE_SECRET}
- GIN_MODE=release
- AWS_ACCESS_KEY_ID={AWS_ACCESS_KEY}
- AWS_SECRET_ACCESS_KEY={AWS_ACCESS_SECRET}
- AWS_REGION=us-east-1
- DYNAMO_FEEDS_TABLE_NAME=Prod_Feeds
- DYNAMO_PLEDGES_TABLE_NAME=Prod_Pledges
- UPDATER_SQS_QUEUE_URL={URL}
redis:
image: redis:5.0.3
container_name: redis
@@ -52,9 +51,9 @@ services:
- DYNAMO_RESOLVE_COUNTERS_TABLE=Prod_ResolveCounters
updater:
image: mxpv/updater:latest
container_name: updater
command: --workers 128
restart: always
ports:
- 8080
mem_limit: 5Gb
scale: 8
environment:
- AWS_DEFAULT_REGION=us-east-1
- UPDATER_SQS_QUEUE_URL={URL}
- DYNAMO_FEEDS_TABLE_NAME=Prod_Feeds

View File

@@ -11,6 +11,7 @@ import (
"github.com/mxpv/podsync/pkg/api"
"github.com/mxpv/podsync/pkg/model"
"github.com/mxpv/podsync/pkg/queue"
)
type Builder interface {
@@ -25,24 +26,18 @@ type storage interface {
Downgrade(userID string, featureLevel int) ([]string, error)
}
type cacheService interface {
Set(key, value string, ttl time.Duration) error
Get(key string) (string, error)
SaveItem(key string, item interface{}, exp time.Duration) error
GetItem(key string, item interface{}) error
Invalidate(key ...string) error
type Sender interface {
Add(item *queue.Item)
}
type Service struct {
generator IDGen
storage storage
builders map[api.Provider]Builder
cache cacheService
sender Sender
}
func NewFeedService(db storage, cache cacheService, builders map[api.Provider]Builder) (*Service, error) {
func NewFeedService(db storage, sender Sender, builders map[api.Provider]Builder) (*Service, error) {
idGen, err := NewIDGen()
if err != nil {
return nil, err
@@ -52,7 +47,7 @@ func NewFeedService(db storage, cache cacheService, builders map[api.Provider]Bu
generator: idGen,
storage: db,
builders: builders,
cache: cache,
sender: sender,
}
return svc, nil
@@ -64,12 +59,24 @@ func (s *Service) CreateFeed(req *api.CreateFeedRequest, identity *api.Identity)
return "", err
}
logger := log.WithField("feed_id", feed.HashID)
// Make sure builder exists for this provider
_, ok := s.builders[feed.Provider]
builder, ok := s.builders[feed.Provider]
if !ok {
return "", fmt.Errorf("failed to get builder for URL: %s", req.URL)
}
logger.Infof("creating new feed for %q", feed.ItemURL)
if err := builder.Build(feed); err != nil {
logger.WithError(err).Error("failed to build feed")
return "", err
}
logger.Infof("saving new feed to database")
if err := s.storage.SaveFeed(feed); err != nil {
return "", err
}
@@ -135,37 +142,8 @@ func makeEnclosure(feed *model.Feed, id string, lengthInBytes int64) (string, it
return url, contentType, lengthInBytes
}
func (s *Service) updateFromRedis(hashID string, feed *model.Feed) {
key := fmt.Sprintf("feeds/%s", hashID)
redisFeed := &model.Feed{}
if err := s.cache.GetItem(key, redisFeed); err != nil {
return
}
feed.Episodes = redisFeed.Episodes
feed.UpdatedAt = redisFeed.UpdatedAt
feed.LastID = redisFeed.LastID
feed.ItemURL = redisFeed.ItemURL
feed.Author = redisFeed.Author
feed.PubDate = redisFeed.PubDate
feed.Description = redisFeed.Description
feed.Title = redisFeed.Title
feed.Language = redisFeed.Language
feed.Explicit = redisFeed.Explicit
feed.CoverArt = redisFeed.CoverArt
}
func (s *Service) BuildFeed(hashID string) ([]byte, error) {
var (
logger = log.WithField("hash_id", hashID)
now = time.Now().UTC()
errKey = "err/" + hashID
)
cached, err := s.cache.Get(errKey)
if err == nil {
return []byte(cached), nil
}
logger := log.WithField("hash_id", hashID)
feed, err := s.QueryFeed(hashID)
if err != nil {
@@ -173,74 +151,28 @@ func (s *Service) BuildFeed(hashID string) ([]byte, error) {
return nil, err
}
s.updateFromRedis(hashID, feed)
// Submit to SQS for background update
s.sender.Add(&queue.Item{
ID: feed.HashID,
URL: feed.ItemURL,
Start: 1,
Count: feed.PageSize,
LastID: feed.LastID,
Format: string(feed.Format),
Quality: string(feed.Quality),
})
oldLastID := feed.LastID
const (
updateTTL = 15 * time.Minute
)
if now.Sub(feed.UpdatedAt) < updateTTL {
if podcast, err := s.buildPodcast(feed); err != nil {
return nil, err
} else {
return []byte(podcast.String()), nil
}
}
builderType := feed.Provider
if feed.LastID != "" {
// Use incremental Lambda updater if we have seen this feed before
builderType = api.ProviderGeneric
}
builder, ok := s.builders[builderType]
if !ok {
return nil, errors.Wrapf(err, "failed to get builder for feed: %s", hashID)
}
logger.Info("building feed")
if err := builder.Build(feed); err != nil {
logger.WithError(err).Error("failed to build feed")
// Save error to cache to avoid requests spamming
_ = s.cache.Set(errKey, err.Error(), updateTTL)
return nil, err
}
// Format podcast
// Output the feed
if feed.PageSize < len(feed.Episodes) {
feed.Episodes = feed.Episodes[:feed.PageSize]
}
feed.LastAccess = time.Now().UTC()
// Don't zero last seen ID if failed to get updates
if feed.LastID == "" {
feed.LastID = oldLastID
logger.Warnf("failed to get updates for %s (builder: %s)", hashID, builderType)
}
podcast, err := s.buildPodcast(feed)
if err != nil {
return nil, err
}
// Save to storage
if oldLastID != feed.LastID {
logger.Infof("updating feed %s (last id: %q, new id: %q)", hashID, oldLastID, feed.LastID)
if err := s.storage.UpdateFeed(feed); err != nil {
logger.WithError(err).Error("failed to save feed")
return nil, err
}
}
return []byte(podcast.String()), nil
}
@@ -342,17 +274,12 @@ func (s Service) Downgrade(patronID string, featureLevel int) error {
logger.Info("downgrading patron")
ids, err := s.storage.Downgrade(patronID, featureLevel)
_, err := s.storage.Downgrade(patronID, featureLevel)
if err != nil {
logger.WithError(err).Error("database error while downgrading patron")
return err
}
if s.cache.Invalidate(ids...) != nil {
logger.WithError(err).Error("failed to invalidate cached feeds")
return err
}
logger.Info("successfully updated user")
return nil
}

View File

@@ -7,8 +7,8 @@ package feeds
import (
gomock "github.com/golang/mock/gomock"
model "github.com/mxpv/podsync/pkg/model"
queue "github.com/mxpv/podsync/pkg/queue"
reflect "reflect"
time "time"
)
// MockBuilder is a mock of Builder interface
@@ -36,6 +36,7 @@ func (m *MockBuilder) EXPECT() *MockBuilderMockRecorder {
// Build mocks base method
func (m *MockBuilder) Build(feed *model.Feed) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Build", feed)
ret0, _ := ret[0].(error)
return ret0
@@ -43,6 +44,7 @@ func (m *MockBuilder) Build(feed *model.Feed) error {
// Build indicates an expected call of Build
func (mr *MockBuilderMockRecorder) Build(feed interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Build", reflect.TypeOf((*MockBuilder)(nil).Build), feed)
}
@@ -71,6 +73,7 @@ func (m *Mockstorage) EXPECT() *MockstorageMockRecorder {
// SaveFeed mocks base method
func (m *Mockstorage) SaveFeed(feed *model.Feed) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SaveFeed", feed)
ret0, _ := ret[0].(error)
return ret0
@@ -78,11 +81,13 @@ func (m *Mockstorage) SaveFeed(feed *model.Feed) error {
// SaveFeed indicates an expected call of SaveFeed
func (mr *MockstorageMockRecorder) SaveFeed(feed interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveFeed", reflect.TypeOf((*Mockstorage)(nil).SaveFeed), feed)
}
// GetFeed mocks base method
func (m *Mockstorage) GetFeed(hashID string) (*model.Feed, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetFeed", hashID)
ret0, _ := ret[0].(*model.Feed)
ret1, _ := ret[1].(error)
@@ -91,11 +96,13 @@ func (m *Mockstorage) GetFeed(hashID string) (*model.Feed, error) {
// GetFeed indicates an expected call of GetFeed
func (mr *MockstorageMockRecorder) GetFeed(hashID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFeed", reflect.TypeOf((*Mockstorage)(nil).GetFeed), hashID)
}
// UpdateFeed mocks base method
func (m *Mockstorage) UpdateFeed(feed *model.Feed) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateFeed", feed)
ret0, _ := ret[0].(error)
return ret0
@@ -103,11 +110,13 @@ func (m *Mockstorage) UpdateFeed(feed *model.Feed) error {
// UpdateFeed indicates an expected call of UpdateFeed
func (mr *MockstorageMockRecorder) UpdateFeed(feed interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateFeed", reflect.TypeOf((*Mockstorage)(nil).UpdateFeed), feed)
}
// GetMetadata mocks base method
func (m *Mockstorage) GetMetadata(hashID string) (*model.Feed, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetMetadata", hashID)
ret0, _ := ret[0].(*model.Feed)
ret1, _ := ret[1].(error)
@@ -116,11 +125,13 @@ func (m *Mockstorage) GetMetadata(hashID string) (*model.Feed, error) {
// GetMetadata indicates an expected call of GetMetadata
func (mr *MockstorageMockRecorder) GetMetadata(hashID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetadata", reflect.TypeOf((*Mockstorage)(nil).GetMetadata), hashID)
}
// Downgrade mocks base method
func (m *Mockstorage) Downgrade(userID string, featureLevel int) ([]string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Downgrade", userID, featureLevel)
ret0, _ := ret[0].([]string)
ret1, _ := ret[1].(error)
@@ -129,93 +140,41 @@ func (m *Mockstorage) Downgrade(userID string, featureLevel int) ([]string, erro
// Downgrade indicates an expected call of Downgrade
func (mr *MockstorageMockRecorder) Downgrade(userID, featureLevel interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Downgrade", reflect.TypeOf((*Mockstorage)(nil).Downgrade), userID, featureLevel)
}
// MockcacheService is a mock of cacheService interface
type MockcacheService struct {
// MockSender is a mock of Sender interface
type MockSender struct {
ctrl *gomock.Controller
recorder *MockcacheServiceMockRecorder
recorder *MockSenderMockRecorder
}
// MockcacheServiceMockRecorder is the mock recorder for MockcacheService
type MockcacheServiceMockRecorder struct {
mock *MockcacheService
// MockSenderMockRecorder is the mock recorder for MockSender
type MockSenderMockRecorder struct {
mock *MockSender
}
// NewMockcacheService creates a new mock instance
func NewMockcacheService(ctrl *gomock.Controller) *MockcacheService {
mock := &MockcacheService{ctrl: ctrl}
mock.recorder = &MockcacheServiceMockRecorder{mock}
// NewMockSender creates a new mock instance
func NewMockSender(ctrl *gomock.Controller) *MockSender {
mock := &MockSender{ctrl: ctrl}
mock.recorder = &MockSenderMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockcacheService) EXPECT() *MockcacheServiceMockRecorder {
func (m *MockSender) EXPECT() *MockSenderMockRecorder {
return m.recorder
}
// Set mocks base method
func (m *MockcacheService) Set(key, value string, ttl time.Duration) error {
ret := m.ctrl.Call(m, "Set", key, value, ttl)
ret0, _ := ret[0].(error)
return ret0
// Add mocks base method
func (m *MockSender) Add(item *queue.Item) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Add", item)
}
// Set indicates an expected call of Set
func (mr *MockcacheServiceMockRecorder) Set(key, value, ttl interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockcacheService)(nil).Set), key, value, ttl)
}
// Get mocks base method
func (m *MockcacheService) Get(key string) (string, error) {
ret := m.ctrl.Call(m, "Get", key)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Get indicates an expected call of Get
func (mr *MockcacheServiceMockRecorder) Get(key interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockcacheService)(nil).Get), key)
}
// SaveItem mocks base method
func (m *MockcacheService) SaveItem(key string, item interface{}, exp time.Duration) error {
ret := m.ctrl.Call(m, "SaveItem", key, item, exp)
ret0, _ := ret[0].(error)
return ret0
}
// SaveItem indicates an expected call of SaveItem
func (mr *MockcacheServiceMockRecorder) SaveItem(key, item, exp interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveItem", reflect.TypeOf((*MockcacheService)(nil).SaveItem), key, item, exp)
}
// GetItem mocks base method
func (m *MockcacheService) GetItem(key string, item interface{}) error {
ret := m.ctrl.Call(m, "GetItem", key, item)
ret0, _ := ret[0].(error)
return ret0
}
// GetItem indicates an expected call of GetItem
func (mr *MockcacheServiceMockRecorder) GetItem(key, item interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetItem", reflect.TypeOf((*MockcacheService)(nil).GetItem), key, item)
}
// Invalidate mocks base method
func (m *MockcacheService) Invalidate(key ...string) error {
varargs := []interface{}{}
for _, a := range key {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "Invalidate", varargs...)
ret0, _ := ret[0].(error)
return ret0
}
// Invalidate indicates an expected call of Invalidate
func (mr *MockcacheServiceMockRecorder) Invalidate(key ...interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Invalidate", reflect.TypeOf((*MockcacheService)(nil).Invalidate), key...)
// Add indicates an expected call of Add
func (mr *MockSenderMockRecorder) Add(item interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockSender)(nil).Add), item)
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/mxpv/podsync/pkg/api"
"github.com/mxpv/podsync/pkg/model"
"github.com/mxpv/podsync/pkg/queue"
)
var feed = &model.Feed{
@@ -35,10 +36,13 @@ func TestService_CreateFeed(t *testing.T) {
gen, _ := NewIDGen()
builder := NewMockBuilder(ctrl)
builder.EXPECT().Build(gomock.Any()).Times(1).Return(nil)
s := Service{
generator: gen,
storage: db,
builders: map[api.Provider]Builder{api.ProviderYoutube: nil},
builders: map[api.Provider]Builder{api.ProviderYoutube: builder},
}
req := &api.CreateFeedRequest{
@@ -104,20 +108,18 @@ func TestService_BuildFeed(t *testing.T) {
stor := NewMockstorage(ctrl)
stor.EXPECT().GetFeed(feed.HashID).Times(1).Return(feed, nil)
stor.EXPECT().UpdateFeed(feed).Return(nil)
cache := NewMockcacheService(ctrl)
cache.EXPECT().GetItem("feeds/123", gomock.Any()).Return(errors.New("not found"))
q := NewMockSender(ctrl)
q.EXPECT().Add(gomock.Eq(&queue.Item{
ID: feed.HashID,
URL: feed.ItemURL,
Start: 1,
Count: feed.PageSize,
Format: string(feed.Format),
Quality: string(feed.Quality),
})).Times(1)
builder := NewMockBuilder(ctrl)
builder.EXPECT().Build(feed).Return(nil).Do(func(feed *model.Feed) {
feed.Episodes = append(feed.Episodes, feed.Episodes[0])
feed.LastID = "1"
})
s := Service{storage: stor, cache: cache, builders: map[api.Provider]Builder{
api.ProviderVimeo: builder,
}}
s := Service{storage: stor, sender: q}
_, err := s.BuildFeed(feed.HashID)
require.NoError(t, err)

View File

@@ -93,7 +93,7 @@ func New(feed feedService, support patreonService, opts Opts) http.Handler {
r.GET("/api/metadata/:hashId", h.metadata)
r.POST("/api/webhooks", h.webhook)
const feedTTL = 30 * time.Minute
const feedTTL = 1 * time.Hour
r.NoRoute(cache.CachePage(cacheStore, feedTTL, h.getFeed))
return r

pkg/queue/sender.go Normal file
View File

@@ -0,0 +1,136 @@
package queue
import (
"context"
"encoding/json"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
var (
sess = session.Must(session.NewSession())
queue = sqs.New(sess)
)
const (
chanSize = 1024
maxElementPerBatch = 10 // SQS Batch limit is 10 items per request
)
type Item struct {
ID string `json:"id"`
URL string `json:"url"`
Start int `json:"start"`
Count int `json:"count"`
LastID string `json:"last_id"`
Format string `json:"format"`
Quality string `json:"quality"`
}
type Sender struct {
url *string
items chan *Item
cancel context.CancelFunc
}
func New(ctx context.Context, url string) *Sender {
ctx, cancel := context.WithCancel(ctx)
items := make(chan *Item, chanSize)
sender := &Sender{
url: aws.String(url),
items: items,
cancel: cancel,
}
go sender.transmit(ctx)
return sender
}
func (s *Sender) Add(item *Item) {
s.items <- item
}
func (s *Sender) Close() {
s.cancel()
close(s.items)
}
func (s *Sender) transmit(ctx context.Context) error {
var list = make([]*Item, 0, maxElementPerBatch)
flush := func(ctx context.Context) {
if len(list) == 0 {
return
}
if err := s.send(ctx, list); err != nil {
log.WithError(err).Error("failed to send batch")
}
list = make([]*Item, 0, maxElementPerBatch)
}
for {
select {
case <-time.After(5 * time.Second):
// Flush list if not filled up entirely within 5 seconds
flush(ctx)
case item := <-s.items:
// Append an item to list and flush if filled up
list = append(list, item)
if len(list) == maxElementPerBatch {
flush(ctx)
}
case <-ctx.Done():
// Exiting, flush leftovers
flush(context.Background())
return ctx.Err()
}
}
}
func (s *Sender) send(ctx context.Context, list []*Item) error {
if len(list) == 0 {
return nil
}
log.Debugf("sending a new batch")
sendInput := &sqs.SendMessageBatchInput{
QueueUrl: s.url,
}
for _, item := range list {
data, err := json.Marshal(item)
if err != nil {
return errors.Wrapf(err, "failed to marshal item %q", item.ID)
}
body := string(data)
sendInput.Entries = append(sendInput.Entries, &sqs.SendMessageBatchRequestEntry{
Id: aws.String(item.ID),
MessageBody: aws.String(body),
})
log.Debugf("sending batch: %+v", sendInput)
}
_, err := queue.SendMessageBatchWithContext(ctx, sendInput)
if err != nil {
return errors.Wrap(err, "failed to send message batch")
}
log.Infof("sent %d item(s) to SQS", len(list))
return nil
}
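
For completeness, a minimal usage sketch of the new Sender, mirroring how cmd/api wires it up in this commit (queue.New, Add, Close). The queue URL below is a placeholder, not a real endpoint, and the sample Item values are illustrative:

package main

import (
	"context"

	"github.com/mxpv/podsync/pkg/queue"
)

func main() {
	ctx := context.Background()

	// Placeholder URL; in cmd/api this value comes from UPDATER_SQS_QUEUE_URL.
	sender := queue.New(ctx, "https://sqs.us-east-1.amazonaws.com/123456789012/updater")
	defer sender.Close()

	// Add is non-blocking while the internal channel (capacity 1024) has room.
	// Items are transmitted in SQS batches of up to 10, or on the 5-second
	// flush tick; Close cancels the background goroutine, which flushes any
	// leftovers asynchronously.
	sender.Add(&queue.Item{
		ID:    "123",
		URL:   "https://www.youtube.com/user/CNN/videos",
		Start: 1,
		Count: 50,
	})
}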