From 7cff604c268c61de5c26ef9210e7f9a4311a1fb3 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Fri, 3 Nov 2017 19:16:15 -0700 Subject: [PATCH] Switch to Postgres storage for feeds --- cmd/app/main.go | 8 +-- pkg/feeds/feeds.go | 57 +++++++++++----- pkg/feeds/feeds_mock_test.go | 48 -------------- pkg/feeds/feeds_test.go | 80 +++++++++++++++-------- pkg/handler/handler.go | 4 +- pkg/handler/handler_mock_test.go | 12 ++-- pkg/handler/handler_test.go | 2 +- pkg/model/models.go | 1 + pkg/model/pg.sql | 3 +- pkg/storage/pg.go | 76 ---------------------- pkg/storage/pg_sql.go | 38 ----------- pkg/storage/pg_test.go | 107 ------------------------------- 12 files changed, 107 insertions(+), 329 deletions(-) delete mode 100644 pkg/storage/pg.go delete mode 100644 pkg/storage/pg_sql.go delete mode 100644 pkg/storage/pg_test.go diff --git a/cmd/app/main.go b/cmd/app/main.go index ea477ec..a2da4df 100644 --- a/cmd/app/main.go +++ b/cmd/app/main.go @@ -18,7 +18,6 @@ import ( "github.com/mxpv/podsync/pkg/config" "github.com/mxpv/podsync/pkg/feeds" "github.com/mxpv/podsync/pkg/handler" - "github.com/mxpv/podsync/pkg/storage" "github.com/mxpv/podsync/pkg/support" "github.com/pkg/errors" ) @@ -37,11 +36,6 @@ func main() { panic(err) } - redis, err := storage.NewRedisStorage(cfg.RedisURL) - if err != nil { - panic(err) - } - database, err := createPg(cfg.PostgresConnectionURL) if err != nil { panic(err) @@ -62,7 +56,7 @@ func main() { } feed, err := feeds.NewFeedService( - feeds.WithStorage(redis), + feeds.WithPostgres(database), feeds.WithBuilder(api.ProviderYoutube, youtube), feeds.WithBuilder(api.ProviderVimeo, vimeo), ) diff --git a/pkg/feeds/feeds.go b/pkg/feeds/feeds.go index 150c48a..c400617 100644 --- a/pkg/feeds/feeds.go +++ b/pkg/feeds/feeds.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/go-pg/pg" itunes "github.com/mxpv/podcast" "github.com/mxpv/podsync/pkg/api" "github.com/mxpv/podsync/pkg/model" @@ -15,18 +16,13 @@ const ( maxPageSize = 150 ) -type storageService interface { - CreateFeed(feed *model.Feed) error - GetFeed(hashId string) (*model.Feed, error) -} - type builder interface { Build(feed *model.Feed) (podcast *itunes.Podcast, err error) } type Service struct { sid *shortid.Shortid - storage storageService + db *pg.DB builders map[api.Provider]builder } @@ -42,12 +38,15 @@ func (s Service) CreateFeed(req *api.CreateFeedRequest, identity *api.Identity) return "", fmt.Errorf("failed to get builder for URL: %s", req.URL) } + now := time.Now().UTC() + // Set default fields feed.PageSize = api.DefaultPageSize feed.Format = api.FormatVideo feed.Quality = api.QualityHigh feed.FeatureLevel = api.DefaultFeatures - feed.LastAccess = time.Now().UTC() + feed.CreatedAt = now + feed.LastAccess = now if identity.FeatureLevel > 0 { feed.UserID = identity.UserId @@ -69,29 +68,57 @@ func (s Service) CreateFeed(req *api.CreateFeedRequest, identity *api.Identity) feed.HashID = hashId // Save to database - if err := s.storage.CreateFeed(feed); err != nil { + _, err = s.db.Model(feed).Insert() + if err != nil { return "", errors.Wrap(err, "failed to save feed to database") } return hashId, nil } -func (s Service) GetFeed(hashId string) (*itunes.Podcast, error) { - feed, err := s.storage.GetFeed(hashId) +func (s Service) QueryFeed(hashID string) (*model.Feed, error) { + lastAccess := time.Now().UTC() + + feed := &model.Feed{} + res, err := s.db.Model(feed). + Set("last_access = ?", lastAccess). + Where("hash_id = ?", hashID). + Returning("*"). + Update() + + if err != nil { + return nil, errors.Wrapf(err, "failed to query feed: %s", hashID) + } + + if res.RowsAffected() != 1 { + return nil, api.ErrNotFound + } + + return feed, nil +} + +func (s Service) BuildFeed(hashID string) (*itunes.Podcast, error) { + feed, err := s.QueryFeed(hashID) if err != nil { return nil, err } builder, ok := s.builders[feed.Provider] if !ok { - return nil, errors.Wrapf(err, "failed to get builder for feed: %s", hashId) + return nil, errors.Wrapf(err, "failed to get builder for feed: %s", hashID) } return builder.Build(feed) } -func (s Service) GetMetadata(hashId string) (*api.Metadata, error) { - feed, err := s.storage.GetFeed(hashId) +func (s Service) GetMetadata(hashID string) (*api.Metadata, error) { + feed := &model.Feed{} + err := s.db. + Model(feed). + Where("hash_id = ?", hashID). + Column("provider", "format", "quality"). + Select() + if err != nil { return nil, err } @@ -106,9 +133,9 @@ func (s Service) GetMetadata(hashId string) (*api.Metadata, error) { type feedOption func(*Service) //noinspection GoExportedFuncWithUnexportedType -func WithStorage(storage storageService) feedOption { +func WithPostgres(db *pg.DB) feedOption { return func(service *Service) { - service.storage = storage + service.db = db } } diff --git a/pkg/feeds/feeds_mock_test.go b/pkg/feeds/feeds_mock_test.go index e1357b2..7157489 100644 --- a/pkg/feeds/feeds_mock_test.go +++ b/pkg/feeds/feeds_mock_test.go @@ -10,54 +10,6 @@ import ( reflect "reflect" ) -// MockstorageService is a mock of storageService interface -type MockstorageService struct { - ctrl *gomock.Controller - recorder *MockstorageServiceMockRecorder -} - -// MockstorageServiceMockRecorder is the mock recorder for MockstorageService -type MockstorageServiceMockRecorder struct { - mock *MockstorageService -} - -// NewMockstorageService creates a new mock instance -func NewMockstorageService(ctrl *gomock.Controller) *MockstorageService { - mock := &MockstorageService{ctrl: ctrl} - mock.recorder = &MockstorageServiceMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (_m *MockstorageService) EXPECT() *MockstorageServiceMockRecorder { - return _m.recorder -} - -// CreateFeed mocks base method -func (_m *MockstorageService) CreateFeed(feed *model.Feed) error { - ret := _m.ctrl.Call(_m, "CreateFeed", feed) - ret0, _ := ret[0].(error) - return ret0 -} - -// CreateFeed indicates an expected call of CreateFeed -func (_mr *MockstorageServiceMockRecorder) CreateFeed(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "CreateFeed", reflect.TypeOf((*MockstorageService)(nil).CreateFeed), arg0) -} - -// GetFeed mocks base method -func (_m *MockstorageService) GetFeed(hashId string) (*model.Feed, error) { - ret := _m.ctrl.Call(_m, "GetFeed", hashId) - ret0, _ := ret[0].(*model.Feed) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetFeed indicates an expected call of GetFeed -func (_mr *MockstorageServiceMockRecorder) GetFeed(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetFeed", reflect.TypeOf((*MockstorageService)(nil).GetFeed), arg0) -} - // Mockbuilder is a mock of builder interface type Mockbuilder struct { ctrl *gomock.Controller diff --git a/pkg/feeds/feeds_test.go b/pkg/feeds/feeds_test.go index f0aad7e..83e66d8 100644 --- a/pkg/feeds/feeds_test.go +++ b/pkg/feeds/feeds_test.go @@ -5,6 +5,7 @@ package feeds import ( "testing" + "github.com/go-pg/pg" "github.com/golang/mock/gomock" "github.com/mxpv/podsync/pkg/api" "github.com/mxpv/podsync/pkg/model" @@ -12,16 +13,23 @@ import ( "github.com/ventu-io/go-shortid" ) +var feed = &model.Feed{ + HashID: "123", + ItemID: "xyz", + Provider: api.ProviderVimeo, + LinkType: api.LinkTypeChannel, + PageSize: 50, + Quality: api.QualityHigh, + Format: api.FormatVideo, +} + func TestService_CreateFeed(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - storage := NewMockstorageService(ctrl) - storage.EXPECT().CreateFeed(gomock.Any()).Times(1).Return(nil) - s := Service{ sid: shortid.GetDefault(), - storage: storage, + db: createDatabase(t), builders: map[api.Provider]builder{api.ProviderYoutube: nil}, } @@ -38,34 +46,50 @@ func TestService_CreateFeed(t *testing.T) { } func TestService_GetFeed(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() + s := Service{db: createDatabase(t)} - feed := &model.Feed{Provider: api.ProviderYoutube} - - storage := NewMockstorageService(ctrl) - storage.EXPECT().GetFeed("123").Times(1).Return(feed, nil) - - bld := NewMockbuilder(ctrl) - bld.EXPECT().Build(feed).Return(nil, nil) - - s := Service{ - storage: storage, - builders: map[api.Provider]builder{api.ProviderYoutube: bld}, - } - - _, err := s.GetFeed("123") + _, err := s.BuildFeed(feed.HashID) require.NoError(t, err) } +func TestService_WrongID(t *testing.T) { + s := Service{db: createDatabase(t)} + + _, err := s.BuildFeed("invalid_feed_id") + require.Error(t, err) +} + +func TestService_UpdateLastAccess(t *testing.T) { + s := Service{db: createDatabase(t)} + + feed1, err := s.QueryFeed(feed.HashID) + require.NoError(t, err) + + feed2, err := s.QueryFeed(feed.HashID) + require.NoError(t, err) + + require.True(t, feed2.LastAccess.After(feed1.LastAccess)) +} + func TestService_GetMetadata(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - storage := NewMockstorageService(ctrl) - storage.EXPECT().GetFeed("123").Times(1).Return(&model.Feed{}, nil) - - s := Service{storage: storage} - _, err := s.GetMetadata("123") + s := Service{db: createDatabase(t)} + _, err := s.GetMetadata(feed.HashID) require.NoError(t, err) } + +func createDatabase(t *testing.T) *pg.DB { + opts, err := pg.ParseURL("postgres://postgres:@localhost/podsync?sslmode=disable") + if err != nil { + require.NoError(t, err) + } + + db := pg.Connect(opts) + + _, err = db.Model(&model.Feed{}).Where("1=1").Delete() + require.NoError(t, err) + + err = db.Insert(feed) + require.NoError(t, err) + + return db +} diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index d379cfd..fdba847 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -24,7 +24,7 @@ const ( type feedService interface { CreateFeed(req *api.CreateFeedRequest, identity *api.Identity) (string, error) - GetFeed(hashId string) (*itunes.Podcast, error) + BuildFeed(hashID string) (*itunes.Podcast, error) GetMetadata(hashId string) (*api.Metadata, error) } @@ -154,7 +154,7 @@ func (h handler) getFeed(c *gin.Context) { hashId = strings.TrimSuffix(hashId, ".xml") } - podcast, err := h.feed.GetFeed(hashId) + podcast, err := h.feed.BuildFeed(hashId) if err != nil { code := http.StatusInternalServerError if err == api.ErrNotFound { diff --git a/pkg/handler/handler_mock_test.go b/pkg/handler/handler_mock_test.go index 41e4013..f5dd5e8 100644 --- a/pkg/handler/handler_mock_test.go +++ b/pkg/handler/handler_mock_test.go @@ -47,17 +47,17 @@ func (_mr *MockfeedServiceMockRecorder) CreateFeed(arg0, arg1 interface{}) *gomo return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "CreateFeed", reflect.TypeOf((*MockfeedService)(nil).CreateFeed), arg0, arg1) } -// GetFeed mocks base method -func (_m *MockfeedService) GetFeed(hashId string) (*podcast.Podcast, error) { - ret := _m.ctrl.Call(_m, "GetFeed", hashId) +// BuildFeed mocks base method +func (_m *MockfeedService) BuildFeed(hashID string) (*podcast.Podcast, error) { + ret := _m.ctrl.Call(_m, "BuildFeed", hashID) ret0, _ := ret[0].(*podcast.Podcast) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetFeed indicates an expected call of GetFeed -func (_mr *MockfeedServiceMockRecorder) GetFeed(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetFeed", reflect.TypeOf((*MockfeedService)(nil).GetFeed), arg0) +// BuildFeed indicates an expected call of BuildFeed +func (_mr *MockfeedServiceMockRecorder) BuildFeed(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "BuildFeed", reflect.TypeOf((*MockfeedService)(nil).BuildFeed), arg0) } // GetMetadata mocks base method diff --git a/pkg/handler/handler_test.go b/pkg/handler/handler_test.go index 367e334..68a5bf9 100644 --- a/pkg/handler/handler_test.go +++ b/pkg/handler/handler_test.go @@ -101,7 +101,7 @@ func TestGetFeed(t *testing.T) { podcast := itunes.New("", "", "", nil, nil) feed := NewMockfeedService(ctrl) - feed.EXPECT().GetFeed("123").Return(&podcast, nil) + feed.EXPECT().BuildFeed("123").Return(&podcast, nil) srv := httptest.NewServer(New(feed, nil, cfg)) defer srv.Close() diff --git a/pkg/model/models.go b/pkg/model/models.go index 3318591..48bc0e5 100644 --- a/pkg/model/models.go +++ b/pkg/model/models.go @@ -27,5 +27,6 @@ type Feed struct { Format api.Format Quality api.Quality FeatureLevel int + CreatedAt time.Time LastAccess time.Time // Available features } diff --git a/pkg/model/pg.sql b/pkg/model/pg.sql index dc53aa2..ad71ad3 100644 --- a/pkg/model/pg.sql +++ b/pkg/model/pg.sql @@ -42,12 +42,13 @@ CREATE TABLE IF NOT EXISTS feeds ( hash_id VARCHAR(12) NOT NULL UNIQUE, user_id VARCHAR(32) NULL, item_id VARCHAR(32) NOT NULL CHECK (item_id <> ''), - link_type link_type NOT NULL, provider provider NOT NULL, + link_type link_type NOT NULL, page_size INT NOT NULL DEFAULT 50, format format NOT NULL DEFAULT 'video', quality quality NOT NULL DEFAULT 'high', feature_level INT NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), last_access TIMESTAMPTZ NOT NULL DEFAULT now() ); diff --git a/pkg/storage/pg.go b/pkg/storage/pg.go deleted file mode 100644 index a558f71..0000000 --- a/pkg/storage/pg.go +++ /dev/null @@ -1,76 +0,0 @@ -package storage - -import ( - "log" - "net" - "strings" - "time" - - "github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy" - "github.com/go-pg/pg" - "github.com/mxpv/podsync/pkg/model" - "github.com/pkg/errors" -) - -type PgConfig struct { - ConnectionUrl string `yaml:"connectionUrl"` -} - -type PgStorage struct { - db *pg.DB -} - -func (p *PgStorage) CreateFeed(feed *model.Feed) error { - feed.LastAccess = time.Now().UTC() - _, err := p.db.Model(feed).OnConflict("DO NOTHING").Insert() - if err != nil { - return errors.Wrap(err, "failed to create feed") - } - - return nil -} - -func (p *PgStorage) GetFeed(hashId string) (*model.Feed, error) { - lastAccess := time.Now().UTC() - - feed := &model.Feed{} - _, err := p.db.Model(feed). - Set("last_access = ?", lastAccess). - Where("hash_id = ?", hashId). - Returning("*"). - Update() - - return feed, err -} - -func NewPgStorage(config *PgConfig) (*PgStorage, error) { - opts, err := pg.ParseURL(config.ConnectionUrl) - if err != nil { - return nil, err - } - - // If host format is "projection:region:host", than use Google SQL Proxy - // See https://github.com/go-pg/pg/issues/576 - if strings.Count(opts.Addr, ":") == 2 { - log.Print("using GCP SQL proxy") - opts.Dialer = func(network, addr string) (net.Conn, error) { - return proxy.Dial(addr) - } - } - - db := pg.Connect(opts) - - // Check database connectivity - if _, err := db.ExecOne("SELECT 1"); err != nil { - db.Close() - return nil, errors.Wrap(err, "failed to check database connectivity") - } - - log.Print("running update script") - if _, err := db.Exec(installScript); err != nil { - return nil, errors.Wrap(err, "failed to upgrade database structure") - } - - storage := &PgStorage{db: db} - return storage, nil -} diff --git a/pkg/storage/pg_sql.go b/pkg/storage/pg_sql.go deleted file mode 100644 index d082aee..0000000 --- a/pkg/storage/pg_sql.go +++ /dev/null @@ -1,38 +0,0 @@ -package storage - -const installScript = ` -BEGIN; - -DO $$ -BEGIN - IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'provider') THEN - CREATE TYPE provider AS ENUM ('youtube', 'vimeo'); - END IF; - IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'link_type') THEN - CREATE TYPE link_type AS ENUM ('channel', 'playlist', 'user', 'group'); - END IF; - IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'quality') THEN - CREATE TYPE quality AS ENUM ('high', 'low'); - END IF; - IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'format') THEN - CREATE TYPE format AS ENUM ('audio', 'video'); - END IF; -END -$$; - -CREATE TABLE IF NOT EXISTS feeds ( - feed_id BIGSERIAL PRIMARY KEY, - hash_id VARCHAR(12) NOT NULL CHECK (hash_id <> '') UNIQUE, - user_id VARCHAR(32) NULL, - item_id VARCHAR(32) NOT NULL CHECK (item_id <> ''), - provider provider NOT NULL, - link_type link_type NOT NULL, - page_size INT NOT NULL DEFAULT 50, - format format NOT NULL DEFAULT 'video', - quality quality NOT NULL DEFAULT 'high', - feature_level INT NOT NULL DEFAULT 0, - last_access timestamp WITHOUT TIME ZONE NOT NULL -); - -COMMIT; -` diff --git a/pkg/storage/pg_test.go b/pkg/storage/pg_test.go deleted file mode 100644 index 97858db..0000000 --- a/pkg/storage/pg_test.go +++ /dev/null @@ -1,107 +0,0 @@ -package storage - -import ( - "testing" - - "github.com/mxpv/podsync/pkg/api" - "github.com/mxpv/podsync/pkg/model" - "github.com/stretchr/testify/require" -) - -func TestPgStorage_CreateFeed(t *testing.T) { - feed := &model.Feed{ - HashID: "xyz", - Provider: api.ProviderYoutube, - LinkType: api.LinkTypeChannel, - ItemID: "123", - } - - client := createClient(t) - err := client.CreateFeed(feed) - require.NoError(t, err) - require.True(t, feed.FeedID > 0) -} - -func TestPgStorage_CreateFeedWithDuplicate(t *testing.T) { - feed := &model.Feed{ - HashID: "123", - Provider: api.ProviderYoutube, - LinkType: api.LinkTypeChannel, - ItemID: "123", - } - - client := createClient(t) - err := client.CreateFeed(feed) - require.NoError(t, err) - - // Ensure 1 record - count, err := client.db.Model(&model.Feed{}).Count() - require.NoError(t, err) - require.Equal(t, 1, count) - - // Insert duplicated feed - err = client.CreateFeed(feed) - require.NoError(t, err) - - // Check no duplicates inserted - count, err = client.db.Model(&model.Feed{}).Count() - require.NoError(t, err) - require.Equal(t, 1, count) -} - -func TestPgStorage_GetFeed(t *testing.T) { - feed := &model.Feed{ - HashID: "xyz", - UserID: "123", - Provider: api.ProviderYoutube, - LinkType: api.LinkTypeChannel, - ItemID: "123", - } - - client := createClient(t) - client.CreateFeed(feed) - - out, err := client.GetFeed("xyz") - require.NoError(t, err) - require.Equal(t, feed.FeedID, out.FeedID) -} - -func TestPgStorage_UpdateLastAccess(t *testing.T) { - feed := &model.Feed{ - HashID: "xyz", - UserID: "123", - Provider: api.ProviderYoutube, - LinkType: api.LinkTypeChannel, - ItemID: "123", - } - - client := createClient(t) - err := client.CreateFeed(feed) - require.NoError(t, err) - - lastAccess := feed.LastAccess - require.True(t, lastAccess.Unix() > 0) - - last, err := client.GetFeed("xyz") - require.NoError(t, err) - - require.NotEmpty(t, last.HashID) - require.NotEmpty(t, last.UserID) - require.NotEmpty(t, last.Provider) - require.NotEmpty(t, last.LinkType) - require.NotEmpty(t, last.ItemID) - - require.True(t, last.LastAccess.UnixNano() > lastAccess.UnixNano()) -} - -const TestDatabaseConnectionUrl = "postgres://postgres:@localhost/podsync?sslmode=disable" - -func createClient(t *testing.T) *PgStorage { - pg, err := NewPgStorage(&PgConfig{ConnectionUrl: TestDatabaseConnectionUrl}) - require.NoError(t, err) - - _, err = pg.db.Model(&model.Feed{}).Where("1=1").Delete() - require.NoError(t, err) - - return pg -}