1
0
mirror of https://github.com/mxpv/podsync.git synced 2024-05-11 05:55:04 +00:00

Switch to Postgres storage for feeds

This commit is contained in:
Maksym Pavlenko
2017-11-03 19:16:15 -07:00
parent d79cc87ea5
commit 7cff604c26
12 changed files with 107 additions and 329 deletions

View File

@@ -18,7 +18,6 @@ import (
"github.com/mxpv/podsync/pkg/config" "github.com/mxpv/podsync/pkg/config"
"github.com/mxpv/podsync/pkg/feeds" "github.com/mxpv/podsync/pkg/feeds"
"github.com/mxpv/podsync/pkg/handler" "github.com/mxpv/podsync/pkg/handler"
"github.com/mxpv/podsync/pkg/storage"
"github.com/mxpv/podsync/pkg/support" "github.com/mxpv/podsync/pkg/support"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@@ -37,11 +36,6 @@ func main() {
panic(err) panic(err)
} }
redis, err := storage.NewRedisStorage(cfg.RedisURL)
if err != nil {
panic(err)
}
database, err := createPg(cfg.PostgresConnectionURL) database, err := createPg(cfg.PostgresConnectionURL)
if err != nil { if err != nil {
panic(err) panic(err)
@@ -62,7 +56,7 @@ func main() {
} }
feed, err := feeds.NewFeedService( feed, err := feeds.NewFeedService(
feeds.WithStorage(redis), feeds.WithPostgres(database),
feeds.WithBuilder(api.ProviderYoutube, youtube), feeds.WithBuilder(api.ProviderYoutube, youtube),
feeds.WithBuilder(api.ProviderVimeo, vimeo), feeds.WithBuilder(api.ProviderVimeo, vimeo),
) )

View File

@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/go-pg/pg"
itunes "github.com/mxpv/podcast" itunes "github.com/mxpv/podcast"
"github.com/mxpv/podsync/pkg/api" "github.com/mxpv/podsync/pkg/api"
"github.com/mxpv/podsync/pkg/model" "github.com/mxpv/podsync/pkg/model"
@@ -15,18 +16,13 @@ const (
maxPageSize = 150 maxPageSize = 150
) )
type storageService interface {
CreateFeed(feed *model.Feed) error
GetFeed(hashId string) (*model.Feed, error)
}
type builder interface { type builder interface {
Build(feed *model.Feed) (podcast *itunes.Podcast, err error) Build(feed *model.Feed) (podcast *itunes.Podcast, err error)
} }
type Service struct { type Service struct {
sid *shortid.Shortid sid *shortid.Shortid
storage storageService db *pg.DB
builders map[api.Provider]builder builders map[api.Provider]builder
} }
@@ -42,12 +38,15 @@ func (s Service) CreateFeed(req *api.CreateFeedRequest, identity *api.Identity)
return "", fmt.Errorf("failed to get builder for URL: %s", req.URL) return "", fmt.Errorf("failed to get builder for URL: %s", req.URL)
} }
now := time.Now().UTC()
// Set default fields // Set default fields
feed.PageSize = api.DefaultPageSize feed.PageSize = api.DefaultPageSize
feed.Format = api.FormatVideo feed.Format = api.FormatVideo
feed.Quality = api.QualityHigh feed.Quality = api.QualityHigh
feed.FeatureLevel = api.DefaultFeatures feed.FeatureLevel = api.DefaultFeatures
feed.LastAccess = time.Now().UTC() feed.CreatedAt = now
feed.LastAccess = now
if identity.FeatureLevel > 0 { if identity.FeatureLevel > 0 {
feed.UserID = identity.UserId feed.UserID = identity.UserId
@@ -69,29 +68,57 @@ func (s Service) CreateFeed(req *api.CreateFeedRequest, identity *api.Identity)
feed.HashID = hashId feed.HashID = hashId
// Save to database // Save to database
if err := s.storage.CreateFeed(feed); err != nil { _, err = s.db.Model(feed).Insert()
if err != nil {
return "", errors.Wrap(err, "failed to save feed to database") return "", errors.Wrap(err, "failed to save feed to database")
} }
return hashId, nil return hashId, nil
} }
func (s Service) GetFeed(hashId string) (*itunes.Podcast, error) { func (s Service) QueryFeed(hashID string) (*model.Feed, error) {
feed, err := s.storage.GetFeed(hashId) lastAccess := time.Now().UTC()
feed := &model.Feed{}
res, err := s.db.Model(feed).
Set("last_access = ?", lastAccess).
Where("hash_id = ?", hashID).
Returning("*").
Update()
if err != nil {
return nil, errors.Wrapf(err, "failed to query feed: %s", hashID)
}
if res.RowsAffected() != 1 {
return nil, api.ErrNotFound
}
return feed, nil
}
func (s Service) BuildFeed(hashID string) (*itunes.Podcast, error) {
feed, err := s.QueryFeed(hashID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
builder, ok := s.builders[feed.Provider] builder, ok := s.builders[feed.Provider]
if !ok { if !ok {
return nil, errors.Wrapf(err, "failed to get builder for feed: %s", hashId) return nil, errors.Wrapf(err, "failed to get builder for feed: %s", hashID)
} }
return builder.Build(feed) return builder.Build(feed)
} }
func (s Service) GetMetadata(hashId string) (*api.Metadata, error) { func (s Service) GetMetadata(hashID string) (*api.Metadata, error) {
feed, err := s.storage.GetFeed(hashId) feed := &model.Feed{}
err := s.db.
Model(feed).
Where("hash_id = ?", hashID).
Column("provider", "format", "quality").
Select()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -106,9 +133,9 @@ func (s Service) GetMetadata(hashId string) (*api.Metadata, error) {
type feedOption func(*Service) type feedOption func(*Service)
//noinspection GoExportedFuncWithUnexportedType //noinspection GoExportedFuncWithUnexportedType
func WithStorage(storage storageService) feedOption { func WithPostgres(db *pg.DB) feedOption {
return func(service *Service) { return func(service *Service) {
service.storage = storage service.db = db
} }
} }

View File

@@ -10,54 +10,6 @@ import (
reflect "reflect" reflect "reflect"
) )
// MockstorageService is a mock of storageService interface
type MockstorageService struct {
ctrl *gomock.Controller
recorder *MockstorageServiceMockRecorder
}
// MockstorageServiceMockRecorder is the mock recorder for MockstorageService
type MockstorageServiceMockRecorder struct {
mock *MockstorageService
}
// NewMockstorageService creates a new mock instance
func NewMockstorageService(ctrl *gomock.Controller) *MockstorageService {
mock := &MockstorageService{ctrl: ctrl}
mock.recorder = &MockstorageServiceMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (_m *MockstorageService) EXPECT() *MockstorageServiceMockRecorder {
return _m.recorder
}
// CreateFeed mocks base method
func (_m *MockstorageService) CreateFeed(feed *model.Feed) error {
ret := _m.ctrl.Call(_m, "CreateFeed", feed)
ret0, _ := ret[0].(error)
return ret0
}
// CreateFeed indicates an expected call of CreateFeed
func (_mr *MockstorageServiceMockRecorder) CreateFeed(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "CreateFeed", reflect.TypeOf((*MockstorageService)(nil).CreateFeed), arg0)
}
// GetFeed mocks base method
func (_m *MockstorageService) GetFeed(hashId string) (*model.Feed, error) {
ret := _m.ctrl.Call(_m, "GetFeed", hashId)
ret0, _ := ret[0].(*model.Feed)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetFeed indicates an expected call of GetFeed
func (_mr *MockstorageServiceMockRecorder) GetFeed(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetFeed", reflect.TypeOf((*MockstorageService)(nil).GetFeed), arg0)
}
// Mockbuilder is a mock of builder interface // Mockbuilder is a mock of builder interface
type Mockbuilder struct { type Mockbuilder struct {
ctrl *gomock.Controller ctrl *gomock.Controller

View File

@@ -5,6 +5,7 @@ package feeds
import ( import (
"testing" "testing"
"github.com/go-pg/pg"
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/mxpv/podsync/pkg/api" "github.com/mxpv/podsync/pkg/api"
"github.com/mxpv/podsync/pkg/model" "github.com/mxpv/podsync/pkg/model"
@@ -12,16 +13,23 @@ import (
"github.com/ventu-io/go-shortid" "github.com/ventu-io/go-shortid"
) )
var feed = &model.Feed{
HashID: "123",
ItemID: "xyz",
Provider: api.ProviderVimeo,
LinkType: api.LinkTypeChannel,
PageSize: 50,
Quality: api.QualityHigh,
Format: api.FormatVideo,
}
func TestService_CreateFeed(t *testing.T) { func TestService_CreateFeed(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
storage := NewMockstorageService(ctrl)
storage.EXPECT().CreateFeed(gomock.Any()).Times(1).Return(nil)
s := Service{ s := Service{
sid: shortid.GetDefault(), sid: shortid.GetDefault(),
storage: storage, db: createDatabase(t),
builders: map[api.Provider]builder{api.ProviderYoutube: nil}, builders: map[api.Provider]builder{api.ProviderYoutube: nil},
} }
@@ -38,34 +46,50 @@ func TestService_CreateFeed(t *testing.T) {
} }
func TestService_GetFeed(t *testing.T) { func TestService_GetFeed(t *testing.T) {
ctrl := gomock.NewController(t) s := Service{db: createDatabase(t)}
defer ctrl.Finish()
feed := &model.Feed{Provider: api.ProviderYoutube} _, err := s.BuildFeed(feed.HashID)
storage := NewMockstorageService(ctrl)
storage.EXPECT().GetFeed("123").Times(1).Return(feed, nil)
bld := NewMockbuilder(ctrl)
bld.EXPECT().Build(feed).Return(nil, nil)
s := Service{
storage: storage,
builders: map[api.Provider]builder{api.ProviderYoutube: bld},
}
_, err := s.GetFeed("123")
require.NoError(t, err) require.NoError(t, err)
} }
func TestService_WrongID(t *testing.T) {
s := Service{db: createDatabase(t)}
_, err := s.BuildFeed("invalid_feed_id")
require.Error(t, err)
}
func TestService_UpdateLastAccess(t *testing.T) {
s := Service{db: createDatabase(t)}
feed1, err := s.QueryFeed(feed.HashID)
require.NoError(t, err)
feed2, err := s.QueryFeed(feed.HashID)
require.NoError(t, err)
require.True(t, feed2.LastAccess.After(feed1.LastAccess))
}
func TestService_GetMetadata(t *testing.T) { func TestService_GetMetadata(t *testing.T) {
ctrl := gomock.NewController(t) s := Service{db: createDatabase(t)}
defer ctrl.Finish() _, err := s.GetMetadata(feed.HashID)
storage := NewMockstorageService(ctrl)
storage.EXPECT().GetFeed("123").Times(1).Return(&model.Feed{}, nil)
s := Service{storage: storage}
_, err := s.GetMetadata("123")
require.NoError(t, err) require.NoError(t, err)
} }
func createDatabase(t *testing.T) *pg.DB {
opts, err := pg.ParseURL("postgres://postgres:@localhost/podsync?sslmode=disable")
if err != nil {
require.NoError(t, err)
}
db := pg.Connect(opts)
_, err = db.Model(&model.Feed{}).Where("1=1").Delete()
require.NoError(t, err)
err = db.Insert(feed)
require.NoError(t, err)
return db
}

View File

@@ -24,7 +24,7 @@ const (
type feedService interface { type feedService interface {
CreateFeed(req *api.CreateFeedRequest, identity *api.Identity) (string, error) CreateFeed(req *api.CreateFeedRequest, identity *api.Identity) (string, error)
GetFeed(hashId string) (*itunes.Podcast, error) BuildFeed(hashID string) (*itunes.Podcast, error)
GetMetadata(hashId string) (*api.Metadata, error) GetMetadata(hashId string) (*api.Metadata, error)
} }
@@ -154,7 +154,7 @@ func (h handler) getFeed(c *gin.Context) {
hashId = strings.TrimSuffix(hashId, ".xml") hashId = strings.TrimSuffix(hashId, ".xml")
} }
podcast, err := h.feed.GetFeed(hashId) podcast, err := h.feed.BuildFeed(hashId)
if err != nil { if err != nil {
code := http.StatusInternalServerError code := http.StatusInternalServerError
if err == api.ErrNotFound { if err == api.ErrNotFound {

View File

@@ -47,17 +47,17 @@ func (_mr *MockfeedServiceMockRecorder) CreateFeed(arg0, arg1 interface{}) *gomo
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "CreateFeed", reflect.TypeOf((*MockfeedService)(nil).CreateFeed), arg0, arg1) return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "CreateFeed", reflect.TypeOf((*MockfeedService)(nil).CreateFeed), arg0, arg1)
} }
// GetFeed mocks base method // BuildFeed mocks base method
func (_m *MockfeedService) GetFeed(hashId string) (*podcast.Podcast, error) { func (_m *MockfeedService) BuildFeed(hashID string) (*podcast.Podcast, error) {
ret := _m.ctrl.Call(_m, "GetFeed", hashId) ret := _m.ctrl.Call(_m, "BuildFeed", hashID)
ret0, _ := ret[0].(*podcast.Podcast) ret0, _ := ret[0].(*podcast.Podcast)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(error)
return ret0, ret1 return ret0, ret1
} }
// GetFeed indicates an expected call of GetFeed // BuildFeed indicates an expected call of BuildFeed
func (_mr *MockfeedServiceMockRecorder) GetFeed(arg0 interface{}) *gomock.Call { func (_mr *MockfeedServiceMockRecorder) BuildFeed(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetFeed", reflect.TypeOf((*MockfeedService)(nil).GetFeed), arg0) return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "BuildFeed", reflect.TypeOf((*MockfeedService)(nil).BuildFeed), arg0)
} }
// GetMetadata mocks base method // GetMetadata mocks base method

View File

@@ -101,7 +101,7 @@ func TestGetFeed(t *testing.T) {
podcast := itunes.New("", "", "", nil, nil) podcast := itunes.New("", "", "", nil, nil)
feed := NewMockfeedService(ctrl) feed := NewMockfeedService(ctrl)
feed.EXPECT().GetFeed("123").Return(&podcast, nil) feed.EXPECT().BuildFeed("123").Return(&podcast, nil)
srv := httptest.NewServer(New(feed, nil, cfg)) srv := httptest.NewServer(New(feed, nil, cfg))
defer srv.Close() defer srv.Close()

View File

@@ -27,5 +27,6 @@ type Feed struct {
Format api.Format Format api.Format
Quality api.Quality Quality api.Quality
FeatureLevel int FeatureLevel int
CreatedAt time.Time
LastAccess time.Time // Available features LastAccess time.Time // Available features
} }

View File

@@ -42,12 +42,13 @@ CREATE TABLE IF NOT EXISTS feeds (
hash_id VARCHAR(12) NOT NULL UNIQUE, hash_id VARCHAR(12) NOT NULL UNIQUE,
user_id VARCHAR(32) NULL, user_id VARCHAR(32) NULL,
item_id VARCHAR(32) NOT NULL CHECK (item_id <> ''), item_id VARCHAR(32) NOT NULL CHECK (item_id <> ''),
link_type link_type NOT NULL,
provider provider NOT NULL, provider provider NOT NULL,
link_type link_type NOT NULL,
page_size INT NOT NULL DEFAULT 50, page_size INT NOT NULL DEFAULT 50,
format format NOT NULL DEFAULT 'video', format format NOT NULL DEFAULT 'video',
quality quality NOT NULL DEFAULT 'high', quality quality NOT NULL DEFAULT 'high',
feature_level INT NOT NULL DEFAULT 0, feature_level INT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
last_access TIMESTAMPTZ NOT NULL DEFAULT now() last_access TIMESTAMPTZ NOT NULL DEFAULT now()
); );

View File

@@ -1,76 +0,0 @@
package storage
import (
"log"
"net"
"strings"
"time"
"github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy"
"github.com/go-pg/pg"
"github.com/mxpv/podsync/pkg/model"
"github.com/pkg/errors"
)
type PgConfig struct {
ConnectionUrl string `yaml:"connectionUrl"`
}
type PgStorage struct {
db *pg.DB
}
func (p *PgStorage) CreateFeed(feed *model.Feed) error {
feed.LastAccess = time.Now().UTC()
_, err := p.db.Model(feed).OnConflict("DO NOTHING").Insert()
if err != nil {
return errors.Wrap(err, "failed to create feed")
}
return nil
}
func (p *PgStorage) GetFeed(hashId string) (*model.Feed, error) {
lastAccess := time.Now().UTC()
feed := &model.Feed{}
_, err := p.db.Model(feed).
Set("last_access = ?", lastAccess).
Where("hash_id = ?", hashId).
Returning("*").
Update()
return feed, err
}
func NewPgStorage(config *PgConfig) (*PgStorage, error) {
opts, err := pg.ParseURL(config.ConnectionUrl)
if err != nil {
return nil, err
}
// If host format is "projection:region:host", than use Google SQL Proxy
// See https://github.com/go-pg/pg/issues/576
if strings.Count(opts.Addr, ":") == 2 {
log.Print("using GCP SQL proxy")
opts.Dialer = func(network, addr string) (net.Conn, error) {
return proxy.Dial(addr)
}
}
db := pg.Connect(opts)
// Check database connectivity
if _, err := db.ExecOne("SELECT 1"); err != nil {
db.Close()
return nil, errors.Wrap(err, "failed to check database connectivity")
}
log.Print("running update script")
if _, err := db.Exec(installScript); err != nil {
return nil, errors.Wrap(err, "failed to upgrade database structure")
}
storage := &PgStorage{db: db}
return storage, nil
}

View File

@@ -1,38 +0,0 @@
package storage
const installScript = `
BEGIN;
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'provider') THEN
CREATE TYPE provider AS ENUM ('youtube', 'vimeo');
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'link_type') THEN
CREATE TYPE link_type AS ENUM ('channel', 'playlist', 'user', 'group');
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'quality') THEN
CREATE TYPE quality AS ENUM ('high', 'low');
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'format') THEN
CREATE TYPE format AS ENUM ('audio', 'video');
END IF;
END
$$;
CREATE TABLE IF NOT EXISTS feeds (
feed_id BIGSERIAL PRIMARY KEY,
hash_id VARCHAR(12) NOT NULL CHECK (hash_id <> '') UNIQUE,
user_id VARCHAR(32) NULL,
item_id VARCHAR(32) NOT NULL CHECK (item_id <> ''),
provider provider NOT NULL,
link_type link_type NOT NULL,
page_size INT NOT NULL DEFAULT 50,
format format NOT NULL DEFAULT 'video',
quality quality NOT NULL DEFAULT 'high',
feature_level INT NOT NULL DEFAULT 0,
last_access timestamp WITHOUT TIME ZONE NOT NULL
);
COMMIT;
`

View File

@@ -1,107 +0,0 @@
package storage
import (
"testing"
"github.com/mxpv/podsync/pkg/api"
"github.com/mxpv/podsync/pkg/model"
"github.com/stretchr/testify/require"
)
func TestPgStorage_CreateFeed(t *testing.T) {
feed := &model.Feed{
HashID: "xyz",
Provider: api.ProviderYoutube,
LinkType: api.LinkTypeChannel,
ItemID: "123",
}
client := createClient(t)
err := client.CreateFeed(feed)
require.NoError(t, err)
require.True(t, feed.FeedID > 0)
}
func TestPgStorage_CreateFeedWithDuplicate(t *testing.T) {
feed := &model.Feed{
HashID: "123",
Provider: api.ProviderYoutube,
LinkType: api.LinkTypeChannel,
ItemID: "123",
}
client := createClient(t)
err := client.CreateFeed(feed)
require.NoError(t, err)
// Ensure 1 record
count, err := client.db.Model(&model.Feed{}).Count()
require.NoError(t, err)
require.Equal(t, 1, count)
// Insert duplicated feed
err = client.CreateFeed(feed)
require.NoError(t, err)
// Check no duplicates inserted
count, err = client.db.Model(&model.Feed{}).Count()
require.NoError(t, err)
require.Equal(t, 1, count)
}
func TestPgStorage_GetFeed(t *testing.T) {
feed := &model.Feed{
HashID: "xyz",
UserID: "123",
Provider: api.ProviderYoutube,
LinkType: api.LinkTypeChannel,
ItemID: "123",
}
client := createClient(t)
client.CreateFeed(feed)
out, err := client.GetFeed("xyz")
require.NoError(t, err)
require.Equal(t, feed.FeedID, out.FeedID)
}
func TestPgStorage_UpdateLastAccess(t *testing.T) {
feed := &model.Feed{
HashID: "xyz",
UserID: "123",
Provider: api.ProviderYoutube,
LinkType: api.LinkTypeChannel,
ItemID: "123",
}
client := createClient(t)
err := client.CreateFeed(feed)
require.NoError(t, err)
lastAccess := feed.LastAccess
require.True(t, lastAccess.Unix() > 0)
last, err := client.GetFeed("xyz")
require.NoError(t, err)
require.NotEmpty(t, last.HashID)
require.NotEmpty(t, last.UserID)
require.NotEmpty(t, last.Provider)
require.NotEmpty(t, last.LinkType)
require.NotEmpty(t, last.ItemID)
require.True(t, last.LastAccess.UnixNano() > lastAccess.UnixNano())
}
const TestDatabaseConnectionUrl = "postgres://postgres:@localhost/podsync?sslmode=disable"
func createClient(t *testing.T) *PgStorage {
pg, err := NewPgStorage(&PgConfig{ConnectionUrl: TestDatabaseConnectionUrl})
require.NoError(t, err)
_, err = pg.db.Model(&model.Feed{}).Where("1=1").Delete()
require.NoError(t, err)
return pg
}