Use compression to fit dynamodb's limits, rework Redis caching

This commit is contained in:
Maksym Pavlenko
2019-05-18 00:23:40 -07:00
parent c5473e3bba
commit eb166c7d49
10 changed files with 225 additions and 124 deletions
+1 -3
View File
@@ -4,13 +4,12 @@ require (
cloud.google.com/go v0.25.0 // indirect
github.com/BrianHicks/finch v0.0.0-20140409222414-419bd73c29ec
github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20170929212804-61590edac4c7
github.com/aws/aws-lambda-go v1.10.0
github.com/aws/aws-sdk-go v1.15.81
github.com/boj/redistore v0.0.0-20160128113310-fc113767cd6b // indirect
github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737 // indirect
github.com/bradleypeabody/gorilla-sessions-memcache v0.0.0-20180621172731-4e5d6d543851 // indirect
github.com/eduncan911/podcast v1.3.0 // indirect
github.com/garyburd/redigo v1.6.0 // indirect
github.com/gin-contrib/cache v1.1.0
github.com/gin-contrib/gzip v0.0.1
github.com/gin-contrib/sessions v0.0.0-20170731012558-a71ea9167c61
github.com/gin-gonic/gin v1.3.0
@@ -21,7 +20,6 @@ require (
github.com/jessevdk/go-flags v1.4.0
github.com/jinzhu/inflection v0.0.0-20180308033659-04140366298a // indirect
github.com/kidstuff/mongostore v0.0.0-20180412085134-db2a8b4fac1f // indirect
github.com/memcachier/mc v2.0.1+incompatible // indirect
github.com/mxpv/patreon-go v0.0.0-20180807002359-67dbab1ad14c
github.com/mxpv/podcast v0.0.0-20170823220358-fe328ad87d18
github.com/onsi/ginkgo v1.7.0 // indirect
+6 -2
View File
@@ -4,8 +4,6 @@ github.com/BrianHicks/finch v0.0.0-20140409222414-419bd73c29ec h1:1VPruZMM1WQC7P
github.com/BrianHicks/finch v0.0.0-20140409222414-419bd73c29ec/go.mod h1:+hWo/MWgY8VtjZvdrYM2nPRMaK40zX2iPsH/qD0+Xs0=
github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20170929212804-61590edac4c7 h1:Clo7QBZv+fHzjCgVp4ELlbIsY5rScCmj+4VCfoMfqtQ=
github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20170929212804-61590edac4c7/go.mod h1:aJ4qN3TfrelA6NZ6AXsXRfmEVaYin3EDbSPJrKS8OXo=
github.com/aws/aws-lambda-go v1.10.0 h1:uafgdfYGQD0UeT7d2uKdyWW8j/ZYRifRPIdmeqLzLCk=
github.com/aws/aws-lambda-go v1.10.0/go.mod h1:zUsUQhAUjYzR8AuduJPCfhBuKWUaDbQiPOG+ouzmE1A=
github.com/aws/aws-sdk-go v1.15.81 h1:va7uoFaV9uKAtZ6BTmp1u7paoMsizYRRLvRuoC07nQ8=
github.com/aws/aws-sdk-go v1.15.81/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3ATZkfNZeM=
github.com/boj/redistore v0.0.0-20160128113310-fc113767cd6b h1:PfxLkkgJYE095CKZji++BNwZjxWfoAF21WFPzkzOZEs=
@@ -22,6 +20,8 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/garyburd/redigo v1.6.0 h1:0VruCpn7yAIIu7pWVClQC8wxCJEcG3nyzpMSHKi1PQc=
github.com/garyburd/redigo v1.6.0/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY=
github.com/gin-contrib/cache v1.1.0 h1:lM8B4YtzdQQM6ThTlvtNPeBNfW1mNdh/CMFQfenH1dk=
github.com/gin-contrib/cache v1.1.0/go.mod h1:9ylpYjLq309/y5hTpyuDxfPG+V6QlSB56vrWe6OhoLQ=
github.com/gin-contrib/gzip v0.0.1 h1:ezvKOL6jH+jlzdHNE4h9h8q8uMpDQjyl0NN0Jd7jozc=
github.com/gin-contrib/gzip v0.0.1/go.mod h1:fGBJBCdt6qCZuCAOwWuFhBB4OOq9EFqlo5dEaFhhu5w=
github.com/gin-contrib/sessions v0.0.0-20170731012558-a71ea9167c61 h1:PK7FuRfHB1/teDEyN/+STsl3BHJn3Uf7ZbZTknsSj0Q=
@@ -38,6 +38,8 @@ github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ=
@@ -79,6 +81,8 @@ github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62 h1:pyecQtsPmlkCsMkYhT5iZ+sUXuwee+OvfuJjinEA3ko=
github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62/go.mod h1:65XQgovT59RWatovFwnwocoUxiI/eENTnOY5GK3STuY=
github.com/silentsokolov/go-vimeo v0.0.0-20190116124215-06829264260c h1:KhHx/Ta3c9C1gcSo5UhDeo/D4JnhnxJTrlcOEOFiMfY=
github.com/silentsokolov/go-vimeo v0.0.0-20190116124215-06829264260c/go.mod h1:10FeaKUMy5t3KLsYfy54dFrq0rpwcfyKkKcF7vRGIRY=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
+45 -39
View File
@@ -10,7 +10,6 @@ import (
log "github.com/sirupsen/logrus"
"github.com/mxpv/podsync/pkg/api"
"github.com/mxpv/podsync/pkg/cache"
"github.com/mxpv/podsync/pkg/model"
)
@@ -136,48 +135,43 @@ func makeEnclosure(feed *model.Feed, id string, lengthInBytes int64) (string, it
return url, contentType, lengthInBytes
}
func short(str string, i int) string {
runes := []rune(str)
if len(runes) > i {
return string(runes[:i]) + " ..."
func (s *Service) updateFromRedis(hashID string, feed *model.Feed) {
key := fmt.Sprintf("feeds/%s", hashID)
redisFeed := &model.Feed{}
if err := s.cache.GetItem(key, redisFeed); err != nil {
return
}
return str
feed.Episodes = redisFeed.Episodes
feed.UpdatedAt = redisFeed.UpdatedAt
feed.LastID = redisFeed.LastID
feed.ItemURL = redisFeed.ItemURL
feed.Author = redisFeed.Author
feed.PubDate = redisFeed.PubDate
feed.Description = redisFeed.Description
feed.Title = redisFeed.Title
feed.Language = redisFeed.Language
feed.Explicit = redisFeed.Explicit
feed.CoverArt = redisFeed.CoverArt
}
func (s *Service) BuildFeed(hashID string) ([]byte, error) {
cached, err := s.cache.Get(hashID)
if err == nil {
return []byte(cached), nil
}
logger := log.WithField("hash_id", hashID)
var (
key = fmt.Sprintf("feeds/%s", hashID)
now = time.Now().UTC()
feed = &model.Feed{}
logger = log.WithField("hash_id", hashID)
now = time.Now().UTC()
)
if err = s.cache.GetItem(key, feed); err != nil {
// If not found, get from DynamoDB
if err == cache.ErrNotFound {
logger.Warnf("getting feed from Dynamo %s", hashID)
if f, err := s.QueryFeed(hashID); err != nil {
return nil, err
} else {
feed = f
}
} else {
return nil, err
}
feed, err := s.QueryFeed(hashID)
if err != nil {
logger.WithError(err).Error("failed to query feed from dynamodb")
return nil, err
}
const (
updateTTL = 15 * time.Minute
recordTTL = 90 * 24 * time.Hour
)
s.updateFromRedis(hashID, feed)
oldLastID := feed.LastID
const updateTTL = 15 * time.Minute
if now.Sub(feed.UpdatedAt) < updateTTL {
if podcast, err := s.buildPodcast(feed); err != nil {
return nil, err
@@ -202,14 +196,24 @@ func (s *Service) BuildFeed(hashID string) ([]byte, error) {
if err := builder.Build(feed); err != nil {
logger.WithError(err).Error("failed to build feed")
// Save error to cache to avoid spamming
_ = s.cache.Set(hashID, err.Error(), updateTTL)
return nil, err
}
// Format podcast
if feed.PageSize < len(feed.Episodes) {
feed.Episodes = feed.Episodes[:feed.PageSize]
}
feed.LastAccess = time.Now().UTC()
// Don't zero last seen ID if failed to get updates
if feed.LastID == "" {
feed.LastID = oldLastID
logger.Warnf("failed to get updates for %s (builder: %s)", hashID, builderType)
}
podcast, err := s.buildPodcast(feed)
if err != nil {
return nil, err
@@ -217,10 +221,12 @@ func (s *Service) BuildFeed(hashID string) ([]byte, error) {
// Save to storage
logger.Infof("saving feed %s", hashID)
if err := s.cache.SaveItem(key, feed, recordTTL); err != nil {
logger.WithError(err).Error("failed to save feed")
return nil, err
if oldLastID != feed.LastID {
logger.Infof("updating feed %s (last id: %q, new id: %q)", hashID, oldLastID, feed.LastID)
if err := s.storage.UpdateFeed(feed); err != nil {
logger.WithError(err).Error("failed to save feed")
return nil, err
}
}
return []byte(podcast.String()), nil
+24 -18
View File
@@ -36,7 +36,6 @@ func (m *MockBuilder) EXPECT() *MockBuilderMockRecorder {
// Build mocks base method
func (m *MockBuilder) Build(feed *model.Feed) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Build", feed)
ret0, _ := ret[0].(error)
return ret0
@@ -44,7 +43,6 @@ func (m *MockBuilder) Build(feed *model.Feed) error {
// Build indicates an expected call of Build
func (mr *MockBuilderMockRecorder) Build(feed interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Build", reflect.TypeOf((*MockBuilder)(nil).Build), feed)
}
@@ -73,7 +71,6 @@ func (m *Mockstorage) EXPECT() *MockstorageMockRecorder {
// SaveFeed mocks base method
func (m *Mockstorage) SaveFeed(feed *model.Feed) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SaveFeed", feed)
ret0, _ := ret[0].(error)
return ret0
@@ -81,13 +78,11 @@ func (m *Mockstorage) SaveFeed(feed *model.Feed) error {
// SaveFeed indicates an expected call of SaveFeed
func (mr *MockstorageMockRecorder) SaveFeed(feed interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveFeed", reflect.TypeOf((*Mockstorage)(nil).SaveFeed), feed)
}
// GetFeed mocks base method
func (m *Mockstorage) GetFeed(hashID string) (*model.Feed, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetFeed", hashID)
ret0, _ := ret[0].(*model.Feed)
ret1, _ := ret[1].(error)
@@ -96,13 +91,11 @@ func (m *Mockstorage) GetFeed(hashID string) (*model.Feed, error) {
// GetFeed indicates an expected call of GetFeed
func (mr *MockstorageMockRecorder) GetFeed(hashID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFeed", reflect.TypeOf((*Mockstorage)(nil).GetFeed), hashID)
}
// UpdateFeed mocks base method
func (m *Mockstorage) UpdateFeed(feed *model.Feed) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateFeed", feed)
ret0, _ := ret[0].(error)
return ret0
@@ -110,13 +103,11 @@ func (m *Mockstorage) UpdateFeed(feed *model.Feed) error {
// UpdateFeed indicates an expected call of UpdateFeed
func (mr *MockstorageMockRecorder) UpdateFeed(feed interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateFeed", reflect.TypeOf((*Mockstorage)(nil).UpdateFeed), feed)
}
// GetMetadata mocks base method
func (m *Mockstorage) GetMetadata(hashID string) (*model.Feed, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetMetadata", hashID)
ret0, _ := ret[0].(*model.Feed)
ret1, _ := ret[1].(error)
@@ -125,13 +116,11 @@ func (m *Mockstorage) GetMetadata(hashID string) (*model.Feed, error) {
// GetMetadata indicates an expected call of GetMetadata
func (mr *MockstorageMockRecorder) GetMetadata(hashID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetadata", reflect.TypeOf((*Mockstorage)(nil).GetMetadata), hashID)
}
// Downgrade mocks base method
func (m *Mockstorage) Downgrade(userID string, featureLevel int) ([]string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Downgrade", userID, featureLevel)
ret0, _ := ret[0].([]string)
ret1, _ := ret[1].(error)
@@ -140,7 +129,6 @@ func (m *Mockstorage) Downgrade(userID string, featureLevel int) ([]string, erro
// Downgrade indicates an expected call of Downgrade
func (mr *MockstorageMockRecorder) Downgrade(userID, featureLevel interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Downgrade", reflect.TypeOf((*Mockstorage)(nil).Downgrade), userID, featureLevel)
}
@@ -169,7 +157,6 @@ func (m *MockcacheService) EXPECT() *MockcacheServiceMockRecorder {
// Set mocks base method
func (m *MockcacheService) Set(key, value string, ttl time.Duration) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Set", key, value, ttl)
ret0, _ := ret[0].(error)
return ret0
@@ -177,13 +164,11 @@ func (m *MockcacheService) Set(key, value string, ttl time.Duration) error {
// Set indicates an expected call of Set
func (mr *MockcacheServiceMockRecorder) Set(key, value, ttl interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockcacheService)(nil).Set), key, value, ttl)
}
// Get mocks base method
func (m *MockcacheService) Get(key string) (string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Get", key)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
@@ -192,13 +177,35 @@ func (m *MockcacheService) Get(key string) (string, error) {
// Get indicates an expected call of Get
func (mr *MockcacheServiceMockRecorder) Get(key interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockcacheService)(nil).Get), key)
}
// SaveItem mocks base method
func (m *MockcacheService) SaveItem(key string, item interface{}, exp time.Duration) error {
ret := m.ctrl.Call(m, "SaveItem", key, item, exp)
ret0, _ := ret[0].(error)
return ret0
}
// SaveItem indicates an expected call of SaveItem
func (mr *MockcacheServiceMockRecorder) SaveItem(key, item, exp interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveItem", reflect.TypeOf((*MockcacheService)(nil).SaveItem), key, item, exp)
}
// GetItem mocks base method
func (m *MockcacheService) GetItem(key string, item interface{}) error {
ret := m.ctrl.Call(m, "GetItem", key, item)
ret0, _ := ret[0].(error)
return ret0
}
// GetItem indicates an expected call of GetItem
func (mr *MockcacheServiceMockRecorder) GetItem(key, item interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetItem", reflect.TypeOf((*MockcacheService)(nil).GetItem), key, item)
}
// Invalidate mocks base method
func (m *MockcacheService) Invalidate(key ...string) error {
m.ctrl.T.Helper()
varargs := []interface{}{}
for _, a := range key {
varargs = append(varargs, a)
@@ -210,6 +217,5 @@ func (m *MockcacheService) Invalidate(key ...string) error {
// Invalidate indicates an expected call of Invalidate
func (mr *MockcacheServiceMockRecorder) Invalidate(key ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Invalidate", reflect.TypeOf((*MockcacheService)(nil).Invalidate), key...)
}
+3 -22
View File
@@ -4,11 +4,9 @@ package feeds
import (
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mxpv/podsync/pkg/api"
@@ -100,20 +98,6 @@ func TestService_QueryFeed(t *testing.T) {
require.NoError(t, err)
}
func TestService_GetFromCache(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
cache := NewMockcacheService(ctrl)
cache.EXPECT().Get("123").Return("test", nil)
s := &Service{cache: cache}
data, err := s.BuildFeed("123")
assert.NoError(t, err)
assert.Equal(t, []byte("test"), data)
}
func TestService_BuildFeed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -123,12 +107,12 @@ func TestService_BuildFeed(t *testing.T) {
stor.EXPECT().UpdateFeed(feed).Return(nil)
cache := NewMockcacheService(ctrl)
cache.EXPECT().Get(feed.HashID).Return("", errors.New("not found"))
cache.EXPECT().Set(feed.HashID, gomock.Any(), 1*time.Hour).Return(nil)
cache.EXPECT().GetItem("feeds/123", gomock.Any()).Return(errors.New("not found"))
builder := NewMockBuilder(ctrl)
builder.EXPECT().Build(feed).Return(nil).Do(func(feed *model.Feed) {
feed.Episodes = append(feed.Episodes, feed.Episodes[0])
feed.LastID = "1"
})
s := Service{storage: stor, cache: cache, builders: map[api.Provider]Builder{
@@ -143,13 +127,10 @@ func TestService_WrongID(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
cache := NewMockcacheService(ctrl)
cache.EXPECT().Get(gomock.Any()).Return("", errors.New("not found"))
stor := NewMockstorage(ctrl)
stor.EXPECT().GetFeed(gomock.Any()).Times(1).Return(nil, errors.New("not found"))
s := &Service{storage: stor, cache: cache}
s := &Service{storage: stor}
_, err := s.BuildFeed("invalid_feed_id")
require.Error(t, err)
+7 -1
View File
@@ -5,7 +5,10 @@ import (
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/gin-contrib/cache"
"github.com/gin-contrib/cache/persistence"
"github.com/gin-contrib/gzip"
"github.com/gin-contrib/sessions"
"github.com/gin-gonic/gin"
@@ -54,6 +57,8 @@ func New(feed feedService, support patreonService, opts Opts) http.Handler {
r.Use(gin.Recovery())
r.Use(gzip.Gzip(gzip.DefaultCompression))
cacheStore := persistence.NewRedisCache("redis:6379", "", time.Second)
store := sessions.NewCookieStore([]byte(opts.CookieSecret))
r.Use(sessions.Sessions("podsync", store))
@@ -88,7 +93,8 @@ func New(feed feedService, support patreonService, opts Opts) http.Handler {
r.GET("/api/metadata/:hashId", h.metadata)
r.POST("/api/webhooks", h.webhook)
r.NoRoute(h.getFeed)
const feedTTL = 30 * time.Minute
r.NoRoute(cache.CachePage(cacheStore, feedTTL, h.getFeed))
return r
}
+2 -1
View File
@@ -54,7 +54,8 @@ type Feed struct {
PubDate time.Time `dynamodbav:",unixtime"`
Author string
ItemURL string // Platform specific URL
Episodes []*Item // Array of episodes
LastID string // Last seen video URL ID (for incremental updater)
UpdatedAt time.Time `dynamodbav:",unixtime"`
Episodes []*Item `dynamodbav:"-"` // Array of episodes, serialized as gziped EpisodesData in DynamoDB
EpisodesData []byte
}
+39
View File
@@ -0,0 +1,39 @@
// Compression helpers to reduce object size in DynamoDB table.
// See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/bp-use-s3-too.html
package storage
import (
"bytes"
"compress/gzip"
"encoding/json"
)
func compressObj(obj interface{}) ([]byte, error) {
var buf bytes.Buffer
w := gzip.NewWriter(&buf)
data, err := json.Marshal(obj)
if err != nil {
return nil, err
}
if _, err := w.Write(data); err != nil {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func decompressObj(data []byte, obj interface{}) error {
r, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return err
}
return json.NewDecoder(r).Decode(obj)
}
+25
View File
@@ -0,0 +1,25 @@
package storage
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/mxpv/podsync/pkg/model"
)
func TestCompress(t *testing.T) {
inData := []model.Item{
{ID: "1", Title: "title1"},
{ID: "2", Title: "title2"},
}
data, err := compressObj(inData)
assert.NoError(t, err)
var outData []model.Item
err = decompressObj(data, &outData)
assert.NoError(t, err)
assert.ObjectsAreEqual(inData, outData)
}
+73 -38
View File
@@ -25,7 +25,7 @@ const (
anonymousUserName = "anonymous"
// Update LastAccess field every hour
feedLastAccessUpdatePeriod = time.Hour
feedLastAccessUpdatePeriod = time.Hour * 24
feedTimeToLive = time.Hour * 24 * 90
)
@@ -100,16 +100,31 @@ func (d Dynamo) SaveFeed(feed *model.Feed) error {
feed.UserID = anonymousUserName
}
now := time.Now().UTC()
var (
err error
now = time.Now().UTC()
)
feed.LastAccess = now
feed.ExpirationTime = now.Add(feedTimeToLive)
// Compress episodes
feed.EpisodesData, err = compressObj(feed.Episodes)
if err != nil {
return err
}
// Marshal to DynamoDB's format
item, err := attr.MarshalMap(feed)
if err != nil {
logger.WithError(err).Error("failed to marshal feed model")
return err
}
// Submit request
input := &dynamodb.PutItemInput{
TableName: d.FeedsTableName,
Item: item,
@@ -127,7 +142,7 @@ func (d Dynamo) SaveFeed(feed *model.Feed) error {
func (d Dynamo) GetFeed(hashID string) (*model.Feed, error) {
logger := log.WithField("hash_id", hashID)
logger.Debug("getting feed")
// Submit get request
getInput := &dynamodb.GetItemInput{
TableName: d.FeedsTableName,
@@ -146,6 +161,8 @@ func (d Dynamo) GetFeed(hashID string) (*model.Feed, error) {
return nil, errors.New("not found")
}
// Unmarshal data
var feed model.Feed
if err := attr.UnmarshalMap(getOutput.Item, &feed); err != nil {
// TODO: remove this
@@ -160,61 +177,81 @@ func (d Dynamo) GetFeed(hashID string) (*model.Feed, error) {
}
// Check if we need to update LastAccess field (no more than once per hour)
now := time.Now().UTC()
if feed.LastAccess.Add(feedLastAccessUpdatePeriod).Before(now) {
logger.Debugf("updating feed's last access timestamp")
// Set LastAccess field to now
// Set ExpirationTime field to now + feedTimeToLive
updateExpression, err := expr.
NewBuilder().
WithUpdate(expr.
Set(expr.Name("LastAccess"), expr.Value(now.Unix())).
Set(expr.Name("ExpirationTime"), expr.Value(now.Add(feedTimeToLive).Unix()))).
Build()
if err != nil {
logger.WithError(err).Error("failed to build update expression")
return nil, err
}
updateInput := &dynamodb.UpdateItemInput{
TableName: d.FeedsTableName,
Key: getInput.Key,
UpdateExpression: updateExpression.Update(),
ExpressionAttributeNames: updateExpression.Names(),
ExpressionAttributeValues: updateExpression.Values(),
}
_, err = d.dynamo.UpdateItem(updateInput)
if err != nil {
logger.WithError(err).Error("failed to update feed item")
if err := d.updateLastAccess(getInput.Key); err != nil {
logger.WithError(err).Error("failed to update feed's last access")
return nil, err
}
feed.LastAccess = now
}
// Decompress episodes
if len(feed.EpisodesData) > 0 {
if err := decompressObj(feed.EpisodesData, &feed.Episodes); err != nil {
return nil, errors.Wrap(err, "failed to decompress episodes")
}
}
return &feed, nil
}
func (d Dynamo) UpdateFeed(feed *model.Feed) error {
log.Infof("saving feed %q", feed.HashID)
func (d Dynamo) updateLastAccess(key map[string]*dynamodb.AttributeValue) error {
now := time.Now().UTC()
// Set LastAccess field to now
// Set ExpirationTime field to now + feedTimeToLive
updateExpression, err := expr.
NewBuilder().
WithUpdate(expr.
Set(expr.Name("LastAccess"), expr.Value(now.Unix())).
Set(expr.Name("ExpirationTime"), expr.Value(now.Add(feedTimeToLive).Unix()))).
Build()
if err != nil {
return err
}
updateInput := &dynamodb.UpdateItemInput{
TableName: d.FeedsTableName,
Key: key,
UpdateExpression: updateExpression.Update(),
ExpressionAttributeNames: updateExpression.Names(),
ExpressionAttributeValues: updateExpression.Values(),
}
_, err = d.dynamo.UpdateItem(updateInput)
if err != nil {
return err
}
return nil
}
func (d Dynamo) UpdateFeed(feed *model.Feed) error {
var (
pubDate = feed.PubDate.Unix()
updatedAt = feed.LastAccess.Unix()
)
episodesData, err := compressObj(feed.Episodes)
if err != nil {
return errors.Wrap(err, "failed to compress episodes for update")
}
update := expr.
Set(expr.Name("Title"), expr.Value(feed.Title)).
Set(expr.Name("Description"), expr.Value(feed.Description)).
Set(expr.Name("PubDate"), expr.Value(pubDate)).
Set(expr.Name("Author"), expr.Value(feed.Author)).
Set(expr.Name("ItemURL"), expr.Value(feed.ItemURL)).
Set(expr.Name("Episodes"), expr.Value(feed.Episodes)).
Set(expr.Name("LastID"), expr.Value(feed.LastID)).
Set(expr.Name("UpdatedAt"), expr.Value(updatedAt))
Set(expr.Name("UpdatedAt"), expr.Value(updatedAt)).
Set(expr.Name("EpisodesData"), expr.Value(episodesData)). // Serialized episodes
Remove(expr.Name("Episodes")) // Remove old field to save space
expression, err := expr.NewBuilder().WithUpdate(update).Build()
if err != nil {
@@ -353,8 +390,7 @@ func (d Dynamo) Downgrade(userID string, featureLevel int) ([]string, error) {
NewBuilder().
WithUpdate(expr.
Set(expr.Name("PageSize"), expr.Value(150)).
Set(expr.Name("FeatureLevel"), expr.Value(api.ExtendedFeatures)).
Set(expr.Name("LastID"), expr.Value(""))).
Set(expr.Name("FeatureLevel"), expr.Value(api.ExtendedFeatures))).
WithCondition(expr.
Name("PageSize").GreaterThan(expr.Value(150))).
Build()
@@ -391,8 +427,7 @@ func (d Dynamo) Downgrade(userID string, featureLevel int) ([]string, error) {
Set(expr.Name("PageSize"), expr.Value(50)).
Set(expr.Name("FeatureLevel"), expr.Value(api.DefaultFeatures)).
Set(expr.Name("Format"), expr.Value(api.FormatVideo)).
Set(expr.Name("Quality"), expr.Value(api.QualityHigh)).
Set(expr.Name("LastID"), expr.Value(""))).
Set(expr.Name("Quality"), expr.Value(api.QualityHigh))).
Build()
if err != nil {