mirror of
https://github.com/mxpv/podsync.git
synced 2024-05-11 05:55:04 +00:00
@@ -2,12 +2,15 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/jessevdk/go-flags"
|
||||
"github.com/robfig/cron/v3"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
@@ -91,10 +94,6 @@ func main() {
|
||||
log.WithError(err).Fatal("failed to open database")
|
||||
}
|
||||
|
||||
// Queue of feeds to update
|
||||
updates := make(chan *config.Feed, 16)
|
||||
defer close(updates)
|
||||
|
||||
// Run updater thread
|
||||
log.Debug("creating updater")
|
||||
updater, err := NewUpdater(cfg, downloader, database)
|
||||
@@ -102,42 +101,46 @@ func main() {
|
||||
log.WithError(err).Fatal("failed to create updater")
|
||||
}
|
||||
|
||||
c := cron.New(cron.WithChain(cron.SkipIfStillRunning(nil)))
|
||||
|
||||
group.Go(func() error {
|
||||
for {
|
||||
select {
|
||||
case feed := <-updates:
|
||||
defer func() {
|
||||
log.Info("shutting down cron")
|
||||
c.Stop()
|
||||
}()
|
||||
|
||||
for _, feed := range cfg.Feeds {
|
||||
if feed.CronSchedule == "" {
|
||||
feed.CronSchedule = fmt.Sprintf("@every %s", feed.UpdatePeriod.String())
|
||||
}
|
||||
|
||||
_, err = c.AddFunc(feed.CronSchedule, func() {
|
||||
log.Debugf("adding %q to update queue", feed.URL)
|
||||
|
||||
if err := updater.Update(ctx, feed); err != nil {
|
||||
log.WithError(err).Errorf("failed to update feed: %s", feed.URL)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
// Run wait goroutines for each feed configuration
|
||||
for _, feed := range cfg.Feeds {
|
||||
_feed := feed
|
||||
group.Go(func() error {
|
||||
log.Debugf("-> %s (update every %s)", _feed.URL, _feed.UpdatePeriod)
|
||||
if err != nil {
|
||||
log.WithError(err).Fatalf("can't create cron task for feed: %s", feed.ID)
|
||||
}
|
||||
|
||||
log.Debugf("-> %s (update '%s')", feed.URL, feed.CronSchedule)
|
||||
|
||||
// Perform initial update after CLI restart
|
||||
updates <- _feed
|
||||
|
||||
timer := time.NewTicker(_feed.UpdatePeriod.Duration)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
log.Debugf("adding %q to update queue", _feed.URL)
|
||||
updates <- _feed
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
if err := updater.Update(ctx, feed); err != nil {
|
||||
log.WithError(err).Errorf("failed to update feed: %s", feed.URL)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
c.Start()
|
||||
|
||||
for {
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
})
|
||||
|
||||
// Run web server
|
||||
srv := NewServer(cfg)
|
||||
@@ -167,7 +170,7 @@ func main() {
|
||||
}
|
||||
})
|
||||
|
||||
if err := group.Wait(); err != nil && err != context.Canceled {
|
||||
if err := group.Wait(); err != nil && (err != context.Canceled && err != http.ErrServerClosed) {
|
||||
log.WithError(err).Error("wait error")
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user