mirror of
https://github.com/mxpv/podsync.git
synced 2024-05-11 05:55:04 +00:00
Merge pull request #317 from Contextualist/feat-s3
Support S3-compatible provider as storage backend
This commit is contained in:
@@ -67,7 +67,10 @@ Minimal configuration would look like this:
|
||||
```toml
|
||||
[server]
|
||||
port = 8080
|
||||
data_dir = "/data/podsync/"
|
||||
|
||||
[storage]
|
||||
[storage.local]
|
||||
data_dir = "/data/podsync/"
|
||||
|
||||
[tokens]
|
||||
youtube = "PASTE YOUR API KEY HERE"
|
||||
|
||||
+4
-1
@@ -169,8 +169,11 @@ Resources:
|
||||
tee /home/ec2-user/podsync/config.toml <<EOF
|
||||
[server]
|
||||
port = ${PodsyncPort}
|
||||
data_dir = "/home/ec2-user/podsync/data"
|
||||
hostname = "http://$publichost:${PodsyncPort}"
|
||||
|
||||
[storage]
|
||||
[storage.local]
|
||||
data_dir = "/home/ec2-user/podsync/data"
|
||||
|
||||
[tokens]
|
||||
youtube = "${YouTubeApiKey}"
|
||||
|
||||
+32
-2
@@ -9,9 +9,11 @@ import (
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/pelletier/go-toml"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/mxpv/podsync/pkg/db"
|
||||
"github.com/mxpv/podsync/pkg/feed"
|
||||
"github.com/mxpv/podsync/pkg/fs"
|
||||
"github.com/mxpv/podsync/pkg/model"
|
||||
"github.com/mxpv/podsync/pkg/ytdl"
|
||||
"github.com/mxpv/podsync/services/web"
|
||||
@@ -20,6 +22,8 @@ import (
|
||||
type Config struct {
|
||||
// Server is the web server configuration
|
||||
Server web.Config `toml:"server"`
|
||||
// S3 is the optional configuration for S3-compatible storage provider
|
||||
Storage fs.Config `toml:"storage"`
|
||||
// Log is the optional logging configuration
|
||||
Log Log `toml:"log"`
|
||||
// Database configuration
|
||||
@@ -74,8 +78,17 @@ func LoadConfig(path string) (*Config, error) {
|
||||
func (c *Config) validate() error {
|
||||
var result *multierror.Error
|
||||
|
||||
if c.Server.DataDir == "" {
|
||||
result = multierror.Append(result, errors.New("data directory is required"))
|
||||
if c.Server.DataDir != "" {
|
||||
log.Warnf(`server.data_dir is deprecated, and will be removed in a future release. Use the following config instead:
|
||||
|
||||
[storage]
|
||||
[storage.local]
|
||||
data_dir = "%s"
|
||||
|
||||
`, c.Server.DataDir)
|
||||
if c.Storage.Local.DataDir == "" {
|
||||
c.Storage.Local.DataDir = c.Server.DataDir
|
||||
}
|
||||
}
|
||||
|
||||
if c.Server.Path != "" {
|
||||
@@ -85,6 +98,19 @@ func (c *Config) validate() error {
|
||||
}
|
||||
}
|
||||
|
||||
switch c.Storage.Type {
|
||||
case "local":
|
||||
if c.Storage.Local.DataDir == "" {
|
||||
result = multierror.Append(result, errors.New("data directory is required for local storage"))
|
||||
}
|
||||
case "s3":
|
||||
if c.Storage.S3.EndpointURL == "" || c.Storage.S3.Region == "" || c.Storage.S3.Bucket == "" {
|
||||
result = multierror.Append(result, errors.New("S3 storage requires endpoint_url, region and bucket to be set"))
|
||||
}
|
||||
default:
|
||||
result = multierror.Append(result, errors.Errorf("unknown storage type: %s", c.Storage.Type))
|
||||
}
|
||||
|
||||
if len(c.Feeds) == 0 {
|
||||
result = multierror.Append(result, errors.New("at least one feed must be specified"))
|
||||
}
|
||||
@@ -107,6 +133,10 @@ func (c *Config) applyDefaults(configPath string) {
|
||||
}
|
||||
}
|
||||
|
||||
if c.Storage.Type == "" {
|
||||
c.Storage.Type = "local"
|
||||
}
|
||||
|
||||
if c.Log.Filename != "" {
|
||||
if c.Log.MaxSize == 0 {
|
||||
c.Log.MaxSize = model.DefaultLogMaxSize
|
||||
|
||||
+37
-13
@@ -26,6 +26,7 @@ import (
|
||||
|
||||
type Opts struct {
|
||||
ConfigPath string `long:"config" short:"c" default:"config.toml" env:"PODSYNC_CONFIG_PATH"`
|
||||
Headless bool `long:"headless"`
|
||||
Debug bool `long:"debug"`
|
||||
NoBanner bool `long:"no-banner"`
|
||||
}
|
||||
@@ -59,8 +60,6 @@ func main() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
group, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
// Parse args
|
||||
opts := Opts{}
|
||||
_, err := flags.Parse(&opts)
|
||||
@@ -110,8 +109,21 @@ func main() {
|
||||
if err != nil {
|
||||
log.WithError(err).Fatal("failed to open database")
|
||||
}
|
||||
defer func() {
|
||||
if err := database.Close(); err != nil {
|
||||
log.WithError(err).Error("failed to close database")
|
||||
}
|
||||
}()
|
||||
|
||||
storage, err := fs.NewLocal(cfg.Server.DataDir)
|
||||
var storage fs.Storage
|
||||
switch cfg.Storage.Type {
|
||||
case "local":
|
||||
storage, err = fs.NewLocal(cfg.Storage.Local.DataDir)
|
||||
case "s3":
|
||||
storage, err = fs.NewS3(cfg.Storage.S3)
|
||||
default:
|
||||
log.Fatalf("unknown storage type: %s", cfg.Storage.Type)
|
||||
}
|
||||
if err != nil {
|
||||
log.WithError(err).Fatal("failed to open storage")
|
||||
}
|
||||
@@ -133,6 +145,24 @@ func main() {
|
||||
log.WithError(err).Fatal("failed to create updater")
|
||||
}
|
||||
|
||||
// In Headless mode, do one round of feed updates and quit
|
||||
if opts.Headless {
|
||||
for _, feed := range cfg.Feeds {
|
||||
if err := manager.Update(ctx, feed); err != nil {
|
||||
log.WithError(err).Errorf("failed to update feed: %s", feed.URL)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
group, ctx := errgroup.WithContext(ctx)
|
||||
defer func() {
|
||||
if err := group.Wait(); err != nil && (err != context.Canceled && err != http.ErrServerClosed) {
|
||||
log.WithError(err).Error("wait error")
|
||||
}
|
||||
log.Info("gracefully stopped")
|
||||
}()
|
||||
|
||||
// Queue of feeds to update
|
||||
updates := make(chan *feed.Config, 16)
|
||||
defer close(updates)
|
||||
@@ -191,6 +221,10 @@ func main() {
|
||||
}
|
||||
})
|
||||
|
||||
if cfg.Storage.Type == "s3" {
|
||||
return // S3 content is hosted externally
|
||||
}
|
||||
|
||||
// Run web server
|
||||
srv := web.New(cfg.Server, storage)
|
||||
|
||||
@@ -222,14 +256,4 @@ func main() {
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if err := group.Wait(); err != nil && (err != context.Canceled && err != http.ErrServerClosed) {
|
||||
log.WithError(err).Error("wait error")
|
||||
}
|
||||
|
||||
if err := database.Close(); err != nil {
|
||||
log.WithError(err).Error("failed to close database")
|
||||
}
|
||||
|
||||
log.Info("gracefully stopped")
|
||||
}
|
||||
|
||||
+15
-1
@@ -11,7 +11,21 @@ hostname = "https://my.test.host:4443"
|
||||
bind_address = "172.20.10.2"
|
||||
# Specify path for reverse proxy and only [A-Za-z0-9]
|
||||
path = "test"
|
||||
data_dir = "/app/data" # Don't change if you run podsync via docker
|
||||
|
||||
# Configure where to store the episode data
|
||||
[storage]
|
||||
# Could be "local" (default) for the local file system, or "s3" for a S3-compatible storage provider (e.g. AWS S3)
|
||||
type = "local"
|
||||
|
||||
[storage.local]
|
||||
data_dir = "/app/data" # Don't change if you run podsync via docker
|
||||
|
||||
# To configure for a S3 provider, set key and secret in environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`, respectively;
|
||||
# then fillout the API endpoint, region, and bucket below.
|
||||
[storage.s3]
|
||||
endpoint = "https://s3.us-west-2.amazonaws.com"
|
||||
region = "us-west-2"
|
||||
bucket = "example-bucket-name"
|
||||
|
||||
# API keys to be used to access Youtube and Vimeo.
|
||||
# These can be either specified as string parameter or array of string (so those will be rotated).
|
||||
|
||||
@@ -2,6 +2,7 @@ module github.com/mxpv/podsync
|
||||
|
||||
require (
|
||||
github.com/BrianHicks/finch v0.0.0-20140409222414-419bd73c29ec
|
||||
github.com/aws/aws-sdk-go v1.44.30
|
||||
github.com/dgraph-io/badger v1.6.2
|
||||
github.com/eduncan911/podcast v1.4.2
|
||||
github.com/gilliek/go-opml v1.0.0
|
||||
@@ -20,7 +21,6 @@ require (
|
||||
google.golang.org/api v0.0.0-20180718221112-efcb5f25ac56
|
||||
google.golang.org/appengine v1.1.0 // indirect
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
||||
gopkg.in/yaml.v2 v2.2.8 // indirect
|
||||
)
|
||||
|
||||
go 1.13
|
||||
|
||||
@@ -7,6 +7,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
|
||||
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
|
||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
|
||||
github.com/aws/aws-sdk-go v1.44.30 h1:w7sTp6jFWRaZCDg08fUx8X4IOU4sgbCR+e+1qSmf+bc=
|
||||
github.com/aws/aws-sdk-go v1.44.30/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
|
||||
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
|
||||
@@ -43,6 +45,10 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T
|
||||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||
github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc=
|
||||
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
|
||||
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
|
||||
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
||||
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
|
||||
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
||||
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
|
||||
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
@@ -91,8 +97,9 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
|
||||
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk=
|
||||
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
|
||||
golang.org/x/oauth2 v0.0.0-20180620175406-ef147856a6dd h1:QQhib242ErYDSMitlBm8V7wYCm/1a25hV8qMadIKLPA=
|
||||
golang.org/x/oauth2 v0.0.0-20180620175406-ef147856a6dd/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
@@ -106,11 +113,16 @@ golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
|
||||
@@ -11,6 +11,11 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// LocalConfig is the storage configuration for local file system
|
||||
type LocalConfig struct {
|
||||
DataDir string `yaml:"data_dir"`
|
||||
}
|
||||
|
||||
// Local implements local file storage
|
||||
type Local struct {
|
||||
rootDir string
|
||||
@@ -65,3 +70,18 @@ func (l *Local) copyFile(source io.Reader, destinationPath string) (int64, error
|
||||
|
||||
return written, nil
|
||||
}
|
||||
|
||||
func (l *Local) Size(_ctx context.Context, name string) (int64, error) {
|
||||
file, err := l.Open(name)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return stat.Size(), nil
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ func TestLocal_Size(t *testing.T) {
|
||||
_, err = stor.Create(testCtx, "1/test", bytes.NewBuffer([]byte{1, 5, 7, 8, 3}))
|
||||
assert.NoError(t, err)
|
||||
|
||||
sz, err := Size(stor, "1/test")
|
||||
sz, err := stor.Size(testCtx, "1/test")
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 5, sz)
|
||||
}
|
||||
@@ -61,7 +61,7 @@ func TestLocal_NoSize(t *testing.T) {
|
||||
stor, err := NewLocal("")
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = Size(stor, "1/test")
|
||||
_, err = stor.Size(testCtx, "1/test")
|
||||
assert.True(t, os.IsNotExist(err))
|
||||
}
|
||||
|
||||
@@ -80,7 +80,7 @@ func TestLocal_Delete(t *testing.T) {
|
||||
err = stor.Delete(testCtx, "1/test")
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = Size(stor, "1/test")
|
||||
_, err = stor.Size(testCtx, "1/test")
|
||||
assert.True(t, os.IsNotExist(err))
|
||||
|
||||
_, err = os.Stat(filepath.Join(tmpDir, "1", "test"))
|
||||
|
||||
+118
@@ -0,0 +1,118 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3iface"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// S3Config is the configuration for a S3-compatible storage provider
|
||||
type S3Config struct {
|
||||
// S3 Bucket to store files
|
||||
Bucket string `toml:"bucket"`
|
||||
// Region of the S3 service
|
||||
Region string `toml:"region"`
|
||||
// EndpointURL is an HTTP endpoint of the S3 API
|
||||
EndpointURL string `toml:"endpoint_url"`
|
||||
}
|
||||
|
||||
// S3 implements file storage for S3-compatible providers.
|
||||
type S3 struct {
|
||||
api s3iface.S3API
|
||||
uploader *s3manager.Uploader
|
||||
bucket string
|
||||
}
|
||||
|
||||
func NewS3(c S3Config) (*S3, error) {
|
||||
cfg := aws.NewConfig().
|
||||
WithEndpoint(c.EndpointURL).
|
||||
WithRegion(c.Region).
|
||||
WithLogger(s3logger{}).
|
||||
WithLogLevel(aws.LogDebug)
|
||||
sess, err := session.NewSessionWithOptions(session.Options{Config: *cfg})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to initialize S3 session")
|
||||
}
|
||||
return &S3{
|
||||
api: s3.New(sess),
|
||||
uploader: s3manager.NewUploader(sess),
|
||||
bucket: c.Bucket,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *S3) Open(_name string) (http.File, error) {
|
||||
return nil, errors.New("serving files from S3 is not supported")
|
||||
}
|
||||
|
||||
func (s *S3) Delete(ctx context.Context, name string) error {
|
||||
_, err := s.api.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
|
||||
Bucket: &s.bucket,
|
||||
Key: &name,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *S3) Create(ctx context.Context, name string, reader io.Reader) (int64, error) {
|
||||
logger := log.WithField("name", name)
|
||||
|
||||
logger.Infof("uploading file to %s", s.bucket)
|
||||
r := &readerWithN{Reader: reader}
|
||||
_, err := s.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
|
||||
Bucket: &s.bucket,
|
||||
Key: &name,
|
||||
Body: r,
|
||||
})
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "failed to upload file")
|
||||
}
|
||||
|
||||
logger.Debugf("written %d bytes", r.n)
|
||||
return int64(r.n), nil
|
||||
}
|
||||
|
||||
func (s *S3) Size(ctx context.Context, name string) (int64, error) {
|
||||
logger := log.WithField("name", name)
|
||||
|
||||
logger.Debugf("getting file size from %s", s.bucket)
|
||||
resp, err := s.api.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
|
||||
Bucket: &s.bucket,
|
||||
Key: &name,
|
||||
})
|
||||
if err != nil {
|
||||
if awsErr, ok := err.(awserr.Error); ok {
|
||||
if awsErr.Code() == "NotFound" {
|
||||
return 0, os.ErrNotExist
|
||||
}
|
||||
}
|
||||
return 0, errors.Wrap(err, "failed to get file size")
|
||||
}
|
||||
|
||||
return *resp.ContentLength, nil
|
||||
}
|
||||
|
||||
type readerWithN struct {
|
||||
io.Reader
|
||||
n int
|
||||
}
|
||||
|
||||
func (r *readerWithN) Read(p []byte) (n int, err error) {
|
||||
n, err = r.Reader.Read(p)
|
||||
r.n += n
|
||||
return
|
||||
}
|
||||
|
||||
type s3logger struct{}
|
||||
|
||||
func (s s3logger) Log(args ...interface{}) {
|
||||
log.Debug(args...)
|
||||
}
|
||||
@@ -0,0 +1,107 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/client/metadata"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3iface"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestS3_Create(t *testing.T) {
|
||||
files := make(map[string][]byte)
|
||||
stor, err := newMockS3(files)
|
||||
assert.NoError(t, err)
|
||||
|
||||
written, err := stor.Create(testCtx, "1/test", bytes.NewBuffer([]byte{1, 5, 7, 8, 3}))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 5, written)
|
||||
|
||||
d, ok := files["1/test"]
|
||||
assert.True(t, ok)
|
||||
assert.EqualValues(t, 5, len(d))
|
||||
}
|
||||
|
||||
func TestS3_Size(t *testing.T) {
|
||||
files := make(map[string][]byte)
|
||||
stor, err := newMockS3(files)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = stor.Create(testCtx, "1/test", bytes.NewBuffer([]byte{1, 5, 7, 8, 3}))
|
||||
assert.NoError(t, err)
|
||||
|
||||
sz, err := stor.Size(testCtx, "1/test")
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 5, sz)
|
||||
}
|
||||
|
||||
func TestS3_NoSize(t *testing.T) {
|
||||
files := make(map[string][]byte)
|
||||
stor, err := newMockS3(files)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = stor.Size(testCtx, "1/test")
|
||||
assert.True(t, os.IsNotExist(err))
|
||||
}
|
||||
|
||||
func TestS3_Delete(t *testing.T) {
|
||||
files := make(map[string][]byte)
|
||||
stor, err := newMockS3(files)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = stor.Create(testCtx, "1/test", bytes.NewBuffer([]byte{1, 5, 7, 8, 3}))
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = stor.Delete(testCtx, "1/test")
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = stor.Size(testCtx, "1/test")
|
||||
assert.True(t, os.IsNotExist(err))
|
||||
|
||||
_, ok := files["1/test"]
|
||||
assert.False(t, ok)
|
||||
}
|
||||
|
||||
type mockS3API struct {
|
||||
s3iface.S3API
|
||||
files map[string][]byte
|
||||
}
|
||||
|
||||
func newMockS3(files map[string][]byte) (*S3, error) {
|
||||
api := &mockS3API{files: files}
|
||||
return &S3{
|
||||
api: api,
|
||||
uploader: s3manager.NewUploaderWithClient(api),
|
||||
bucket: "mock-bucket",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *mockS3API) PutObjectRequest(input *s3.PutObjectInput) (*request.Request, *s3.PutObjectOutput) {
|
||||
content, _ := ioutil.ReadAll(input.Body)
|
||||
req := request.New(aws.Config{}, metadata.ClientInfo{}, request.Handlers{}, nil, &request.Operation{}, nil, nil)
|
||||
m.files[*input.Key] = content
|
||||
return req, &s3.PutObjectOutput{}
|
||||
}
|
||||
|
||||
func (m *mockS3API) HeadObjectWithContext(ctx aws.Context, input *s3.HeadObjectInput, opts ...request.Option) (*s3.HeadObjectOutput, error) {
|
||||
if _, ok := m.files[*input.Key]; ok {
|
||||
return &s3.HeadObjectOutput{ContentLength: aws.Int64(int64(len(m.files[*input.Key])))}, nil
|
||||
}
|
||||
return nil, awserr.New("NotFound", "", nil)
|
||||
}
|
||||
|
||||
func (m *mockS3API) DeleteObjectWithContext(ctx aws.Context, input *s3.DeleteObjectInput, opts ...request.Option) (*s3.DeleteObjectOutput, error) {
|
||||
if _, ok := m.files[*input.Key]; ok {
|
||||
delete(m.files, *input.Key)
|
||||
return &s3.DeleteObjectOutput{}, nil
|
||||
}
|
||||
return nil, awserr.New("NotFound", "", nil)
|
||||
}
|
||||
+9
-14
@@ -16,20 +16,15 @@ type Storage interface {
|
||||
|
||||
// Delete deletes the file
|
||||
Delete(ctx context.Context, name string) error
|
||||
|
||||
// Size returns a storage object's size in bytes
|
||||
Size(ctx context.Context, name string) (int64, error)
|
||||
}
|
||||
|
||||
// Size returns storage object's size in bytes.
|
||||
func Size(storage http.FileSystem, name string) (int64, error) {
|
||||
file, err := storage.Open(name)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return stat.Size(), nil
|
||||
// Config is a configuration for the file storage backend
|
||||
type Config struct {
|
||||
// Type is the type of file system to use
|
||||
Type string `toml:"type"`
|
||||
Local LocalConfig `toml:"local"`
|
||||
S3 S3Config `toml:"s3"`
|
||||
}
|
||||
|
||||
@@ -200,7 +200,7 @@ func (u *Manager) downloadEpisodes(ctx context.Context, feedConfig *feed.Config)
|
||||
)
|
||||
|
||||
// Check whether episode already exists
|
||||
size, err := fs.Size(u.fs, fmt.Sprintf("%s/%s", feedID, episodeName))
|
||||
size, err := u.fs.Size(ctx, fmt.Sprintf("%s/%s", feedID, episodeName))
|
||||
if err == nil {
|
||||
logger.Infof("episode %q already exists on disk", episode.ID)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user