mirror of
https://github.com/mxpv/podsync.git
synced 2024-05-11 05:55:04 +00:00
Implement Badger storage
This commit is contained in:
committed by
Maksym Pavlenko
parent
81dae47c7f
commit
f2a8638a4f
9
go.mod
9
go.mod
@ -3,19 +3,18 @@ module github.com/mxpv/podsync
|
||||
require (
|
||||
github.com/BrianHicks/finch v0.0.0-20140409222414-419bd73c29ec
|
||||
github.com/BurntSushi/toml v0.3.1
|
||||
github.com/dgraph-io/badger v1.6.0
|
||||
github.com/eduncan911/podcast v1.3.0 // indirect
|
||||
github.com/golang/protobuf v1.2.0 // indirect
|
||||
github.com/hashicorp/go-multierror v1.0.0
|
||||
github.com/jessevdk/go-flags v1.4.0
|
||||
github.com/mxpv/podcast v0.0.0-20170823220358-fe328ad87d18
|
||||
github.com/pkg/errors v0.8.0
|
||||
github.com/pkg/errors v0.8.1
|
||||
github.com/silentsokolov/go-vimeo v0.0.0-20190116124215-06829264260c
|
||||
github.com/sirupsen/logrus v1.2.0
|
||||
github.com/stretchr/testify v1.2.2
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a
|
||||
github.com/stretchr/testify v1.3.0
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
|
||||
golang.org/x/oauth2 v0.0.0-20180620175406-ef147856a6dd
|
||||
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
|
||||
golang.org/x/sys v0.0.0-20190311152110-c8c8c57fd1e1 // indirect
|
||||
google.golang.org/api v0.0.0-20180718221112-efcb5f25ac56
|
||||
google.golang.org/appengine v1.1.0 // indirect
|
||||
)
|
||||
|
53
go.sum
53
go.sum
@ -1,51 +1,88 @@
|
||||
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkByhMAwYaFAX9w2l7vxvBQ5NMoxDrkhqhtn4=
|
||||
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
|
||||
github.com/BrianHicks/finch v0.0.0-20140409222414-419bd73c29ec h1:1VPruZMM1WQC7POhjxbZOWK564cuFz1hlpwYW6ocM4E=
|
||||
github.com/BrianHicks/finch v0.0.0-20140409222414-419bd73c29ec/go.mod h1:+hWo/MWgY8VtjZvdrYM2nPRMaK40zX2iPsH/qD0+Xs0=
|
||||
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
|
||||
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
|
||||
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
|
||||
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
|
||||
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
|
||||
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
|
||||
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
|
||||
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
|
||||
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
|
||||
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||
github.com/eduncan911/podcast v1.3.0 h1:lVCar1J39mMNWR2SbGzPjeUbCKEkQ6/pt/7beQqK6fk=
|
||||
github.com/eduncan911/podcast v1.3.0/go.mod h1:C7Q04QZtv7LW/1X67mc1zwsktpZ68kbxsUS3CYWniJg=
|
||||
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
|
||||
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
|
||||
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
|
||||
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
|
||||
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
|
||||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
|
||||
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
||||
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||
github.com/mxpv/podcast v0.0.0-20170823220358-fe328ad87d18 h1:YYsu49Y42JA+CSs9+z2MGBdGxb5jklpagLp5QPJ6BwQ=
|
||||
github.com/mxpv/podcast v0.0.0-20170823220358-fe328ad87d18/go.mod h1:bKrqwMF8O3PciTG92w0992h/d7Aj7CuIF5uTNEl3pNY=
|
||||
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
|
||||
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
||||
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
|
||||
github.com/silentsokolov/go-vimeo v0.0.0-20190116124215-06829264260c h1:KhHx/Ta3c9C1gcSo5UhDeo/D4JnhnxJTrlcOEOFiMfY=
|
||||
github.com/silentsokolov/go-vimeo v0.0.0-20190116124215-06829264260c/go.mod h1:10FeaKUMy5t3KLsYfy54dFrq0rpwcfyKkKcF7vRGIRY=
|
||||
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
|
||||
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
|
||||
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
|
||||
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
|
||||
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
|
||||
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
|
||||
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/oauth2 v0.0.0-20180620175406-ef147856a6dd h1:QQhib242ErYDSMitlBm8V7wYCm/1a25hV8qMadIKLPA=
|
||||
golang.org/x/oauth2 v0.0.0-20180620175406-ef147856a6dd/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
|
||||
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190311152110-c8c8c57fd1e1 h1:FQNj2xvjQ1lgFyzbSybGZr792Y8Dy95D7uuqnZAzNaA=
|
||||
golang.org/x/sys v0.0.0-20190311152110-c8c8c57fd1e1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k=
|
||||
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
google.golang.org/api v0.0.0-20180718221112-efcb5f25ac56 h1:iDRbkenn0VZEo05mHiCtN6/EfbZj7x1Rg+tPjB5HiQc=
|
||||
google.golang.org/api v0.0.0-20180718221112-efcb5f25ac56/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
|
||||
google.golang.org/appengine v1.1.0 h1:igQkv0AAhEIvTEpD5LIpAfav2eeVO9HBTjvKHVJPRSs=
|
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
@ -36,7 +36,7 @@ type Episode struct {
|
||||
}
|
||||
|
||||
type Feed struct {
|
||||
FeedID string `json:"feed_id"`
|
||||
ID string `json:"feed_id"`
|
||||
ItemID string `json:"item_id"`
|
||||
LinkType link.Type `json:"link_type"` // Either group, channel or user
|
||||
Provider link.Provider `json:"provider"` // Youtube or Vimeo
|
||||
@ -57,3 +57,18 @@ type Feed struct {
|
||||
Episodes []*Episode `json:"-"` // Array of episodes, serialized as gziped EpisodesData in DynamoDB
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
}
|
||||
|
||||
type EpisodeStatus string
|
||||
|
||||
const (
|
||||
EpisodeNew = EpisodeStatus("new") // New episode received via API
|
||||
EpisodeDownloaded = EpisodeStatus("downloaded") // Downloaded, encoded and available for download
|
||||
EpisodeCleaned = EpisodeStatus("cleaned") // Downloaded and later removed from disk due to update strategy
|
||||
)
|
||||
|
||||
type File struct {
|
||||
EpisodeID string `json:"episode_id"`
|
||||
FeedID string `json:"feed_id"`
|
||||
Size int64 `json:"size"` // Size on disk after encoding
|
||||
Status EpisodeStatus `json:"status"`
|
||||
}
|
||||
|
259
pkg/storage/badger.go
Normal file
259
pkg/storage/badger.go
Normal file
@ -0,0 +1,259 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/dgraph-io/badger"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/mxpv/podsync/pkg/model"
|
||||
)
|
||||
|
||||
const (
|
||||
versionPath = "podsync/version"
|
||||
feedPrefix = "feed/"
|
||||
feedPath = "feed/%s"
|
||||
episodePrefix = "episode/%s/"
|
||||
episodePath = "episode/%s/%s" // FeedID + EpisodeID
|
||||
filePrefix = "file/%s/"
|
||||
filePath = "file/%s/%s" // FeedID + EpisodeID
|
||||
)
|
||||
|
||||
type Badger struct {
|
||||
db *badger.DB
|
||||
}
|
||||
|
||||
var _ Storage = (*Badger)(nil)
|
||||
|
||||
func NewBadger(dir string) (*Badger, error) {
|
||||
log.Infof("opening database %q", dir)
|
||||
|
||||
opts := badger.DefaultOptions(dir)
|
||||
opts.Logger = log.New()
|
||||
|
||||
db, err := badger.Open(opts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to open database")
|
||||
}
|
||||
|
||||
storage := &Badger{db: db}
|
||||
|
||||
if err := db.Update(func(txn *badger.Txn) error {
|
||||
return storage.setObj(txn, []byte(versionPath), CurrentVersion, false)
|
||||
}); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to read database version")
|
||||
}
|
||||
|
||||
return &Badger{db: db}, nil
|
||||
}
|
||||
|
||||
func (b *Badger) Close() error {
|
||||
return b.db.Close()
|
||||
}
|
||||
|
||||
func (b *Badger) Version() (int, error) {
|
||||
var (
|
||||
version = -1
|
||||
)
|
||||
|
||||
err := b.db.View(func(txn *badger.Txn) error {
|
||||
return b.getObj(txn, []byte(versionPath), &version)
|
||||
})
|
||||
|
||||
return version, err
|
||||
}
|
||||
|
||||
func (b *Badger) AddFeed(_ context.Context, feed *model.Feed) error {
|
||||
return b.db.Update(func(txn *badger.Txn) error {
|
||||
// Insert or update feed info
|
||||
feedKey := b.getKey(feedPath, feed.ID)
|
||||
if err := b.setObj(txn, feedKey, feed, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Append new episodes
|
||||
for _, episode := range feed.Episodes {
|
||||
episodeKey := b.getKey(episodePath, feed.ID, episode.ID)
|
||||
err := b.setObj(txn, episodeKey, episode, false)
|
||||
if err == nil || err == ErrAlreadyExists {
|
||||
// Do nothing
|
||||
} else {
|
||||
return errors.Wrapf(err, "failed to save episode %q", feed.ID)
|
||||
}
|
||||
}
|
||||
|
||||
// Update download file statuses
|
||||
for _, episode := range feed.Episodes {
|
||||
fileKey := b.getKey(filePath, feed.ID, episode.ID)
|
||||
file := &model.File{
|
||||
EpisodeID: episode.ID,
|
||||
FeedID: feed.ID,
|
||||
Size: episode.Size, // Use estimated file size
|
||||
Status: model.EpisodeNew,
|
||||
}
|
||||
|
||||
err := b.setObj(txn, fileKey, file, false)
|
||||
if err != nil && err != ErrAlreadyExists {
|
||||
return errors.Wrapf(err, "failed to set %q status for %q", model.EpisodeNew, episode.ID)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Badger) WalkFeeds(_ context.Context, cb func(feed *model.Feed) error) error {
|
||||
return b.db.View(func(txn *badger.Txn) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.Prefix = b.getKey(feedPrefix)
|
||||
opts.PrefetchValues = true
|
||||
return b.iterator(txn, opts, func(item *badger.Item) error {
|
||||
feed := &model.Feed{}
|
||||
if err := b.unmarshalObj(item, feed); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return cb(feed)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Badger) DeleteFeed(_ context.Context, feedID string) error {
|
||||
return b.db.Update(func(txn *badger.Txn) error {
|
||||
// Feed
|
||||
feedKey := b.getKey(feedPath, feedID)
|
||||
if err := txn.Delete(feedKey); err != nil {
|
||||
return errors.Wrapf(err, "failed to delete feed %q", feedID)
|
||||
}
|
||||
|
||||
// Episodes
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.Prefix = b.getKey(episodePrefix, feedID)
|
||||
opts.PrefetchValues = false
|
||||
if err := b.iterator(txn, opts, func(item *badger.Item) error {
|
||||
return txn.Delete(item.KeyCopy(nil))
|
||||
}); err != nil {
|
||||
return errors.Wrapf(err, "failed to iterate episodes for feed %q", feedID)
|
||||
}
|
||||
|
||||
// Files
|
||||
opts = badger.DefaultIteratorOptions
|
||||
opts.Prefix = b.getKey(filePrefix, feedID)
|
||||
opts.PrefetchValues = false
|
||||
if err := b.iterator(txn, opts, func(item *badger.Item) error {
|
||||
return txn.Delete(item.KeyCopy(nil))
|
||||
}); err != nil {
|
||||
return errors.Wrapf(err, "failed to iterate files for feed %q", feedID)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Badger) WalkFiles(_ context.Context, feedID string, cb func(file *model.File) error) error {
|
||||
return b.db.View(func(txn *badger.Txn) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.Prefix = b.getKey(filePrefix, feedID)
|
||||
opts.PrefetchValues = true
|
||||
return b.iterator(txn, opts, func(item *badger.Item) error {
|
||||
file := &model.File{}
|
||||
if err := b.unmarshalObj(item, file); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return cb(file)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Badger) UpdateFile(file *model.File, cb func() error) error {
|
||||
return b.db.Update(func(txn *badger.Txn) error {
|
||||
fileKey := b.getKey(filePath, file.FeedID, file.EpisodeID)
|
||||
|
||||
fileOld := &model.File{}
|
||||
if err := b.getObj(txn, fileKey, fileOld); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if file.Size > 0 {
|
||||
fileOld.Size = file.Size
|
||||
}
|
||||
|
||||
fileOld.Status = file.Status
|
||||
|
||||
if err := b.setObj(txn, fileKey, fileOld, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cb != nil {
|
||||
return cb()
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Badger) iterator(txn *badger.Txn, opts badger.IteratorOptions, callback func(item *badger.Item) error) error {
|
||||
iter := txn.NewIterator(opts)
|
||||
defer iter.Close()
|
||||
|
||||
for iter.Rewind(); iter.Valid(); iter.Next() {
|
||||
item := iter.Item()
|
||||
|
||||
if err := callback(item); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Badger) getKey(format string, a ...interface{}) []byte {
|
||||
resourcePath := fmt.Sprintf(format, a...)
|
||||
fullPath := fmt.Sprintf("podsync/v%d/%s", CurrentVersion, resourcePath)
|
||||
|
||||
return []byte(fullPath)
|
||||
}
|
||||
|
||||
func (b *Badger) setObj(txn *badger.Txn, key []byte, obj interface{}, overwrite bool) error {
|
||||
if !overwrite {
|
||||
// Overwrites are not allowed, make sure there is no object with the given key
|
||||
_, err := txn.Get(key)
|
||||
if err == nil {
|
||||
return ErrAlreadyExists
|
||||
} else if err == badger.ErrKeyNotFound {
|
||||
// Key not found, do nothing
|
||||
} else {
|
||||
return errors.Wrap(err, "failed to check whether key exists")
|
||||
}
|
||||
}
|
||||
|
||||
data, err := b.marshalObj(obj)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to serialize object for key %q", key)
|
||||
}
|
||||
|
||||
return txn.Set(key, data)
|
||||
}
|
||||
|
||||
func (b *Badger) getObj(txn *badger.Txn, key []byte, out interface{}) error {
|
||||
item, err := txn.Get(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return b.unmarshalObj(item, out)
|
||||
}
|
||||
|
||||
func (b *Badger) marshalObj(obj interface{}) ([]byte, error) {
|
||||
return json.Marshal(obj)
|
||||
}
|
||||
|
||||
func (b *Badger) unmarshalObj(item *badger.Item, out interface{}) error {
|
||||
return item.Value(func(val []byte) error {
|
||||
return json.Unmarshal(val, out)
|
||||
})
|
||||
}
|
230
pkg/storage/badger_test.go
Normal file
230
pkg/storage/badger_test.go
Normal file
@ -0,0 +1,230 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/mxpv/podsync/pkg/link"
|
||||
"github.com/mxpv/podsync/pkg/model"
|
||||
)
|
||||
|
||||
var testCtx = context.TODO()
|
||||
|
||||
func TestNewBadger(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "podsync-badger-")
|
||||
require.NoError(t, err)
|
||||
|
||||
db, err := NewBadger(dir)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = os.RemoveAll(dir)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestBadger_Version(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "podsync-badger-")
|
||||
assert.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
db, err := NewBadger(dir)
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
ver, err := db.Version()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, CurrentVersion, ver)
|
||||
}
|
||||
|
||||
func TestBadger_AddFeed(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "podsync-badger-")
|
||||
assert.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
db, err := NewBadger(dir)
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
err = db.AddFeed(context.Background(), getFeed())
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestBadger_WalkFeeds(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "podsync-badger-")
|
||||
assert.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
db, err := NewBadger(dir)
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
feed := getFeed()
|
||||
feed.Episodes = nil // These are not serialized to database
|
||||
|
||||
err = db.AddFeed(testCtx, feed)
|
||||
assert.NoError(t, err)
|
||||
|
||||
called := 0
|
||||
err = db.WalkFeeds(testCtx, func(actual *model.Feed) error {
|
||||
assert.EqualValues(t, feed, actual)
|
||||
called++
|
||||
return nil
|
||||
})
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, called, 1)
|
||||
}
|
||||
|
||||
func TestBadger_DeleteFeed(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "podsync-badger-")
|
||||
assert.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
db, err := NewBadger(dir)
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
feed := getFeed()
|
||||
err = db.AddFeed(testCtx, feed)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.DeleteFeed(testCtx, feed.ID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
called := 0
|
||||
|
||||
err = db.WalkFeeds(testCtx, func(feed *model.Feed) error {
|
||||
called++
|
||||
return nil
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = db.WalkFiles(testCtx, feed.ID, func(file *model.File) error {
|
||||
called++
|
||||
return nil
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 0, called)
|
||||
}
|
||||
|
||||
func TestBadger_WalkFiles(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "podsync-badger-")
|
||||
assert.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
db, err := NewBadger(dir)
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
feed := getFeed()
|
||||
err = db.AddFeed(testCtx, feed)
|
||||
assert.NoError(t, err)
|
||||
|
||||
called := 0
|
||||
|
||||
err = db.WalkFiles(testCtx, feed.ID, func(file *model.File) error {
|
||||
assert.Equal(t, feed.ID, file.FeedID)
|
||||
assert.Equal(t, feed.Episodes[called].ID, file.EpisodeID)
|
||||
assert.Equal(t, feed.Episodes[called].Size, file.Size)
|
||||
assert.Equal(t, model.EpisodeNew, file.Status)
|
||||
|
||||
called++
|
||||
return nil
|
||||
})
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, called)
|
||||
}
|
||||
|
||||
|
||||
func TestBadger_UpdateFile(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "podsync-badger-")
|
||||
assert.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
db, err := NewBadger(dir)
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
feed := getFeed()
|
||||
err = db.AddFeed(testCtx, feed)
|
||||
assert.NoError(t, err)
|
||||
|
||||
update := &model.File{
|
||||
EpisodeID: feed.Episodes[0].ID,
|
||||
FeedID: feed.ID,
|
||||
Size: 333,
|
||||
Status: model.EpisodeDownloaded,
|
||||
}
|
||||
|
||||
err = db.UpdateFile(update, func() error {
|
||||
return nil
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
first := true
|
||||
|
||||
err = db.WalkFiles(testCtx, feed.ID, func(file *model.File) error {
|
||||
if first {
|
||||
assert.Equal(t, update, file)
|
||||
first = false
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func getFeed() *model.Feed {
|
||||
return &model.Feed{
|
||||
ID: "1",
|
||||
ItemID: "2",
|
||||
LinkType: link.TypeChannel,
|
||||
Provider: link.ProviderVimeo,
|
||||
CreatedAt: time.Now().UTC(),
|
||||
LastAccess: time.Now().UTC(),
|
||||
ExpirationTime: time.Now().UTC().Add(1 * time.Hour),
|
||||
Format: "video",
|
||||
Quality: "high",
|
||||
PageSize: 50,
|
||||
Language: "en",
|
||||
Title: "Test",
|
||||
Description: "Test",
|
||||
PubDate: time.Now().UTC(),
|
||||
Author: "",
|
||||
ItemURL: "https://vimeo.com",
|
||||
Episodes: []*model.Episode{
|
||||
{
|
||||
ID: "1",
|
||||
Title: "Episode title 1",
|
||||
Description: "Episode description 1",
|
||||
Duration: 100,
|
||||
VideoURL: "https://vimeo.com/123",
|
||||
PubDate: time.Now().UTC(),
|
||||
Size: 1234,
|
||||
Order: "1",
|
||||
},
|
||||
{
|
||||
ID: "2",
|
||||
Title: "Episode title 2",
|
||||
Description: "Episode description 2",
|
||||
Duration: 299,
|
||||
VideoURL: "https://vimeo.com/321",
|
||||
PubDate: time.Now().UTC(),
|
||||
Size: 4321,
|
||||
Order: "2",
|
||||
},
|
||||
},
|
||||
UpdatedAt: time.Now().UTC(),
|
||||
}
|
||||
}
|
42
pkg/storage/storage.go
Normal file
42
pkg/storage/storage.go
Normal file
@ -0,0 +1,42 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/mxpv/podsync/pkg/model"
|
||||
)
|
||||
|
||||
type Version int
|
||||
|
||||
const (
|
||||
CurrentVersion = 1
|
||||
)
|
||||
|
||||
var (
|
||||
ErrAlreadyExists = errors.New("object already exists")
|
||||
)
|
||||
|
||||
type Storage interface {
|
||||
Close() error
|
||||
Version() (int, error)
|
||||
|
||||
// AddFeed will:
|
||||
// - Insert or update feed info
|
||||
// - Append new episodes to the existing list of episodes
|
||||
// - Insert File model for each new episode
|
||||
AddFeed(ctx context.Context, feed *model.Feed) error
|
||||
|
||||
// WalkFeeds iterates over feeds saved to database
|
||||
WalkFeeds(ctx context.Context, cb func(feed *model.Feed) error) error
|
||||
|
||||
// DeleteFeed deletes feed and all related data from database
|
||||
DeleteFeed(ctx context.Context, feedID string) error
|
||||
|
||||
// WalkFiles walks all files for the given feed ID
|
||||
WalkFiles(ctx context.Context, feedID string, cb func(file *model.File) error) error
|
||||
|
||||
// UpdateFile updates file's status and (optionally) size.
|
||||
// Callback can be used to rollback update transaction.
|
||||
UpdateFile(file *model.File, cb func() error) error
|
||||
}
|
Reference in New Issue
Block a user