package main import ( "bytes" "context" "fmt" "io" "os" "regexp" "sort" "time" "github.com/hashicorp/go-multierror" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/mxpv/podsync/pkg/builder" "github.com/mxpv/podsync/pkg/db" "github.com/mxpv/podsync/pkg/feed" "github.com/mxpv/podsync/pkg/fs" "github.com/mxpv/podsync/pkg/model" "github.com/mxpv/podsync/pkg/ytdl" ) type Downloader interface { Download(ctx context.Context, feedConfig *feed.Config, episode *model.Episode) (io.ReadCloser, error) } type Updater struct { config *Config downloader Downloader db db.Storage fs fs.Storage keys map[model.Provider]feed.KeyProvider } func NewUpdater(config *Config, downloader Downloader, db db.Storage, fs fs.Storage) (*Updater, error) { keys := map[model.Provider]feed.KeyProvider{} for name, list := range config.Tokens { provider, err := feed.NewKeyProvider(list) if err != nil { return nil, errors.Wrapf(err, "failed to create key provider for %q", name) } keys[name] = provider } return &Updater{ config: config, downloader: downloader, db: db, fs: fs, keys: keys, }, nil } func (u *Updater) Update(ctx context.Context, feedConfig *feed.Config) error { log.WithFields(log.Fields{ "feed_id": feedConfig.ID, "format": feedConfig.Format, "quality": feedConfig.Quality, }).Infof("-> updating %s", feedConfig.URL) started := time.Now() if err := u.updateFeed(ctx, feedConfig); err != nil { return errors.Wrap(err, "update failed") } if err := u.downloadEpisodes(ctx, feedConfig); err != nil { return errors.Wrap(err, "download failed") } if err := u.buildXML(ctx, feedConfig); err != nil { return errors.Wrap(err, "xml build failed") } if err := u.buildOPML(ctx); err != nil { return errors.Wrap(err, "opml build failed") } if err := u.cleanup(ctx, feedConfig); err != nil { log.WithError(err).Error("cleanup failed") } elapsed := time.Since(started) log.Infof("successfully updated feed in %s", elapsed) return nil } // updateFeed pulls API for new episodes and saves them to database func (u *Updater) updateFeed(ctx context.Context, feedConfig *feed.Config) error { info, err := builder.ParseURL(feedConfig.URL) if err != nil { return errors.Wrapf(err, "failed to parse URL: %s", feedConfig.URL) } keyProvider, ok := u.keys[info.Provider] if !ok { return errors.Errorf("key provider %q not loaded", info.Provider) } // Create an updater for this feed type provider, err := builder.New(ctx, info.Provider, keyProvider.Get()) if err != nil { return err } // Query API to get episodes log.Debug("building feed") result, err := provider.Build(ctx, feedConfig) if err != nil { return err } log.Debugf("received %d episode(s) for %q", len(result.Episodes), result.Title) episodeSet := make(map[string]struct{}) if err := u.db.WalkEpisodes(ctx, feedConfig.ID, func(episode *model.Episode) error { if episode.Status != model.EpisodeDownloaded && episode.Status != model.EpisodeCleaned { episodeSet[episode.ID] = struct{}{} } return nil }); err != nil { return err } if err := u.db.AddFeed(ctx, feedConfig.ID, result); err != nil { return err } for _, episode := range result.Episodes { delete(episodeSet, episode.ID) } // removing episodes that are no longer available in the feed and not downloaded or cleaned for id := range episodeSet { log.Infof("removing episode %q", id) err := u.db.DeleteEpisode(feedConfig.ID, id) if err != nil { return err } } log.Debug("successfully saved updates to storage") return nil } func (u *Updater) matchRegexpFilter(pattern, str string, negative bool, logger log.FieldLogger) bool { if pattern != "" { matched, err := regexp.MatchString(pattern, str) if err != nil { logger.Warnf("pattern %q is not a valid") } else { if matched == negative { logger.Infof("skipping due to mismatch") return false } } } return true } func (u *Updater) matchFilters(episode *model.Episode, filters *feed.Filters) bool { logger := log.WithFields(log.Fields{"episode_id": episode.ID}) if !u.matchRegexpFilter(filters.Title, episode.Title, false, logger.WithField("filter", "title")) { return false } if !u.matchRegexpFilter(filters.NotTitle, episode.Title, true, logger.WithField("filter", "not_title")) { return false } if !u.matchRegexpFilter(filters.Description, episode.Description, false, logger.WithField("filter", "description")) { return false } if !u.matchRegexpFilter(filters.NotDescription, episode.Description, true, logger.WithField("filter", "not_description")) { return false } return true } func (u *Updater) downloadEpisodes(ctx context.Context, feedConfig *feed.Config) error { var ( feedID = feedConfig.ID downloadList []*model.Episode pageSize = feedConfig.PageSize ) log.WithField("page_size", pageSize).Info("downloading episodes") // Build the list of files to download if err := u.db.WalkEpisodes(ctx, feedID, func(episode *model.Episode) error { if episode.Status != model.EpisodeNew && episode.Status != model.EpisodeError { // File already downloaded return nil } if !u.matchFilters(episode, &feedConfig.Filters) { return nil } // Limit the number of episodes downloaded at once pageSize-- if pageSize < 0 { return nil } log.Debugf("adding %s (%q) to queue", episode.ID, episode.Title) downloadList = append(downloadList, episode) return nil }); err != nil { return errors.Wrapf(err, "failed to build update list") } var ( downloadCount = len(downloadList) downloaded = 0 ) if downloadCount > 0 { log.Infof("download count: %d", downloadCount) } else { log.Info("no episodes to download") return nil } // Download pending episodes for idx, episode := range downloadList { var ( logger = log.WithFields(log.Fields{"index": idx, "episode_id": episode.ID}) episodeName = feed.EpisodeName(feedConfig, episode) ) // Check whether episode already exists size, err := fs.Size(u.fs, fmt.Sprintf("%s/%s", feedID, episodeName)) if err == nil { logger.Infof("episode %q already exists on disk", episode.ID) // File already exists, update file status and disk size if err := u.db.UpdateEpisode(feedID, episode.ID, func(episode *model.Episode) error { episode.Size = size episode.Status = model.EpisodeDownloaded return nil }); err != nil { logger.WithError(err).Error("failed to update file info") return err } continue } else if os.IsNotExist(err) { // Will download, do nothing here } else { logger.WithError(err).Error("failed to stat file") return err } // Download episode to disk // We download the episode to a temp directory first to avoid downloading this file by clients // while still being processed by youtube-dl (e.g. a file is being downloaded from YT or encoding in progress) logger.Infof("! downloading episode %s", episode.VideoURL) tempFile, err := u.downloader.Download(ctx, feedConfig, episode) if err != nil { // YouTube might block host with HTTP Error 429: Too Many Requests // We still need to generate XML, so just stop sending download requests and // retry next time if err == ytdl.ErrTooManyRequests { logger.Warn("server responded with a 'Too Many Requests' error") break } if err := u.db.UpdateEpisode(feedID, episode.ID, func(episode *model.Episode) error { episode.Status = model.EpisodeError return nil }); err != nil { return err } continue } logger.Debug("copying file") fileSize, err := u.fs.Create(ctx, fmt.Sprintf("%s/%s", feedID, episodeName), tempFile) tempFile.Close() if err != nil { logger.WithError(err).Error("failed to copy file") return err } // Update file status in database logger.Infof("successfully downloaded file %q", episode.ID) if err := u.db.UpdateEpisode(feedID, episode.ID, func(episode *model.Episode) error { episode.Size = fileSize episode.Status = model.EpisodeDownloaded return nil }); err != nil { return err } downloaded++ } log.Infof("downloaded %d episode(s)", downloaded) return nil } func (u *Updater) buildXML(ctx context.Context, feedConfig *feed.Config) error { f, err := u.db.GetFeed(ctx, feedConfig.ID) if err != nil { return err } // Build iTunes XML feed with data received from builder log.Debug("building iTunes podcast feed") podcast, err := feed.Build(ctx, f, feedConfig, u.config.Server.Hostname) if err != nil { return err } var ( reader = bytes.NewReader([]byte(podcast.String())) xmlName = fmt.Sprintf("%s.xml", feedConfig.ID) ) if _, err := u.fs.Create(ctx, xmlName, reader); err != nil { return errors.Wrap(err, "failed to upload new XML feed") } return nil } func (u *Updater) buildOPML(ctx context.Context) error { // Build OPML with data received from builder log.Debug("building podcast OPML") opml, err := feed.BuildOPML(ctx, u.config.Feeds, u.db, u.config.Server.Hostname) if err != nil { return err } var ( reader = bytes.NewReader([]byte(opml)) xmlName = fmt.Sprintf("%s.opml", "podsync") ) if _, err := u.fs.Create(ctx, xmlName, reader); err != nil { return errors.Wrap(err, "failed to upload OPML") } return nil } func (u *Updater) cleanup(ctx context.Context, feedConfig *feed.Config) error { var ( feedID = feedConfig.ID logger = log.WithField("feed_id", feedID) count = feedConfig.Clean.KeepLast list []*model.Episode result *multierror.Error ) if count < 1 { logger.Info("nothing to clean") return nil } logger.WithField("count", count).Info("running cleaner") if err := u.db.WalkEpisodes(ctx, feedConfig.ID, func(episode *model.Episode) error { if episode.Status == model.EpisodeDownloaded { list = append(list, episode) } return nil }); err != nil { return err } if count > len(list) { return nil } sort.Slice(list, func(i, j int) bool { return list[i].PubDate.After(list[j].PubDate) }) for _, episode := range list[count:] { logger.WithField("episode_id", episode.ID).Infof("deleting %q", episode.Title) var ( episodeName = feed.EpisodeName(feedConfig, episode) path = fmt.Sprintf("%s/%s", feedConfig.ID, episodeName) ) if err := u.fs.Delete(ctx, path); err != nil { result = multierror.Append(result, errors.Wrapf(err, "failed to delete episode: %s", episode.ID)) continue } if err := u.db.UpdateEpisode(feedID, episode.ID, func(episode *model.Episode) error { episode.Status = model.EpisodeCleaned episode.Title = "" episode.Description = "" return nil }); err != nil { result = multierror.Append(result, errors.Wrapf(err, "failed to set state for cleaned episode: %s", episode.ID)) continue } } return result.ErrorOrNil() }