mirror of
https://github.com/mxpv/podsync.git
synced 2024-05-11 05:55:04 +00:00
Use Redis map for cache data (optimize memory usage)
This commit is contained in:
36
pkg/cache/redis.go
vendored
36
pkg/cache/redis.go
vendored
@ -72,6 +72,42 @@ func (c RedisCache) GetItem(key string, item interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c RedisCache) SetMap(key string, fields map[string]interface{}, exp time.Duration) error {
|
||||
if err := c.client.HMSet(key, fields).Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.client.TTL(key).Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c RedisCache) GetMap(key string, fields ...string) (map[string]string, error) {
|
||||
result, err := c.client.HMGet(key, fields...).Result()
|
||||
if err == redis.Nil {
|
||||
return nil, ErrNotFound
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data := map[string]string{}
|
||||
for idx, key := range fields {
|
||||
if result[idx] == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
data[key] = result[idx].(string)
|
||||
}
|
||||
|
||||
if len(data) == 0 {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (c RedisCache) Invalidate(key... string) error {
|
||||
return c.client.Del(key...).Err()
|
||||
}
|
||||
|
50
pkg/cache/redis_test.go
vendored
50
pkg/cache/redis_test.go
vendored
@ -46,6 +46,56 @@ func TestNewRedisCache_TTL(t *testing.T) {
|
||||
assert.Equal(t, ErrNotFound, err)
|
||||
}
|
||||
|
||||
func TestRedisCache_SaveItem(t *testing.T) {
|
||||
type test struct {
|
||||
Feed []byte `msgpack:"feed"`
|
||||
UpdatedAt time.Time `msgpack:"updated_at"`
|
||||
}
|
||||
|
||||
s := createRedisClient(t)
|
||||
defer s.Close()
|
||||
|
||||
item := &test{
|
||||
Feed: []byte("123"),
|
||||
UpdatedAt: time.Now().UTC(),
|
||||
}
|
||||
|
||||
err := s.SaveItem("test", item, time.Minute)
|
||||
assert.NoError(t, err)
|
||||
|
||||
var out test
|
||||
err = s.GetItem("test", &out)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.EqualValues(t, item.Feed, &out.Feed)
|
||||
assert.EqualValues(t, item.UpdatedAt.Unix(), out.UpdatedAt.Unix())
|
||||
}
|
||||
|
||||
func TestRedisCache_Map(t *testing.T) {
|
||||
s := createRedisClient(t)
|
||||
defer s.Close()
|
||||
|
||||
data := map[string]interface{}{
|
||||
"1": "123",
|
||||
"2": "test",
|
||||
}
|
||||
|
||||
err := s.SetMap("2", data, time.Minute)
|
||||
assert.NoError(t, err)
|
||||
|
||||
out, err := s.GetMap("2", "1", "2")
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, data, out)
|
||||
}
|
||||
|
||||
func TestRedisCache_GetMapInvalidKey(t *testing.T) {
|
||||
s := createRedisClient(t)
|
||||
defer s.Close()
|
||||
|
||||
_, err := s.GetMap("unknown_key", "1", "2")
|
||||
assert.Equal(t, ErrNotFound, err)
|
||||
}
|
||||
|
||||
// docker run -it --rm -p 6379:6379 redis
|
||||
func createRedisClient(t *testing.T) RedisCache {
|
||||
if testing.Short() {
|
||||
|
@ -2,6 +2,7 @@ package feeds
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
itunes "github.com/mxpv/podcast"
|
||||
@ -12,12 +13,6 @@ import (
|
||||
"github.com/mxpv/podsync/pkg/model"
|
||||
)
|
||||
|
||||
type CacheItem struct {
|
||||
Feed []byte `msgpack:"feed"`
|
||||
UpdatedAt time.Time `msgpack:"updated_at"`
|
||||
ItemCount uint64 `msgpack:"item_count"`
|
||||
}
|
||||
|
||||
type Builder interface {
|
||||
Build(feed *model.Feed) (podcast *itunes.Podcast, err error)
|
||||
GetVideoCount(feed *model.Feed) (uint64, error)
|
||||
@ -31,8 +26,15 @@ type storage interface {
|
||||
}
|
||||
|
||||
type cacheService interface {
|
||||
Set(key, value string, ttl time.Duration) error
|
||||
Get(key string) (string, error)
|
||||
|
||||
SaveItem(key string, item interface{}, exp time.Duration) error
|
||||
GetItem(key string, item interface{}) error
|
||||
|
||||
SetMap(key string, fields map[string]interface{}, exp time.Duration) error
|
||||
GetMap(key string, fields ...string) (map[string]string, error)
|
||||
|
||||
Invalidate(key ...string) error
|
||||
}
|
||||
|
||||
@ -117,6 +119,48 @@ func (s Service) getVideoCount(feed *model.Feed, builder Builder) (uint64, bool)
|
||||
return videoCount, true
|
||||
}
|
||||
|
||||
const (
|
||||
feedKey = "feed"
|
||||
updatedAtKey = "updatedAt"
|
||||
videoCountKey = "videoCount"
|
||||
)
|
||||
|
||||
func (s Service) getCachedFeed(hashID string) ([]byte, time.Time, uint64, error) {
|
||||
cached, err := s.cache.GetMap(hashID, feedKey, updatedAtKey, videoCountKey)
|
||||
if err != nil {
|
||||
return nil, time.Time{}, 0, err
|
||||
}
|
||||
|
||||
// Get feed body
|
||||
|
||||
body := []byte(cached[feedKey])
|
||||
|
||||
// Parse updated at
|
||||
|
||||
unixTime, err := strconv.ParseInt(cached[updatedAtKey], 10, 64)
|
||||
if err != nil {
|
||||
return nil, time.Time{}, 0, err
|
||||
}
|
||||
|
||||
updatedAt := time.Unix(unixTime, 0)
|
||||
|
||||
// Parse video count
|
||||
|
||||
var videoCount uint64
|
||||
|
||||
if str, ok := cached[videoCountKey]; ok {
|
||||
count, err := strconv.ParseUint(str, 10, 64)
|
||||
if err != nil {
|
||||
return nil, time.Time{}, 0, err
|
||||
}
|
||||
|
||||
videoCount = count
|
||||
}
|
||||
|
||||
// OK
|
||||
return body, updatedAt, videoCount, nil
|
||||
}
|
||||
|
||||
func (s Service) BuildFeed(hashID string) ([]byte, error) {
|
||||
const (
|
||||
feedRecordTTL = 15 * 24 * time.Hour
|
||||
@ -124,26 +168,25 @@ func (s Service) BuildFeed(hashID string) ([]byte, error) {
|
||||
)
|
||||
|
||||
var (
|
||||
cached CacheItem
|
||||
now = time.Now().UTC()
|
||||
verifyCache bool
|
||||
)
|
||||
|
||||
// Check cached version first
|
||||
err := s.cache.GetItem(hashID, &cached)
|
||||
body, updatedAt, currentCount, err := s.getCachedFeed(hashID)
|
||||
if err == nil {
|
||||
// We've succeded to retrieve data from Redis, check if it's up to date
|
||||
|
||||
// 1. If cached less than 15 minutes ago, just return data
|
||||
if now.Sub(cached.UpdatedAt) < cacheRecheckTTL {
|
||||
return cached.Feed, nil
|
||||
if now.Sub(updatedAt) < cacheRecheckTTL {
|
||||
return body, nil
|
||||
}
|
||||
|
||||
// 2. Verify cache integrity by querying the number of episodes from YouTube
|
||||
verifyCache = true
|
||||
}
|
||||
|
||||
// Query feed metadata
|
||||
// Query feed metadata from DynamoDB
|
||||
|
||||
feed, err := s.QueryFeed(hashID)
|
||||
if err != nil {
|
||||
@ -155,55 +198,58 @@ func (s Service) BuildFeed(hashID string) ([]byte, error) {
|
||||
return nil, errors.Wrapf(err, "failed to get builder for feed: %s", hashID)
|
||||
}
|
||||
|
||||
// Check if cached version is still valid
|
||||
// 2. Check if cached version is still valid
|
||||
|
||||
if verifyCache {
|
||||
log.Debugf("pulling the number of videos from %q", feed.Provider)
|
||||
// Query YouTube and check the number of videos.
|
||||
// Most likely it'll remain the same, so we can return previously cached feed.
|
||||
videoCount, videoOk := s.getVideoCount(feed, builder)
|
||||
|
||||
// Query YouTube and check the number of videos.
|
||||
// Most likely it'll remain the same, so we can return previously cached feed.
|
||||
count, ok := s.getVideoCount(feed, builder)
|
||||
if ok {
|
||||
if count == cached.ItemCount {
|
||||
// Cache is up to date, renew and save
|
||||
cached.UpdatedAt = now
|
||||
if verifyCache && videoOk {
|
||||
|
||||
if s.cache.SaveItem(hashID, &cached, feedRecordTTL) != nil {
|
||||
return nil, errors.Wrap(err, "failed to cache item")
|
||||
}
|
||||
|
||||
return cached.Feed, nil
|
||||
if currentCount == videoCount {
|
||||
// Cache is up to date, renew and save
|
||||
err = s.cache.SetMap(hashID, map[string]interface{}{updatedAtKey: now.Unix()}, feedRecordTTL)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to cache item")
|
||||
}
|
||||
|
||||
log.Debugf("the number of episodes is different (%d != %d)", cached.ItemCount, count)
|
||||
cached.ItemCount = count
|
||||
return body, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Rebuild feed using YouTube API
|
||||
|
||||
log.Infof("building new feed %q", hashID)
|
||||
|
||||
podcast, err := builder.Build(feed)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("feed_id", hashID).Error("failed to build cache")
|
||||
|
||||
// If there is cached version - return it
|
||||
if verifyCache {
|
||||
return body, nil
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data := []byte(podcast.String())
|
||||
newBody := podcast.String()
|
||||
|
||||
// Save to cache
|
||||
|
||||
cached.Feed = data
|
||||
cached.UpdatedAt = now
|
||||
|
||||
if !verifyCache {
|
||||
cached.ItemCount, _ = s.getVideoCount(feed, builder)
|
||||
data := map[string]interface{}{
|
||||
feedKey: newBody,
|
||||
updatedAtKey: now.Unix(),
|
||||
videoCountKey: strconv.FormatUint(videoCount, 10),
|
||||
}
|
||||
|
||||
if err := s.cache.SaveItem(hashID, cached, feedRecordTTL); err != nil {
|
||||
err = s.cache.SetMap(hashID, data, feedRecordTTL)
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Warnf("failed to save new feed %q to cache", hashID)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
return []byte(newBody), nil
|
||||
}
|
||||
|
||||
func (s Service) GetMetadata(hashID string) (*api.Metadata, error) {
|
||||
|
@ -170,6 +170,35 @@ func (m *MockcacheService) EXPECT() *MockcacheServiceMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// Set mocks base method
|
||||
func (m *MockcacheService) Set(key, value string, ttl time.Duration) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Set", key, value, ttl)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Set indicates an expected call of Set
|
||||
func (mr *MockcacheServiceMockRecorder) Set(key, value, ttl interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockcacheService)(nil).Set), key, value, ttl)
|
||||
}
|
||||
|
||||
// Get mocks base method
|
||||
func (m *MockcacheService) Get(key string) (string, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Get", key)
|
||||
ret0, _ := ret[0].(string)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Get indicates an expected call of Get
|
||||
func (mr *MockcacheServiceMockRecorder) Get(key interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockcacheService)(nil).Get), key)
|
||||
}
|
||||
|
||||
// SaveItem mocks base method
|
||||
func (m *MockcacheService) SaveItem(key string, item interface{}, exp time.Duration) error {
|
||||
m.ctrl.T.Helper()
|
||||
@ -198,6 +227,40 @@ func (mr *MockcacheServiceMockRecorder) GetItem(key, item interface{}) *gomock.C
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetItem", reflect.TypeOf((*MockcacheService)(nil).GetItem), key, item)
|
||||
}
|
||||
|
||||
// SetMap mocks base method
|
||||
func (m *MockcacheService) SetMap(key string, fields map[string]interface{}, exp time.Duration) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SetMap", key, fields, exp)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SetMap indicates an expected call of SetMap
|
||||
func (mr *MockcacheServiceMockRecorder) SetMap(key, fields, exp interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMap", reflect.TypeOf((*MockcacheService)(nil).SetMap), key, fields, exp)
|
||||
}
|
||||
|
||||
// GetMap mocks base method
|
||||
func (m *MockcacheService) GetMap(key string, fields ...string) (map[string]string, error) {
|
||||
m.ctrl.T.Helper()
|
||||
varargs := []interface{}{key}
|
||||
for _, a := range fields {
|
||||
varargs = append(varargs, a)
|
||||
}
|
||||
ret := m.ctrl.Call(m, "GetMap", varargs...)
|
||||
ret0, _ := ret[0].(map[string]string)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// GetMap indicates an expected call of GetMap
|
||||
func (mr *MockcacheServiceMockRecorder) GetMap(key interface{}, fields ...interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
varargs := append([]interface{}{key}, fields...)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMap", reflect.TypeOf((*MockcacheService)(nil).GetMap), varargs...)
|
||||
}
|
||||
|
||||
// Invalidate mocks base method
|
||||
func (m *MockcacheService) Invalidate(key ...string) error {
|
||||
m.ctrl.T.Helper()
|
||||
|
@ -3,6 +3,7 @@
|
||||
package feeds
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -103,22 +104,19 @@ func TestService_GetFromCache(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
item := CacheItem{
|
||||
UpdatedAt: time.Now().UTC(),
|
||||
Feed: []byte("test"),
|
||||
item := map[string]string{
|
||||
updatedAtKey: strconv.FormatInt(time.Now().UTC().Unix(), 10),
|
||||
feedKey: "test",
|
||||
}
|
||||
|
||||
cache := NewMockcacheService(ctrl)
|
||||
cache.EXPECT().GetItem("123", gomock.Any()).DoAndReturn(func(_ string, ret *CacheItem) error {
|
||||
*ret = item
|
||||
return nil
|
||||
})
|
||||
cache.EXPECT().GetMap("123", feedKey, updatedAtKey, videoCountKey).Return(item, nil)
|
||||
|
||||
s := Service{cache: cache}
|
||||
|
||||
data, err := s.BuildFeed("123")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, item.Feed, data)
|
||||
assert.Equal(t, []byte("test"), data)
|
||||
}
|
||||
|
||||
func TestService_VerifyCache(t *testing.T) {
|
||||
@ -126,14 +124,13 @@ func TestService_VerifyCache(t *testing.T) {
|
||||
defer ctrl.Finish()
|
||||
|
||||
cache := NewMockcacheService(ctrl)
|
||||
cache.EXPECT().GetItem("123", gomock.Any()).DoAndReturn(func(_ string, ret *CacheItem) error {
|
||||
ret.Feed = []byte("test")
|
||||
ret.UpdatedAt = time.Now().UTC().Add(-20 * time.Minute)
|
||||
ret.ItemCount = 30
|
||||
return nil
|
||||
})
|
||||
cache.EXPECT().GetMap("123", feedKey, updatedAtKey, videoCountKey).Return(map[string]string{
|
||||
feedKey: "test",
|
||||
updatedAtKey: strconv.FormatInt(time.Now().UTC().Add(-20 * time.Minute).Unix(), 10),
|
||||
videoCountKey: "30",
|
||||
}, nil)
|
||||
|
||||
cache.EXPECT().SaveItem("123", gomock.Any(), 15*24*time.Hour).Times(1).Return(nil)
|
||||
cache.EXPECT().SetMap("123", gomock.Any(), 15*24*time.Hour).Times(1).Return(nil)
|
||||
|
||||
stor := NewMockstorage(ctrl)
|
||||
stor.EXPECT().GetFeed(feed.HashID).Times(1).Return(feed, nil)
|
||||
@ -158,8 +155,8 @@ func TestService_BuildFeed(t *testing.T) {
|
||||
stor.EXPECT().GetFeed(feed.HashID).Times(1).Return(feed, nil)
|
||||
|
||||
cache := NewMockcacheService(ctrl)
|
||||
cache.EXPECT().GetItem(feed.HashID, gomock.Any()).Return(errors.New("not found"))
|
||||
cache.EXPECT().SaveItem(feed.HashID, gomock.Any(), gomock.Any()).Return(nil)
|
||||
cache.EXPECT().GetMap(feed.HashID, feedKey, updatedAtKey, videoCountKey).Return(nil, errors.New("not found"))
|
||||
cache.EXPECT().SetMap(feed.HashID, gomock.Any(), gomock.Any()).Return(nil)
|
||||
|
||||
podcast := itunes.New("", "", "", nil, nil)
|
||||
|
||||
@ -183,14 +180,13 @@ func TestService_RebuildCache(t *testing.T) {
|
||||
stor.EXPECT().GetFeed(feed.HashID).Times(1).Return(feed, nil)
|
||||
|
||||
cache := NewMockcacheService(ctrl)
|
||||
cache.EXPECT().GetItem("123", gomock.Any()).DoAndReturn(func(_ string, ret *CacheItem) error {
|
||||
ret.Feed = []byte("test")
|
||||
ret.UpdatedAt = time.Now().UTC().Add(-20 * time.Minute)
|
||||
ret.ItemCount = 30
|
||||
return nil
|
||||
})
|
||||
cache.EXPECT().GetMap("123", feedKey, updatedAtKey, videoCountKey).Return(map[string]string{
|
||||
feedKey: "test",
|
||||
updatedAtKey: strconv.FormatInt(time.Now().UTC().Add(-20 * time.Minute).Unix(), 10),
|
||||
videoCountKey: "30",
|
||||
}, nil)
|
||||
|
||||
cache.EXPECT().SaveItem(feed.HashID, gomock.Any(), gomock.Any()).Return(nil)
|
||||
cache.EXPECT().SetMap(feed.HashID, gomock.Any(), gomock.Any()).Return(nil)
|
||||
|
||||
podcast := itunes.New("", "", "", nil, nil)
|
||||
|
||||
@ -211,7 +207,7 @@ func TestService_WrongID(t *testing.T) {
|
||||
defer ctrl.Finish()
|
||||
|
||||
cache := NewMockcacheService(ctrl)
|
||||
cache.EXPECT().GetItem(gomock.Any(), gomock.Any()).Return(errors.New("not found"))
|
||||
cache.EXPECT().GetMap(gomock.Any(), feedKey, updatedAtKey, videoCountKey).Return(nil, errors.New("not found"))
|
||||
|
||||
stor := NewMockstorage(ctrl)
|
||||
stor.EXPECT().GetFeed(gomock.Any()).Times(1).Return(nil, errors.New("not found"))
|
||||
|
Reference in New Issue
Block a user