mirror of
https://github.com/mxpv/podsync.git
synced 2024-05-11 05:55:04 +00:00
Support S3-compatible provider as storage backend
This commit is contained in:
@@ -65,3 +65,18 @@ func (l *Local) copyFile(source io.Reader, destinationPath string) (int64, error
|
||||
|
||||
return written, nil
|
||||
}
|
||||
|
||||
func (l *Local) Size(_ctx context.Context, name string) (int64, error) {
|
||||
file, err := l.Open(name)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return stat.Size(), nil
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ func TestLocal_Size(t *testing.T) {
|
||||
_, err = stor.Create(testCtx, "1/test", bytes.NewBuffer([]byte{1, 5, 7, 8, 3}))
|
||||
assert.NoError(t, err)
|
||||
|
||||
sz, err := Size(stor, "1/test")
|
||||
sz, err := stor.Size(testCtx, "1/test")
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 5, sz)
|
||||
}
|
||||
@@ -61,7 +61,7 @@ func TestLocal_NoSize(t *testing.T) {
|
||||
stor, err := NewLocal("")
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = Size(stor, "1/test")
|
||||
_, err = stor.Size(testCtx, "1/test")
|
||||
assert.True(t, os.IsNotExist(err))
|
||||
}
|
||||
|
||||
@@ -80,7 +80,7 @@ func TestLocal_Delete(t *testing.T) {
|
||||
err = stor.Delete(testCtx, "1/test")
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = Size(stor, "1/test")
|
||||
_, err = stor.Size(testCtx, "1/test")
|
||||
assert.True(t, os.IsNotExist(err))
|
||||
|
||||
_, err = os.Stat(filepath.Join(tmpDir, "1", "test"))
|
||||
|
||||
108
pkg/fs/s3.go
Normal file
108
pkg/fs/s3.go
Normal file
@@ -0,0 +1,108 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3iface"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// S3 implements file storage for S3-compatible providers.
|
||||
type S3 struct {
|
||||
api s3iface.S3API
|
||||
uploader *s3manager.Uploader
|
||||
bucket string
|
||||
}
|
||||
|
||||
func NewS3(endpointURL, region, bucket string) (*S3, error) {
|
||||
cfg := aws.NewConfig().
|
||||
WithEndpoint(endpointURL).
|
||||
WithRegion(region).
|
||||
WithLogger(s3logger{}).
|
||||
WithLogLevel(aws.LogDebug)
|
||||
sess, err := session.NewSessionWithOptions(session.Options{Config: *cfg})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to initialize S3 session")
|
||||
}
|
||||
return &S3{
|
||||
api: s3.New(sess),
|
||||
uploader: s3manager.NewUploader(sess),
|
||||
bucket: bucket,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *S3) Open(_name string) (http.File, error) {
|
||||
return nil, errors.New("serving files from S3 is not supported")
|
||||
}
|
||||
|
||||
func (s *S3) Delete(ctx context.Context, name string) error {
|
||||
_, err := s.api.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
|
||||
Bucket: &s.bucket,
|
||||
Key: &name,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *S3) Create(ctx context.Context, name string, reader io.Reader) (int64, error) {
|
||||
logger := log.WithField("name", name)
|
||||
|
||||
logger.Infof("uploading file to %s", s.bucket)
|
||||
r := &readerWithN{Reader: reader}
|
||||
_, err := s.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
|
||||
Bucket: &s.bucket,
|
||||
Key: &name,
|
||||
Body: r,
|
||||
})
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "failed to upload file")
|
||||
}
|
||||
|
||||
logger.Debugf("written %d bytes", r.n)
|
||||
return int64(r.n), nil
|
||||
}
|
||||
|
||||
func (s *S3) Size(ctx context.Context, name string) (int64, error) {
|
||||
logger := log.WithField("name", name)
|
||||
|
||||
logger.Debugf("getting file size from %s", s.bucket)
|
||||
resp, err := s.api.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
|
||||
Bucket: &s.bucket,
|
||||
Key: &name,
|
||||
})
|
||||
if err != nil {
|
||||
if awsErr, ok := err.(awserr.Error); ok {
|
||||
if awsErr.Code() == "NotFound" {
|
||||
return 0, os.ErrNotExist
|
||||
}
|
||||
}
|
||||
return 0, errors.Wrap(err, "failed to get file size")
|
||||
}
|
||||
|
||||
return *resp.ContentLength, nil
|
||||
}
|
||||
|
||||
type readerWithN struct {
|
||||
io.Reader
|
||||
n int
|
||||
}
|
||||
|
||||
func (r *readerWithN) Read(p []byte) (n int, err error) {
|
||||
n, err = r.Reader.Read(p)
|
||||
r.n += n
|
||||
return
|
||||
}
|
||||
|
||||
type s3logger struct{}
|
||||
|
||||
func (s s3logger) Log(args ...interface{}) {
|
||||
log.Debug(args...)
|
||||
}
|
||||
@@ -16,20 +16,7 @@ type Storage interface {
|
||||
|
||||
// Delete deletes the file
|
||||
Delete(ctx context.Context, name string) error
|
||||
}
|
||||
|
||||
// Size returns storage object's size in bytes.
|
||||
func Size(storage http.FileSystem, name string) (int64, error) {
|
||||
file, err := storage.Open(name)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return stat.Size(), nil
|
||||
|
||||
// Size returns a storage object's size in bytes
|
||||
Size(ctx context.Context, name string) (int64, error)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user