From eb166c7d496e807b7c3127217fbd2b9ec02fb771 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Sat, 18 May 2019 00:23:40 -0700 Subject: [PATCH] Use compression to fit dynamodb's limits, rework Redis caching --- go.mod | 4 +- go.sum | 8 ++- pkg/feeds/feeds.go | 84 +++++++++++++----------- pkg/feeds/feeds_mock_test.go | 42 ++++++------ pkg/feeds/feeds_test.go | 25 +------ pkg/handler/handler.go | 8 ++- pkg/model/model.go | 3 +- pkg/storage/compression.go | 39 +++++++++++ pkg/storage/compression_test.go | 25 +++++++ pkg/storage/dynamo.go | 111 +++++++++++++++++++++----------- 10 files changed, 225 insertions(+), 124 deletions(-) create mode 100644 pkg/storage/compression.go create mode 100644 pkg/storage/compression_test.go diff --git a/go.mod b/go.mod index 7730589..7e80e0b 100644 --- a/go.mod +++ b/go.mod @@ -4,13 +4,12 @@ require ( cloud.google.com/go v0.25.0 // indirect github.com/BrianHicks/finch v0.0.0-20140409222414-419bd73c29ec github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20170929212804-61590edac4c7 - github.com/aws/aws-lambda-go v1.10.0 github.com/aws/aws-sdk-go v1.15.81 github.com/boj/redistore v0.0.0-20160128113310-fc113767cd6b // indirect - github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737 // indirect github.com/bradleypeabody/gorilla-sessions-memcache v0.0.0-20180621172731-4e5d6d543851 // indirect github.com/eduncan911/podcast v1.3.0 // indirect github.com/garyburd/redigo v1.6.0 // indirect + github.com/gin-contrib/cache v1.1.0 github.com/gin-contrib/gzip v0.0.1 github.com/gin-contrib/sessions v0.0.0-20170731012558-a71ea9167c61 github.com/gin-gonic/gin v1.3.0 @@ -21,7 +20,6 @@ require ( github.com/jessevdk/go-flags v1.4.0 github.com/jinzhu/inflection v0.0.0-20180308033659-04140366298a // indirect github.com/kidstuff/mongostore v0.0.0-20180412085134-db2a8b4fac1f // indirect - github.com/memcachier/mc v2.0.1+incompatible // indirect github.com/mxpv/patreon-go v0.0.0-20180807002359-67dbab1ad14c github.com/mxpv/podcast v0.0.0-20170823220358-fe328ad87d18 github.com/onsi/ginkgo v1.7.0 // indirect diff --git a/go.sum b/go.sum index e636d24..1511065 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,6 @@ github.com/BrianHicks/finch v0.0.0-20140409222414-419bd73c29ec h1:1VPruZMM1WQC7P github.com/BrianHicks/finch v0.0.0-20140409222414-419bd73c29ec/go.mod h1:+hWo/MWgY8VtjZvdrYM2nPRMaK40zX2iPsH/qD0+Xs0= github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20170929212804-61590edac4c7 h1:Clo7QBZv+fHzjCgVp4ELlbIsY5rScCmj+4VCfoMfqtQ= github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20170929212804-61590edac4c7/go.mod h1:aJ4qN3TfrelA6NZ6AXsXRfmEVaYin3EDbSPJrKS8OXo= -github.com/aws/aws-lambda-go v1.10.0 h1:uafgdfYGQD0UeT7d2uKdyWW8j/ZYRifRPIdmeqLzLCk= -github.com/aws/aws-lambda-go v1.10.0/go.mod h1:zUsUQhAUjYzR8AuduJPCfhBuKWUaDbQiPOG+ouzmE1A= github.com/aws/aws-sdk-go v1.15.81 h1:va7uoFaV9uKAtZ6BTmp1u7paoMsizYRRLvRuoC07nQ8= github.com/aws/aws-sdk-go v1.15.81/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3ATZkfNZeM= github.com/boj/redistore v0.0.0-20160128113310-fc113767cd6b h1:PfxLkkgJYE095CKZji++BNwZjxWfoAF21WFPzkzOZEs= @@ -22,6 +20,8 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/garyburd/redigo v1.6.0 h1:0VruCpn7yAIIu7pWVClQC8wxCJEcG3nyzpMSHKi1PQc= github.com/garyburd/redigo v1.6.0/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= +github.com/gin-contrib/cache v1.1.0 h1:lM8B4YtzdQQM6ThTlvtNPeBNfW1mNdh/CMFQfenH1dk= +github.com/gin-contrib/cache v1.1.0/go.mod h1:9ylpYjLq309/y5hTpyuDxfPG+V6QlSB56vrWe6OhoLQ= github.com/gin-contrib/gzip v0.0.1 h1:ezvKOL6jH+jlzdHNE4h9h8q8uMpDQjyl0NN0Jd7jozc= github.com/gin-contrib/gzip v0.0.1/go.mod h1:fGBJBCdt6qCZuCAOwWuFhBB4OOq9EFqlo5dEaFhhu5w= github.com/gin-contrib/sessions v0.0.0-20170731012558-a71ea9167c61 h1:PK7FuRfHB1/teDEyN/+STsl3BHJn3Uf7ZbZTknsSj0Q= @@ -38,6 +38,8 @@ github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= +github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ= @@ -79,6 +81,8 @@ github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62 h1:pyecQtsPmlkCsMkYhT5iZ+sUXuwee+OvfuJjinEA3ko= +github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62/go.mod h1:65XQgovT59RWatovFwnwocoUxiI/eENTnOY5GK3STuY= github.com/silentsokolov/go-vimeo v0.0.0-20190116124215-06829264260c h1:KhHx/Ta3c9C1gcSo5UhDeo/D4JnhnxJTrlcOEOFiMfY= github.com/silentsokolov/go-vimeo v0.0.0-20190116124215-06829264260c/go.mod h1:10FeaKUMy5t3KLsYfy54dFrq0rpwcfyKkKcF7vRGIRY= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= diff --git a/pkg/feeds/feeds.go b/pkg/feeds/feeds.go index 908d8d4..98f86c1 100644 --- a/pkg/feeds/feeds.go +++ b/pkg/feeds/feeds.go @@ -10,7 +10,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/mxpv/podsync/pkg/api" - "github.com/mxpv/podsync/pkg/cache" "github.com/mxpv/podsync/pkg/model" ) @@ -136,48 +135,43 @@ func makeEnclosure(feed *model.Feed, id string, lengthInBytes int64) (string, it return url, contentType, lengthInBytes } -func short(str string, i int) string { - runes := []rune(str) - if len(runes) > i { - return string(runes[:i]) + " ..." +func (s *Service) updateFromRedis(hashID string, feed *model.Feed) { + key := fmt.Sprintf("feeds/%s", hashID) + redisFeed := &model.Feed{} + if err := s.cache.GetItem(key, redisFeed); err != nil { + return } - return str + feed.Episodes = redisFeed.Episodes + feed.UpdatedAt = redisFeed.UpdatedAt + feed.LastID = redisFeed.LastID + feed.ItemURL = redisFeed.ItemURL + feed.Author = redisFeed.Author + feed.PubDate = redisFeed.PubDate + feed.Description = redisFeed.Description + feed.Title = redisFeed.Title + feed.Language = redisFeed.Language + feed.Explicit = redisFeed.Explicit + feed.CoverArt = redisFeed.CoverArt } func (s *Service) BuildFeed(hashID string) ([]byte, error) { - cached, err := s.cache.Get(hashID) - if err == nil { - return []byte(cached), nil - } - - logger := log.WithField("hash_id", hashID) - var ( - key = fmt.Sprintf("feeds/%s", hashID) - now = time.Now().UTC() - feed = &model.Feed{} + logger = log.WithField("hash_id", hashID) + now = time.Now().UTC() ) - if err = s.cache.GetItem(key, feed); err != nil { - // If not found, get from DynamoDB - if err == cache.ErrNotFound { - logger.Warnf("getting feed from Dynamo %s", hashID) - if f, err := s.QueryFeed(hashID); err != nil { - return nil, err - } else { - feed = f - } - } else { - return nil, err - } + feed, err := s.QueryFeed(hashID) + if err != nil { + logger.WithError(err).Error("failed to query feed from dynamodb") + return nil, err } - const ( - updateTTL = 15 * time.Minute - recordTTL = 90 * 24 * time.Hour - ) + s.updateFromRedis(hashID, feed) + oldLastID := feed.LastID + + const updateTTL = 15 * time.Minute if now.Sub(feed.UpdatedAt) < updateTTL { if podcast, err := s.buildPodcast(feed); err != nil { return nil, err @@ -202,14 +196,24 @@ func (s *Service) BuildFeed(hashID string) ([]byte, error) { if err := builder.Build(feed); err != nil { logger.WithError(err).Error("failed to build feed") - // Save error to cache to avoid spamming - _ = s.cache.Set(hashID, err.Error(), updateTTL) - return nil, err } // Format podcast + if feed.PageSize < len(feed.Episodes) { + feed.Episodes = feed.Episodes[:feed.PageSize] + } + + feed.LastAccess = time.Now().UTC() + + // Don't zero last seen ID if failed to get updates + if feed.LastID == "" { + feed.LastID = oldLastID + + logger.Warnf("failed to get updates for %s (builder: %s)", hashID, builderType) + } + podcast, err := s.buildPodcast(feed) if err != nil { return nil, err @@ -217,10 +221,12 @@ func (s *Service) BuildFeed(hashID string) ([]byte, error) { // Save to storage - logger.Infof("saving feed %s", hashID) - if err := s.cache.SaveItem(key, feed, recordTTL); err != nil { - logger.WithError(err).Error("failed to save feed") - return nil, err + if oldLastID != feed.LastID { + logger.Infof("updating feed %s (last id: %q, new id: %q)", hashID, oldLastID, feed.LastID) + if err := s.storage.UpdateFeed(feed); err != nil { + logger.WithError(err).Error("failed to save feed") + return nil, err + } } return []byte(podcast.String()), nil diff --git a/pkg/feeds/feeds_mock_test.go b/pkg/feeds/feeds_mock_test.go index f97f9e8..290d4a4 100644 --- a/pkg/feeds/feeds_mock_test.go +++ b/pkg/feeds/feeds_mock_test.go @@ -36,7 +36,6 @@ func (m *MockBuilder) EXPECT() *MockBuilderMockRecorder { // Build mocks base method func (m *MockBuilder) Build(feed *model.Feed) error { - m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Build", feed) ret0, _ := ret[0].(error) return ret0 @@ -44,7 +43,6 @@ func (m *MockBuilder) Build(feed *model.Feed) error { // Build indicates an expected call of Build func (mr *MockBuilderMockRecorder) Build(feed interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Build", reflect.TypeOf((*MockBuilder)(nil).Build), feed) } @@ -73,7 +71,6 @@ func (m *Mockstorage) EXPECT() *MockstorageMockRecorder { // SaveFeed mocks base method func (m *Mockstorage) SaveFeed(feed *model.Feed) error { - m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SaveFeed", feed) ret0, _ := ret[0].(error) return ret0 @@ -81,13 +78,11 @@ func (m *Mockstorage) SaveFeed(feed *model.Feed) error { // SaveFeed indicates an expected call of SaveFeed func (mr *MockstorageMockRecorder) SaveFeed(feed interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveFeed", reflect.TypeOf((*Mockstorage)(nil).SaveFeed), feed) } // GetFeed mocks base method func (m *Mockstorage) GetFeed(hashID string) (*model.Feed, error) { - m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetFeed", hashID) ret0, _ := ret[0].(*model.Feed) ret1, _ := ret[1].(error) @@ -96,13 +91,11 @@ func (m *Mockstorage) GetFeed(hashID string) (*model.Feed, error) { // GetFeed indicates an expected call of GetFeed func (mr *MockstorageMockRecorder) GetFeed(hashID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFeed", reflect.TypeOf((*Mockstorage)(nil).GetFeed), hashID) } // UpdateFeed mocks base method func (m *Mockstorage) UpdateFeed(feed *model.Feed) error { - m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateFeed", feed) ret0, _ := ret[0].(error) return ret0 @@ -110,13 +103,11 @@ func (m *Mockstorage) UpdateFeed(feed *model.Feed) error { // UpdateFeed indicates an expected call of UpdateFeed func (mr *MockstorageMockRecorder) UpdateFeed(feed interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateFeed", reflect.TypeOf((*Mockstorage)(nil).UpdateFeed), feed) } // GetMetadata mocks base method func (m *Mockstorage) GetMetadata(hashID string) (*model.Feed, error) { - m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetMetadata", hashID) ret0, _ := ret[0].(*model.Feed) ret1, _ := ret[1].(error) @@ -125,13 +116,11 @@ func (m *Mockstorage) GetMetadata(hashID string) (*model.Feed, error) { // GetMetadata indicates an expected call of GetMetadata func (mr *MockstorageMockRecorder) GetMetadata(hashID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetadata", reflect.TypeOf((*Mockstorage)(nil).GetMetadata), hashID) } // Downgrade mocks base method func (m *Mockstorage) Downgrade(userID string, featureLevel int) ([]string, error) { - m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Downgrade", userID, featureLevel) ret0, _ := ret[0].([]string) ret1, _ := ret[1].(error) @@ -140,7 +129,6 @@ func (m *Mockstorage) Downgrade(userID string, featureLevel int) ([]string, erro // Downgrade indicates an expected call of Downgrade func (mr *MockstorageMockRecorder) Downgrade(userID, featureLevel interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Downgrade", reflect.TypeOf((*Mockstorage)(nil).Downgrade), userID, featureLevel) } @@ -169,7 +157,6 @@ func (m *MockcacheService) EXPECT() *MockcacheServiceMockRecorder { // Set mocks base method func (m *MockcacheService) Set(key, value string, ttl time.Duration) error { - m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Set", key, value, ttl) ret0, _ := ret[0].(error) return ret0 @@ -177,13 +164,11 @@ func (m *MockcacheService) Set(key, value string, ttl time.Duration) error { // Set indicates an expected call of Set func (mr *MockcacheServiceMockRecorder) Set(key, value, ttl interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockcacheService)(nil).Set), key, value, ttl) } // Get mocks base method func (m *MockcacheService) Get(key string) (string, error) { - m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Get", key) ret0, _ := ret[0].(string) ret1, _ := ret[1].(error) @@ -192,13 +177,35 @@ func (m *MockcacheService) Get(key string) (string, error) { // Get indicates an expected call of Get func (mr *MockcacheServiceMockRecorder) Get(key interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockcacheService)(nil).Get), key) } +// SaveItem mocks base method +func (m *MockcacheService) SaveItem(key string, item interface{}, exp time.Duration) error { + ret := m.ctrl.Call(m, "SaveItem", key, item, exp) + ret0, _ := ret[0].(error) + return ret0 +} + +// SaveItem indicates an expected call of SaveItem +func (mr *MockcacheServiceMockRecorder) SaveItem(key, item, exp interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveItem", reflect.TypeOf((*MockcacheService)(nil).SaveItem), key, item, exp) +} + +// GetItem mocks base method +func (m *MockcacheService) GetItem(key string, item interface{}) error { + ret := m.ctrl.Call(m, "GetItem", key, item) + ret0, _ := ret[0].(error) + return ret0 +} + +// GetItem indicates an expected call of GetItem +func (mr *MockcacheServiceMockRecorder) GetItem(key, item interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetItem", reflect.TypeOf((*MockcacheService)(nil).GetItem), key, item) +} + // Invalidate mocks base method func (m *MockcacheService) Invalidate(key ...string) error { - m.ctrl.T.Helper() varargs := []interface{}{} for _, a := range key { varargs = append(varargs, a) @@ -210,6 +217,5 @@ func (m *MockcacheService) Invalidate(key ...string) error { // Invalidate indicates an expected call of Invalidate func (mr *MockcacheServiceMockRecorder) Invalidate(key ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Invalidate", reflect.TypeOf((*MockcacheService)(nil).Invalidate), key...) } diff --git a/pkg/feeds/feeds_test.go b/pkg/feeds/feeds_test.go index d28814c..9fd985f 100644 --- a/pkg/feeds/feeds_test.go +++ b/pkg/feeds/feeds_test.go @@ -4,11 +4,9 @@ package feeds import ( "testing" - "time" "github.com/golang/mock/gomock" "github.com/pkg/errors" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/mxpv/podsync/pkg/api" @@ -100,20 +98,6 @@ func TestService_QueryFeed(t *testing.T) { require.NoError(t, err) } -func TestService_GetFromCache(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - cache := NewMockcacheService(ctrl) - cache.EXPECT().Get("123").Return("test", nil) - - s := &Service{cache: cache} - - data, err := s.BuildFeed("123") - assert.NoError(t, err) - assert.Equal(t, []byte("test"), data) -} - func TestService_BuildFeed(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -123,12 +107,12 @@ func TestService_BuildFeed(t *testing.T) { stor.EXPECT().UpdateFeed(feed).Return(nil) cache := NewMockcacheService(ctrl) - cache.EXPECT().Get(feed.HashID).Return("", errors.New("not found")) - cache.EXPECT().Set(feed.HashID, gomock.Any(), 1*time.Hour).Return(nil) + cache.EXPECT().GetItem("feeds/123", gomock.Any()).Return(errors.New("not found")) builder := NewMockBuilder(ctrl) builder.EXPECT().Build(feed).Return(nil).Do(func(feed *model.Feed) { feed.Episodes = append(feed.Episodes, feed.Episodes[0]) + feed.LastID = "1" }) s := Service{storage: stor, cache: cache, builders: map[api.Provider]Builder{ @@ -143,13 +127,10 @@ func TestService_WrongID(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - cache := NewMockcacheService(ctrl) - cache.EXPECT().Get(gomock.Any()).Return("", errors.New("not found")) - stor := NewMockstorage(ctrl) stor.EXPECT().GetFeed(gomock.Any()).Times(1).Return(nil, errors.New("not found")) - s := &Service{storage: stor, cache: cache} + s := &Service{storage: stor} _, err := s.BuildFeed("invalid_feed_id") require.Error(t, err) diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index 41fb16d..d51adb3 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -5,7 +5,10 @@ import ( "io/ioutil" "net/http" "strings" + "time" + "github.com/gin-contrib/cache" + "github.com/gin-contrib/cache/persistence" "github.com/gin-contrib/gzip" "github.com/gin-contrib/sessions" "github.com/gin-gonic/gin" @@ -54,6 +57,8 @@ func New(feed feedService, support patreonService, opts Opts) http.Handler { r.Use(gin.Recovery()) r.Use(gzip.Gzip(gzip.DefaultCompression)) + cacheStore := persistence.NewRedisCache("redis:6379", "", time.Second) + store := sessions.NewCookieStore([]byte(opts.CookieSecret)) r.Use(sessions.Sessions("podsync", store)) @@ -88,7 +93,8 @@ func New(feed feedService, support patreonService, opts Opts) http.Handler { r.GET("/api/metadata/:hashId", h.metadata) r.POST("/api/webhooks", h.webhook) - r.NoRoute(h.getFeed) + const feedTTL = 30 * time.Minute + r.NoRoute(cache.CachePage(cacheStore, feedTTL, h.getFeed)) return r } diff --git a/pkg/model/model.go b/pkg/model/model.go index 539765b..59e0676 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -54,7 +54,8 @@ type Feed struct { PubDate time.Time `dynamodbav:",unixtime"` Author string ItemURL string // Platform specific URL - Episodes []*Item // Array of episodes LastID string // Last seen video URL ID (for incremental updater) UpdatedAt time.Time `dynamodbav:",unixtime"` + Episodes []*Item `dynamodbav:"-"` // Array of episodes, serialized as gziped EpisodesData in DynamoDB + EpisodesData []byte } diff --git a/pkg/storage/compression.go b/pkg/storage/compression.go new file mode 100644 index 0000000..489457e --- /dev/null +++ b/pkg/storage/compression.go @@ -0,0 +1,39 @@ +// Compression helpers to reduce object size in DynamoDB table. +// See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/bp-use-s3-too.html + +package storage + +import ( + "bytes" + "compress/gzip" + "encoding/json" +) + +func compressObj(obj interface{}) ([]byte, error) { + var buf bytes.Buffer + w := gzip.NewWriter(&buf) + + data, err := json.Marshal(obj) + if err != nil { + return nil, err + } + + if _, err := w.Write(data); err != nil { + return nil, err + } + + if err := w.Close(); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +func decompressObj(data []byte, obj interface{}) error { + r, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + return err + } + + return json.NewDecoder(r).Decode(obj) +} diff --git a/pkg/storage/compression_test.go b/pkg/storage/compression_test.go new file mode 100644 index 0000000..9e57751 --- /dev/null +++ b/pkg/storage/compression_test.go @@ -0,0 +1,25 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/mxpv/podsync/pkg/model" +) + +func TestCompress(t *testing.T) { + inData := []model.Item{ + {ID: "1", Title: "title1"}, + {ID: "2", Title: "title2"}, + } + + data, err := compressObj(inData) + assert.NoError(t, err) + + var outData []model.Item + err = decompressObj(data, &outData) + assert.NoError(t, err) + + assert.ObjectsAreEqual(inData, outData) +} diff --git a/pkg/storage/dynamo.go b/pkg/storage/dynamo.go index 8463ef7..d291613 100644 --- a/pkg/storage/dynamo.go +++ b/pkg/storage/dynamo.go @@ -25,7 +25,7 @@ const ( anonymousUserName = "anonymous" // Update LastAccess field every hour - feedLastAccessUpdatePeriod = time.Hour + feedLastAccessUpdatePeriod = time.Hour * 24 feedTimeToLive = time.Hour * 24 * 90 ) @@ -100,16 +100,31 @@ func (d Dynamo) SaveFeed(feed *model.Feed) error { feed.UserID = anonymousUserName } - now := time.Now().UTC() + var ( + err error + now = time.Now().UTC() + ) + feed.LastAccess = now feed.ExpirationTime = now.Add(feedTimeToLive) + // Compress episodes + + feed.EpisodesData, err = compressObj(feed.Episodes) + if err != nil { + return err + } + + // Marshal to DynamoDB's format + item, err := attr.MarshalMap(feed) if err != nil { logger.WithError(err).Error("failed to marshal feed model") return err } + // Submit request + input := &dynamodb.PutItemInput{ TableName: d.FeedsTableName, Item: item, @@ -127,7 +142,7 @@ func (d Dynamo) SaveFeed(feed *model.Feed) error { func (d Dynamo) GetFeed(hashID string) (*model.Feed, error) { logger := log.WithField("hash_id", hashID) - logger.Debug("getting feed") + // Submit get request getInput := &dynamodb.GetItemInput{ TableName: d.FeedsTableName, @@ -146,6 +161,8 @@ func (d Dynamo) GetFeed(hashID string) (*model.Feed, error) { return nil, errors.New("not found") } + // Unmarshal data + var feed model.Feed if err := attr.UnmarshalMap(getOutput.Item, &feed); err != nil { // TODO: remove this @@ -160,61 +177,81 @@ func (d Dynamo) GetFeed(hashID string) (*model.Feed, error) { } // Check if we need to update LastAccess field (no more than once per hour) + now := time.Now().UTC() if feed.LastAccess.Add(feedLastAccessUpdatePeriod).Before(now) { - logger.Debugf("updating feed's last access timestamp") - - // Set LastAccess field to now - // Set ExpirationTime field to now + feedTimeToLive - updateExpression, err := expr. - NewBuilder(). - WithUpdate(expr. - Set(expr.Name("LastAccess"), expr.Value(now.Unix())). - Set(expr.Name("ExpirationTime"), expr.Value(now.Add(feedTimeToLive).Unix()))). - Build() - - if err != nil { - logger.WithError(err).Error("failed to build update expression") - return nil, err - } - - updateInput := &dynamodb.UpdateItemInput{ - TableName: d.FeedsTableName, - Key: getInput.Key, - UpdateExpression: updateExpression.Update(), - ExpressionAttributeNames: updateExpression.Names(), - ExpressionAttributeValues: updateExpression.Values(), - } - - _, err = d.dynamo.UpdateItem(updateInput) - if err != nil { - logger.WithError(err).Error("failed to update feed item") + if err := d.updateLastAccess(getInput.Key); err != nil { + logger.WithError(err).Error("failed to update feed's last access") return nil, err } feed.LastAccess = now } + // Decompress episodes + + if len(feed.EpisodesData) > 0 { + if err := decompressObj(feed.EpisodesData, &feed.Episodes); err != nil { + return nil, errors.Wrap(err, "failed to decompress episodes") + } + } + return &feed, nil } -func (d Dynamo) UpdateFeed(feed *model.Feed) error { - log.Infof("saving feed %q", feed.HashID) +func (d Dynamo) updateLastAccess(key map[string]*dynamodb.AttributeValue) error { + now := time.Now().UTC() + // Set LastAccess field to now + // Set ExpirationTime field to now + feedTimeToLive + updateExpression, err := expr. + NewBuilder(). + WithUpdate(expr. + Set(expr.Name("LastAccess"), expr.Value(now.Unix())). + Set(expr.Name("ExpirationTime"), expr.Value(now.Add(feedTimeToLive).Unix()))). + Build() + + if err != nil { + return err + } + + updateInput := &dynamodb.UpdateItemInput{ + TableName: d.FeedsTableName, + Key: key, + UpdateExpression: updateExpression.Update(), + ExpressionAttributeNames: updateExpression.Names(), + ExpressionAttributeValues: updateExpression.Values(), + } + + _, err = d.dynamo.UpdateItem(updateInput) + if err != nil { + return err + } + + return nil +} + +func (d Dynamo) UpdateFeed(feed *model.Feed) error { var ( pubDate = feed.PubDate.Unix() updatedAt = feed.LastAccess.Unix() ) + episodesData, err := compressObj(feed.Episodes) + if err != nil { + return errors.Wrap(err, "failed to compress episodes for update") + } + update := expr. Set(expr.Name("Title"), expr.Value(feed.Title)). Set(expr.Name("Description"), expr.Value(feed.Description)). Set(expr.Name("PubDate"), expr.Value(pubDate)). Set(expr.Name("Author"), expr.Value(feed.Author)). Set(expr.Name("ItemURL"), expr.Value(feed.ItemURL)). - Set(expr.Name("Episodes"), expr.Value(feed.Episodes)). Set(expr.Name("LastID"), expr.Value(feed.LastID)). - Set(expr.Name("UpdatedAt"), expr.Value(updatedAt)) + Set(expr.Name("UpdatedAt"), expr.Value(updatedAt)). + Set(expr.Name("EpisodesData"), expr.Value(episodesData)). // Serialized episodes + Remove(expr.Name("Episodes")) // Remove old field to save space expression, err := expr.NewBuilder().WithUpdate(update).Build() if err != nil { @@ -353,8 +390,7 @@ func (d Dynamo) Downgrade(userID string, featureLevel int) ([]string, error) { NewBuilder(). WithUpdate(expr. Set(expr.Name("PageSize"), expr.Value(150)). - Set(expr.Name("FeatureLevel"), expr.Value(api.ExtendedFeatures)). - Set(expr.Name("LastID"), expr.Value(""))). + Set(expr.Name("FeatureLevel"), expr.Value(api.ExtendedFeatures))). WithCondition(expr. Name("PageSize").GreaterThan(expr.Value(150))). Build() @@ -391,8 +427,7 @@ func (d Dynamo) Downgrade(userID string, featureLevel int) ([]string, error) { Set(expr.Name("PageSize"), expr.Value(50)). Set(expr.Name("FeatureLevel"), expr.Value(api.DefaultFeatures)). Set(expr.Name("Format"), expr.Value(api.FormatVideo)). - Set(expr.Name("Quality"), expr.Value(api.QualityHigh)). - Set(expr.Name("LastID"), expr.Value(""))). + Set(expr.Name("Quality"), expr.Value(api.QualityHigh))). Build() if err != nil {