mirror of
https://github.com/mxpv/podsync.git
synced 2024-05-11 05:55:04 +00:00
Implement basic metric tracker
This commit is contained in:
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/mxpv/podsync/pkg/config"
|
||||
"github.com/mxpv/podsync/pkg/feeds"
|
||||
"github.com/mxpv/podsync/pkg/handler"
|
||||
"github.com/mxpv/podsync/pkg/stats"
|
||||
"github.com/mxpv/podsync/pkg/support"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
@@ -41,6 +42,11 @@ func main() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
statistics, err := stats.NewRedisStats(cfg.RedisURL)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
patreon := support.NewPatreon(database)
|
||||
|
||||
// Builders
|
||||
@@ -57,6 +63,7 @@ func main() {
|
||||
|
||||
feed, err := feeds.NewFeedService(
|
||||
feeds.WithPostgres(database),
|
||||
feeds.WithStats(statistics),
|
||||
feeds.WithBuilder(api.ProviderYoutube, youtube),
|
||||
feeds.WithBuilder(api.ProviderVimeo, vimeo),
|
||||
)
|
||||
@@ -83,6 +90,7 @@ func main() {
|
||||
|
||||
srv.Shutdown(ctx)
|
||||
database.Close()
|
||||
statistics.Close()
|
||||
|
||||
log.Printf("server gracefully stopped")
|
||||
}
|
||||
|
||||
@@ -45,9 +45,10 @@ const (
|
||||
)
|
||||
|
||||
type Metadata struct {
|
||||
Provider Provider `json:"provider"`
|
||||
Format Format `json:"format"`
|
||||
Quality Quality `json:"quality"`
|
||||
Provider Provider `json:"provider"`
|
||||
Format Format `json:"format"`
|
||||
Quality Quality `json:"quality"`
|
||||
Downloads int64 `json:"downloads"`
|
||||
}
|
||||
|
||||
const (
|
||||
|
||||
@@ -17,12 +17,23 @@ const (
|
||||
maxPageSize = 150
|
||||
)
|
||||
|
||||
const (
|
||||
MetricQueries = "queries"
|
||||
MetricDownloads = "downloads"
|
||||
)
|
||||
|
||||
type stats interface {
|
||||
Inc(metric, hashID string) (int64, error)
|
||||
Get(metric, hashID string) (int64, error)
|
||||
}
|
||||
|
||||
type builder interface {
|
||||
Build(feed *model.Feed) (podcast *itunes.Podcast, err error)
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
sid *shortid.Shortid
|
||||
stats stats
|
||||
db *pg.DB
|
||||
builders map[api.Provider]builder
|
||||
}
|
||||
@@ -109,7 +120,17 @@ func (s Service) BuildFeed(hashID string) (*itunes.Podcast, error) {
|
||||
return nil, errors.Wrapf(err, "failed to get builder for feed: %s", hashID)
|
||||
}
|
||||
|
||||
return builder.Build(feed)
|
||||
podcast, err := builder.Build(feed)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = s.stats.Inc(MetricQueries, feed.HashID)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to update metrics for feed: %s", hashID)
|
||||
}
|
||||
|
||||
return podcast, nil
|
||||
}
|
||||
|
||||
func (s Service) GetMetadata(hashID string) (*api.Metadata, error) {
|
||||
@@ -117,17 +138,23 @@ func (s Service) GetMetadata(hashID string) (*api.Metadata, error) {
|
||||
err := s.db.
|
||||
Model(feed).
|
||||
Where("hash_id = ?", hashID).
|
||||
Column("provider", "format", "quality").
|
||||
Column("provider", "format", "quality", "user_id").
|
||||
Select()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
downloads, err := s.stats.Inc(MetricDownloads, hashID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &api.Metadata{
|
||||
Provider: feed.Provider,
|
||||
Format: feed.Format,
|
||||
Quality: feed.Quality,
|
||||
Provider: feed.Provider,
|
||||
Format: feed.Format,
|
||||
Quality: feed.Quality,
|
||||
Downloads: downloads,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -172,6 +199,13 @@ func WithBuilder(provider api.Provider, builder builder) feedOption {
|
||||
}
|
||||
}
|
||||
|
||||
//noinspection GoExportedFuncWithUnexportedType
|
||||
func WithStats(m stats) feedOption {
|
||||
return func(service *Service) {
|
||||
service.stats = m
|
||||
}
|
||||
}
|
||||
|
||||
func NewFeedService(opts ...feedOption) (*Service, error) {
|
||||
sid, err := shortid.New(1, shortid.DefaultABC, uint64(time.Now().UnixNano()))
|
||||
if err != nil {
|
||||
|
||||
@@ -10,6 +10,55 @@ import (
|
||||
reflect "reflect"
|
||||
)
|
||||
|
||||
// Mockstats is a mock of stats interface
|
||||
type Mockstats struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockstatsMockRecorder
|
||||
}
|
||||
|
||||
// MockstatsMockRecorder is the mock recorder for Mockstats
|
||||
type MockstatsMockRecorder struct {
|
||||
mock *Mockstats
|
||||
}
|
||||
|
||||
// NewMockstats creates a new mock instance
|
||||
func NewMockstats(ctrl *gomock.Controller) *Mockstats {
|
||||
mock := &Mockstats{ctrl: ctrl}
|
||||
mock.recorder = &MockstatsMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (_m *Mockstats) EXPECT() *MockstatsMockRecorder {
|
||||
return _m.recorder
|
||||
}
|
||||
|
||||
// Inc mocks base method
|
||||
func (_m *Mockstats) Inc(metric string, hashID string) (int64, error) {
|
||||
ret := _m.ctrl.Call(_m, "Inc", metric, hashID)
|
||||
ret0, _ := ret[0].(int64)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Inc indicates an expected call of Inc
|
||||
func (_mr *MockstatsMockRecorder) Inc(arg0, arg1 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Inc", reflect.TypeOf((*Mockstats)(nil).Inc), arg0, arg1)
|
||||
}
|
||||
|
||||
// Get mocks base method
|
||||
func (_m *Mockstats) Get(metric string, hashID string) (int64, error) {
|
||||
ret := _m.ctrl.Call(_m, "Get", metric, hashID)
|
||||
ret0, _ := ret[0].(int64)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Get indicates an expected call of Get
|
||||
func (_mr *MockstatsMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Get", reflect.TypeOf((*Mockstats)(nil).Get), arg0, arg1)
|
||||
}
|
||||
|
||||
// Mockbuilder is a mock of builder interface
|
||||
type Mockbuilder struct {
|
||||
ctrl *gomock.Controller
|
||||
|
||||
@@ -72,9 +72,20 @@ func TestService_UpdateLastAccess(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestService_GetMetadata(t *testing.T) {
|
||||
s := Service{db: createDatabase(t)}
|
||||
_, err := s.GetMetadata(feed.HashID)
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
stats := NewMockstats(ctrl)
|
||||
stats.EXPECT().Inc(MetricDownloads, feed.HashID).Return(int64(10), nil)
|
||||
|
||||
s := Service{
|
||||
db: createDatabase(t),
|
||||
stats: stats,
|
||||
}
|
||||
|
||||
m, err := s.GetMetadata(feed.HashID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(10), m.Downloads)
|
||||
}
|
||||
|
||||
func TestService_DowngradeToAnonymous(t *testing.T) {
|
||||
|
||||
45
pkg/stats/redis.go
Normal file
45
pkg/stats/redis.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package stats
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis"
|
||||
)
|
||||
|
||||
type RedisStats struct {
|
||||
client *redis.Client
|
||||
}
|
||||
|
||||
func (r RedisStats) Inc(metric, hashID string) (int64, error) {
|
||||
key := r.makeKey(hashID)
|
||||
return r.client.HIncrBy(key, metric, 1).Result()
|
||||
}
|
||||
|
||||
func (r RedisStats) Get(metric, hashID string) (int64, error) {
|
||||
key := r.makeKey(hashID)
|
||||
return r.client.HGet(key, metric).Int64()
|
||||
}
|
||||
|
||||
func (r RedisStats) Close() error {
|
||||
return r.client.Close()
|
||||
}
|
||||
|
||||
func (r RedisStats) makeKey(hashID string) string {
|
||||
now := time.Now().UTC()
|
||||
return fmt.Sprintf("stats/%d/%d/%s", now.Year(), now.Month(), hashID)
|
||||
}
|
||||
|
||||
func NewRedisStats(redisUrl string) (*RedisStats, error) {
|
||||
opts, err := redis.ParseURL(redisUrl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := redis.NewClient(opts)
|
||||
if err := client.Ping().Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &RedisStats{client}, nil
|
||||
}
|
||||
41
pkg/stats/redis_test.go
Normal file
41
pkg/stats/redis_test.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package stats
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
)
|
||||
|
||||
const metric = "downloads"
|
||||
|
||||
func TestRedisStats_IncAndGet(t *testing.T) {
|
||||
t.Skip("run redis tests manually")
|
||||
|
||||
s := createRedisClient(t)
|
||||
|
||||
const hashID = "321"
|
||||
|
||||
v, err := s.Inc(metric, hashID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1), v)
|
||||
|
||||
v, err = s.Inc(metric, hashID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(2), v)
|
||||
|
||||
v, err = s.Get(metric, hashID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(2), v)
|
||||
}
|
||||
|
||||
func createRedisClient(t *testing.T) *RedisStats {
|
||||
client, err := NewRedisStats("redis://localhost")
|
||||
require.NoError(t, err)
|
||||
|
||||
keys, err := client.client.Keys("*").Result()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = client.client.Del(keys...).Err()
|
||||
require.NoError(t, err)
|
||||
|
||||
return client
|
||||
}
|
||||
@@ -1,207 +0,0 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis"
|
||||
"github.com/mxpv/podsync/pkg/api"
|
||||
"github.com/mxpv/podsync/pkg/model"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const expiration = 24 * time.Hour * 90
|
||||
|
||||
// Backward compatible Redis storage for feeds
|
||||
type RedisStorage struct {
|
||||
client *redis.Client
|
||||
}
|
||||
|
||||
func (r *RedisStorage) parsePageSize(m map[string]string) (int, error) {
|
||||
str, ok := m["pagesize"]
|
||||
if !ok {
|
||||
return 50, nil
|
||||
}
|
||||
|
||||
size, err := strconv.ParseInt(str, 10, 32)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if size > 150 {
|
||||
return 0, errors.New("invalid page size")
|
||||
}
|
||||
|
||||
return int(size), nil
|
||||
}
|
||||
|
||||
func (r *RedisStorage) parseFormat(m map[string]string) (api.Format, api.Quality, error) {
|
||||
quality, ok := m["quality"]
|
||||
if !ok {
|
||||
return api.FormatVideo, api.QualityHigh, nil
|
||||
}
|
||||
|
||||
if quality == "VideoHigh" {
|
||||
return api.FormatVideo, api.QualityHigh, nil
|
||||
} else if quality == "VideoLow" {
|
||||
return api.FormatVideo, api.QualityLow, nil
|
||||
} else if quality == "AudioHigh" {
|
||||
return api.FormatAudio, api.QualityHigh, nil
|
||||
} else if quality == "AudioLow" {
|
||||
return api.FormatAudio, api.QualityLow, nil
|
||||
}
|
||||
|
||||
return "", "", fmt.Errorf("unsupported formmat %s", quality)
|
||||
}
|
||||
|
||||
func (r *RedisStorage) GetFeed(hashId string) (*model.Feed, error) {
|
||||
result, err := r.client.HGetAll(hashId).Result()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to query feed with id %s", hashId)
|
||||
}
|
||||
|
||||
if len(result) == 0 {
|
||||
return nil, api.ErrNotFound
|
||||
}
|
||||
|
||||
// Expire after 3 month if no use
|
||||
if err := r.client.Expire(hashId, expiration).Err(); err != nil {
|
||||
return nil, errors.Wrap(err, "failed query update feed")
|
||||
}
|
||||
|
||||
feed := &model.Feed{
|
||||
PageSize: api.DefaultPageSize,
|
||||
Quality: api.DefaultQuality,
|
||||
Format: api.DefaultFormat,
|
||||
HashID: hashId,
|
||||
LastAccess: time.Now().UTC(),
|
||||
}
|
||||
|
||||
m := make(map[string]string, len(result))
|
||||
for key, val := range result {
|
||||
m[strings.ToLower(key)] = val
|
||||
}
|
||||
|
||||
// Unpack provider and link type
|
||||
provider := m["provider"]
|
||||
linkType := m["type"]
|
||||
if strings.EqualFold(provider, "youtube") {
|
||||
feed.Provider = api.ProviderYoutube
|
||||
|
||||
if strings.EqualFold(linkType, "channel") {
|
||||
feed.LinkType = api.LinkTypeChannel
|
||||
} else if strings.EqualFold(linkType, "playlist") {
|
||||
feed.LinkType = api.LinkTypePlaylist
|
||||
} else if strings.EqualFold(linkType, "user") {
|
||||
feed.LinkType = api.LinkTypeUser
|
||||
} else {
|
||||
return nil, fmt.Errorf("unsupported yt link type %s", linkType)
|
||||
}
|
||||
|
||||
} else if strings.EqualFold(provider, "vimeo") {
|
||||
feed.Provider = api.ProviderVimeo
|
||||
|
||||
if strings.EqualFold(linkType, "channel") {
|
||||
feed.LinkType = api.LinkTypeChannel
|
||||
} else if strings.EqualFold(linkType, "user") {
|
||||
feed.LinkType = api.LinkTypeUser
|
||||
} else if strings.EqualFold(linkType, "group") {
|
||||
feed.LinkType = api.LinkTypeGroup
|
||||
} else {
|
||||
return nil, fmt.Errorf("unsupported vimeo link type %s", linkType)
|
||||
}
|
||||
|
||||
} else {
|
||||
return nil, errors.New("unsupported provider")
|
||||
}
|
||||
|
||||
// Unpack item id
|
||||
id, ok := m["id"]
|
||||
if !ok || id == "" {
|
||||
return nil, errors.New("failed to unpack item id")
|
||||
}
|
||||
|
||||
feed.ItemID = id
|
||||
|
||||
// Fetch user id
|
||||
patreonId, ok := m["patreonid"]
|
||||
if ok {
|
||||
feed.UserID = patreonId
|
||||
}
|
||||
|
||||
// Unpack page size
|
||||
pageSize, err := r.parsePageSize(m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if patreonId == "" && pageSize > 50 {
|
||||
return nil, errors.New("wrong feed data")
|
||||
}
|
||||
|
||||
// Parse feed's format and quality
|
||||
format, quality, err := r.parseFormat(m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
feed.PageSize = pageSize
|
||||
feed.Format = format
|
||||
feed.Quality = quality
|
||||
|
||||
return feed, nil
|
||||
}
|
||||
|
||||
func (r *RedisStorage) CreateFeed(feed *model.Feed) error {
|
||||
fields := map[string]interface{}{
|
||||
"provider": string(feed.Provider),
|
||||
"type": string(feed.LinkType),
|
||||
"id": feed.ItemID,
|
||||
"patreonid": feed.UserID,
|
||||
"pagesize": feed.PageSize,
|
||||
}
|
||||
|
||||
if feed.Format == api.FormatVideo {
|
||||
|
||||
if feed.Quality == api.QualityHigh {
|
||||
fields["quality"] = "VideoHigh"
|
||||
} else {
|
||||
fields["quality"] = "VideoLow"
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
if feed.Quality == api.QualityHigh {
|
||||
fields["quality"] = "AudioHigh"
|
||||
} else {
|
||||
fields["quality"] = "AudioLow"
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if err := r.client.HMSet(feed.HashID, fields).Err(); err != nil {
|
||||
return errors.Wrap(err, "failed to save feed")
|
||||
}
|
||||
|
||||
return r.client.Expire(feed.HashID, expiration).Err()
|
||||
}
|
||||
|
||||
func (r *RedisStorage) keys() ([]string, error) {
|
||||
return r.client.Keys("*").Result()
|
||||
}
|
||||
|
||||
func NewRedisStorage(redisUrl string) (*RedisStorage, error) {
|
||||
opts, err := redis.ParseURL(redisUrl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := redis.NewClient(opts)
|
||||
if err := client.Ping().Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &RedisStorage{client}, nil
|
||||
}
|
||||
@@ -1,72 +0,0 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/mxpv/podsync/pkg/api"
|
||||
"github.com/mxpv/podsync/pkg/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRedisStorage_GetFeed(t *testing.T) {
|
||||
t.Skip("run redis tests manually")
|
||||
|
||||
client := createRedisClient(t)
|
||||
|
||||
keys, err := client.keys()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.True(t, len(keys) > 0)
|
||||
|
||||
for idx, key := range keys {
|
||||
if key == "keygen" {
|
||||
continue
|
||||
}
|
||||
|
||||
feed, err := client.GetFeed(key)
|
||||
require.NoError(t, err, "feed %s (id = %d) failed", key, idx)
|
||||
require.NotNil(t, feed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRedisStorage_CreateFeed(t *testing.T) {
|
||||
t.Skip("run redis tests manually")
|
||||
|
||||
client := createRedisClient(t)
|
||||
|
||||
hashId := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)
|
||||
|
||||
err := client.CreateFeed(&model.Feed{
|
||||
HashID: hashId,
|
||||
UserID: "321",
|
||||
Provider: api.ProviderYoutube,
|
||||
LinkType: api.LinkTypeChannel,
|
||||
ItemID: "123",
|
||||
PageSize: 45,
|
||||
Quality: api.QualityLow,
|
||||
Format: api.FormatAudio,
|
||||
})
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
feed, err := client.GetFeed(hashId)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, hashId, feed.HashID)
|
||||
require.Equal(t, "321", feed.UserID)
|
||||
require.Equal(t, api.ProviderYoutube, feed.Provider)
|
||||
require.Equal(t, api.LinkTypeChannel, feed.LinkType)
|
||||
require.Equal(t, "123", feed.ItemID)
|
||||
require.Equal(t, 45, feed.PageSize)
|
||||
require.Equal(t, api.QualityLow, feed.Quality)
|
||||
require.Equal(t, api.FormatAudio, feed.Format)
|
||||
}
|
||||
|
||||
func createRedisClient(t *testing.T) *RedisStorage {
|
||||
client, err := NewRedisStorage("redis://localhost")
|
||||
require.NoError(t, err)
|
||||
|
||||
return client
|
||||
}
|
||||
Reference in New Issue
Block a user