Start on big scanner code refactoring
This commit is contained in:
parent
e9a4839f48
commit
6361df1793
|
@ -6,13 +6,14 @@ import (
|
|||
|
||||
"github.com/photoview/photoview/api/database/drivers"
|
||||
"github.com/photoview/photoview/api/graphql/models"
|
||||
"github.com/photoview/photoview/api/scanner"
|
||||
"github.com/photoview/photoview/api/scanner/periodic_scanner"
|
||||
"github.com/photoview/photoview/api/scanner/scanner_queue"
|
||||
"github.com/pkg/errors"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func (r *mutationResolver) ScanAll(ctx context.Context) (*models.ScannerResult, error) {
|
||||
err := scanner.AddAllToQueue()
|
||||
err := scanner_queue.AddAllToQueue()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -32,7 +33,7 @@ func (r *mutationResolver) ScanUser(ctx context.Context, userID int) (*models.Sc
|
|||
return nil, errors.Wrap(err, "get user from database")
|
||||
}
|
||||
|
||||
scanner.AddUserToQueue(&user)
|
||||
scanner_queue.AddUserToQueue(&user)
|
||||
|
||||
startMessage := "Scanner started"
|
||||
return &models.ScannerResult{
|
||||
|
@ -57,7 +58,7 @@ func (r *mutationResolver) SetPeriodicScanInterval(ctx context.Context, interval
|
|||
return 0, err
|
||||
}
|
||||
|
||||
scanner.ChangePeriodicScanInterval(time.Duration(siteInfo.PeriodicScanInterval) * time.Second)
|
||||
periodic_scanner.ChangePeriodicScanInterval(time.Duration(siteInfo.PeriodicScanInterval) * time.Second)
|
||||
|
||||
return siteInfo.PeriodicScanInterval, nil
|
||||
}
|
||||
|
@ -81,7 +82,7 @@ func (r *mutationResolver) SetScannerConcurrentWorkers(ctx context.Context, work
|
|||
return 0, err
|
||||
}
|
||||
|
||||
scanner.ChangeScannerConcurrentWorkers(siteInfo.ConcurrentWorkers)
|
||||
scanner_queue.ChangeScannerConcurrentWorkers(siteInfo.ConcurrentWorkers)
|
||||
|
||||
return siteInfo.ConcurrentWorkers, nil
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package scanner
|
||||
package periodic_scanner
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
@ -6,6 +6,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/photoview/photoview/api/graphql/models"
|
||||
"github.com/photoview/photoview/api/scanner/scanner_queue"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
|
@ -81,7 +82,7 @@ func scanIntervalRunner() {
|
|||
log.Print("Scan interval runner: New ticker detected")
|
||||
case <-mainPeriodicScanner.ticker.C:
|
||||
log.Print("Scan interval runner: Starting periodic scan")
|
||||
AddAllToQueue()
|
||||
scanner_queue.AddAllToQueue()
|
||||
}
|
||||
} else {
|
||||
<-mainPeriodicScanner.ticker_changed
|
|
@ -6,16 +6,15 @@ import (
|
|||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/photoview/photoview/api/graphql/models"
|
||||
"github.com/photoview/photoview/api/graphql/notification"
|
||||
"github.com/photoview/photoview/api/scanner/face_detection"
|
||||
"github.com/photoview/photoview/api/scanner/scanner_cache"
|
||||
"github.com/photoview/photoview/api/scanner/media_encoding"
|
||||
"github.com/photoview/photoview/api/scanner/scanner_task"
|
||||
"github.com/photoview/photoview/api/scanner/scanner_tasks"
|
||||
"github.com/photoview/photoview/api/scanner/scanner_utils"
|
||||
"github.com/photoview/photoview/api/utils"
|
||||
"github.com/pkg/errors"
|
||||
ignore "github.com/sabhiram/go-gitignore"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
|
@ -56,7 +55,7 @@ func NewRootAlbum(db *gorm.DB, rootPath string, owner *models.User) (*models.Alb
|
|||
}
|
||||
|
||||
if err := db.Model(&owner).Association("Albums").Append(&album); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to add owner to already existing album")
|
||||
return nil, errors.Wrap(err, "add owner to already existing album")
|
||||
}
|
||||
|
||||
return &album, nil
|
||||
|
@ -87,142 +86,145 @@ func ValidRootPath(rootPath string) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func scanAlbum(album *models.Album, cache *scanner_cache.AlbumScannerCache, db *gorm.DB) {
|
||||
func ScanAlbum(ctx scanner_task.TaskContext) {
|
||||
|
||||
album_notify_key := utils.GenerateToken()
|
||||
notifyThrottle := utils.NewThrottle(500 * time.Millisecond)
|
||||
notifyThrottle.Trigger(nil)
|
||||
newCtx, err := scanner_tasks.Tasks.BeforeScanAlbum(ctx)
|
||||
if err != nil {
|
||||
scanner_utils.ScannerError("before scan album (%s): %s", ctx.GetAlbum().Path, err)
|
||||
return
|
||||
}
|
||||
ctx = newCtx
|
||||
|
||||
// Scan for photos
|
||||
albumMedia, err := findMediaForAlbum(album, cache, db, func(photo *models.Media, newPhoto bool) {
|
||||
if newPhoto {
|
||||
notifyThrottle.Trigger(func() {
|
||||
notification.BroadcastNotification(&models.Notification{
|
||||
Key: album_notify_key,
|
||||
Type: models.NotificationTypeMessage,
|
||||
Header: fmt.Sprintf("Found new media in album '%s'", album.Title),
|
||||
Content: fmt.Sprintf("Found %s", photo.Path),
|
||||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
albumMedia, err := findMediaForAlbum(ctx)
|
||||
if err != nil {
|
||||
scanner_utils.ScannerError("Failed to find media for album (%s): %s", album.Path, err)
|
||||
scanner_utils.ScannerError("find media for album (%s): %s", ctx.GetAlbum().Path, err)
|
||||
return
|
||||
}
|
||||
|
||||
album_has_changes := false
|
||||
albumHasChanges := false
|
||||
for count, media := range albumMedia {
|
||||
processing_was_needed := false
|
||||
didProcess := false
|
||||
|
||||
transactionError := db.Transaction(func(tx *gorm.DB) error {
|
||||
processing_was_needed, err = ProcessMedia(tx, media)
|
||||
transactionError := ctx.GetDB().Transaction(func(tx *gorm.DB) error {
|
||||
// processing_was_needed, err = ProcessMedia(tx, media)
|
||||
didProcess, err = processMedia(ctx, media)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to process photo (%s)", media.Path)
|
||||
return errors.Wrapf(err, "process media (%s)", media.Path)
|
||||
}
|
||||
|
||||
if processing_was_needed {
|
||||
album_has_changes = true
|
||||
progress := float64(count) / float64(len(albumMedia)) * 100.0
|
||||
notification.BroadcastNotification(&models.Notification{
|
||||
Key: album_notify_key,
|
||||
Type: models.NotificationTypeProgress,
|
||||
Header: fmt.Sprintf("Processing media for album '%s'", album.Title),
|
||||
Content: fmt.Sprintf("Processed media at %s", media.Path),
|
||||
Progress: &progress,
|
||||
})
|
||||
if didProcess {
|
||||
albumHasChanges = true
|
||||
}
|
||||
|
||||
if err = scanner_tasks.Tasks.AfterProcessMedia(ctx, media, didProcess, count, len(albumMedia)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if transactionError != nil {
|
||||
scanner_utils.ScannerError("Failed to begin database transaction: %s", transactionError)
|
||||
scanner_utils.ScannerError("begin database transaction: %s", transactionError)
|
||||
}
|
||||
|
||||
if processing_was_needed && media.Type == models.MediaTypePhoto {
|
||||
if didProcess && media.Type == models.MediaTypePhoto {
|
||||
go func(media *models.Media) {
|
||||
if face_detection.GlobalFaceDetector == nil {
|
||||
return
|
||||
}
|
||||
if err := face_detection.GlobalFaceDetector.DetectFaces(db, media); err != nil {
|
||||
if err := face_detection.GlobalFaceDetector.DetectFaces(ctx.GetDB(), media); err != nil {
|
||||
scanner_utils.ScannerError("Error detecting faces in image (%s): %s", media.Path, err)
|
||||
}
|
||||
}(media)
|
||||
}
|
||||
}
|
||||
|
||||
cleanup_errors := CleanupMedia(db, album.ID, albumMedia)
|
||||
cleanup_errors := CleanupMedia(ctx.GetDB(), ctx.GetAlbum().ID, albumMedia)
|
||||
for _, err := range cleanup_errors {
|
||||
scanner_utils.ScannerError("Failed to delete old media: %s", err)
|
||||
scanner_utils.ScannerError("delete old media: %s", err)
|
||||
}
|
||||
|
||||
if album_has_changes {
|
||||
timeoutDelay := 2000
|
||||
notification.BroadcastNotification(&models.Notification{
|
||||
Key: album_notify_key,
|
||||
Type: models.NotificationTypeMessage,
|
||||
Positive: true,
|
||||
Header: fmt.Sprintf("Done processing media for album '%s'", album.Title),
|
||||
Content: fmt.Sprintf("All media have been processed"),
|
||||
Timeout: &timeoutDelay,
|
||||
})
|
||||
if err := scanner_tasks.Tasks.AfterScanAlbum(ctx, albumHasChanges); err != nil {
|
||||
scanner_utils.ScannerError("after scan album: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func findMediaForAlbum(album *models.Album, cache *scanner_cache.AlbumScannerCache, db *gorm.DB, onScanPhoto func(photo *models.Media, newPhoto bool)) ([]*models.Media, error) {
|
||||
func findMediaForAlbum(ctx scanner_task.TaskContext) ([]*models.Media, error) {
|
||||
|
||||
albumPhotos := make([]*models.Media, 0)
|
||||
albumMedia := make([]*models.Media, 0)
|
||||
|
||||
dirContent, err := ioutil.ReadDir(album.Path)
|
||||
dirContent, err := ioutil.ReadDir(ctx.GetAlbum().Path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get ignore data
|
||||
albumIgnore := ignore.CompileIgnoreLines(*cache.GetAlbumIgnore(album.Path)...)
|
||||
|
||||
for _, item := range dirContent {
|
||||
photoPath := path.Join(album.Path, item.Name())
|
||||
mediaPath := path.Join(ctx.GetAlbum().Path, item.Name())
|
||||
|
||||
isDirSymlink, err := utils.IsDirSymlink(photoPath)
|
||||
isDirSymlink, err := utils.IsDirSymlink(mediaPath)
|
||||
if err != nil {
|
||||
log.Printf("Cannot detect whether %s is symlink to a directory. Pretending it is not", photoPath)
|
||||
log.Printf("Cannot detect whether %s is symlink to a directory. Pretending it is not", mediaPath)
|
||||
isDirSymlink = false
|
||||
}
|
||||
|
||||
if !item.IsDir() && !isDirSymlink && cache.IsPathMedia(photoPath) {
|
||||
// Match file against ignore data
|
||||
if albumIgnore.MatchesPath(item.Name()) {
|
||||
log.Printf("File %s ignored\n", item.Name())
|
||||
if !item.IsDir() && !isDirSymlink && ctx.GetCache().IsPathMedia(mediaPath) {
|
||||
|
||||
skip, err := scanner_tasks.Tasks.MediaFound(ctx, item, mediaPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip the JPEGs that are compressed version of raw files
|
||||
counterpartFile := scanForRawCounterpartFile(photoPath)
|
||||
counterpartFile := scanForRawCounterpartFile(mediaPath)
|
||||
if counterpartFile != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err := db.Transaction(func(tx *gorm.DB) error {
|
||||
media, isNewMedia, err := ScanMedia(tx, photoPath, album.ID, cache)
|
||||
err = ctx.GetDB().Transaction(func(tx *gorm.DB) error {
|
||||
media, isNewMedia, err := ScanMedia(tx, mediaPath, ctx.GetAlbum().ID, ctx.GetCache())
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Scanning media error (%s)", photoPath)
|
||||
return errors.Wrapf(err, "scanning media error (%s)", mediaPath)
|
||||
}
|
||||
|
||||
onScanPhoto(media, isNewMedia)
|
||||
if err = scanner_tasks.Tasks.AfterMediaFound(ctx, media, isNewMedia); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
albumPhotos = append(albumPhotos, media)
|
||||
albumMedia = append(albumMedia, media)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
scanner_utils.ScannerError("Error scanning media for album (%d): %s\n", album.ID, err)
|
||||
scanner_utils.ScannerError("Error scanning media for album (%d): %s\n", ctx.GetAlbum().ID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return albumPhotos, nil
|
||||
return albumMedia, nil
|
||||
}
|
||||
|
||||
func processMedia(ctx scanner_task.TaskContext, media *models.Media) (bool, error) {
|
||||
mediaData := media_encoding.EncodeMediaData{
|
||||
Media: media,
|
||||
}
|
||||
|
||||
_, err := mediaData.ContentType()
|
||||
if err != nil {
|
||||
return false, errors.Wrapf(err, "get content-type of media (%s)", media.Path)
|
||||
}
|
||||
|
||||
// Make sure media cache directory exists
|
||||
mediaCachePath, err := makeMediaCacheDir(media)
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "cache directory error")
|
||||
}
|
||||
|
||||
return scanner_tasks.Tasks.ProcessMedia(ctx, &mediaData, *mediaCachePath)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package scanner
|
||||
package scanner_queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
|
@ -8,20 +9,24 @@ import (
|
|||
|
||||
"github.com/photoview/photoview/api/graphql/models"
|
||||
"github.com/photoview/photoview/api/graphql/notification"
|
||||
"github.com/photoview/photoview/api/scanner"
|
||||
"github.com/photoview/photoview/api/scanner/scanner_cache"
|
||||
"github.com/photoview/photoview/api/scanner/scanner_task"
|
||||
"github.com/photoview/photoview/api/scanner/scanner_utils"
|
||||
"github.com/photoview/photoview/api/utils"
|
||||
"github.com/pkg/errors"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// ScannerJob describes a job on the queue to be run by the scanner over a single album
|
||||
type ScannerJob struct {
|
||||
album *models.Album
|
||||
cache *scanner_cache.AlbumScannerCache
|
||||
ctx scanner_task.TaskContext
|
||||
// album *models.Album
|
||||
// cache *scanner_cache.AlbumScannerCache
|
||||
}
|
||||
|
||||
func (job *ScannerJob) Run(db *gorm.DB) {
|
||||
scanAlbum(job.album, job.cache, db)
|
||||
scanner.ScanAlbum(job.ctx)
|
||||
}
|
||||
|
||||
type ScannerQueueSettings struct {
|
||||
|
@ -162,7 +167,7 @@ func (queue *ScannerQueue) processQueue(notifyThrottle *utils.Throttle) {
|
|||
Positive: true,
|
||||
})
|
||||
|
||||
if err := GenerateBlurhashes(queue.db); err != nil {
|
||||
if err := scanner.GenerateBlurhashes(queue.db); err != nil {
|
||||
scanner_utils.ScannerError("Failed to generate blurhashes: %v", err)
|
||||
}
|
||||
|
||||
|
@ -214,7 +219,7 @@ func AddAllToQueue() error {
|
|||
|
||||
func AddUserToQueue(user *models.User) error {
|
||||
album_cache := scanner_cache.MakeAlbumCache()
|
||||
albums, album_errors := findAlbumsForUser(global_scanner_queue.db, user, album_cache)
|
||||
albums, album_errors := scanner.FindAlbumsForUser(global_scanner_queue.db, user, album_cache)
|
||||
for _, err := range album_errors {
|
||||
return errors.Wrapf(err, "find albums for user (user_id: %d)", user.ID)
|
||||
}
|
||||
|
@ -222,8 +227,7 @@ func AddUserToQueue(user *models.User) error {
|
|||
global_scanner_queue.mutex.Lock()
|
||||
for _, album := range albums {
|
||||
global_scanner_queue.addJob(&ScannerJob{
|
||||
album: album,
|
||||
cache: album_cache,
|
||||
ctx: scanner_task.NewTaskContext(context.Background(), global_scanner_queue.db, album, album_cache),
|
||||
})
|
||||
}
|
||||
global_scanner_queue.mutex.Unlock()
|
||||
|
@ -248,7 +252,7 @@ func (queue *ScannerQueue) jobOnQueue(job *ScannerJob) (bool, error) {
|
|||
scannerJobs := append(queue.in_progress, queue.up_next...)
|
||||
|
||||
for _, scannerJob := range scannerJobs {
|
||||
if scannerJob.album.ID == job.album.ID {
|
||||
if scannerJob.ctx.GetAlbum().ID == job.ctx.GetAlbum().ID {
|
||||
return true, nil
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package scanner
|
||||
package scanner_queue
|
||||
|
||||
import (
|
||||
"testing"
|
|
@ -0,0 +1,71 @@
|
|||
package scanner_task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/fs"
|
||||
|
||||
"github.com/photoview/photoview/api/graphql/models"
|
||||
"github.com/photoview/photoview/api/scanner/media_encoding"
|
||||
"github.com/photoview/photoview/api/scanner/scanner_cache"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// ScannerTask is an interface for a task to be performed as a part of the scanner pipeline
|
||||
type ScannerTask interface {
|
||||
// BeforeScanAlbum will run at the beginning of the scan task.
|
||||
// New values can be stored in the returned TaskContext that will live throughout the lifetime of the task.
|
||||
BeforeScanAlbum(ctx TaskContext) (TaskContext, error)
|
||||
AfterScanAlbum(ctx TaskContext, albumHadChanges bool) error
|
||||
|
||||
MediaFound(ctx TaskContext, fileInfo fs.FileInfo, mediaPath string) (skip bool, err error)
|
||||
AfterMediaFound(ctx TaskContext, media *models.Media, newMedia bool) error
|
||||
|
||||
BeforeProcessMedia(ctx TaskContext, media *models.Media) (TaskContext, error)
|
||||
ProcessMedia(ctx TaskContext, mediaData *media_encoding.EncodeMediaData, mediaCachePath string) (didProcess bool, err error)
|
||||
AfterProcessMedia(ctx TaskContext, media *models.Media, didProcess bool, mediaIndex int, mediaTotal int) error
|
||||
}
|
||||
|
||||
type TaskContext struct {
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func NewTaskContext(parent context.Context, db *gorm.DB, album *models.Album, cache *scanner_cache.AlbumScannerCache) TaskContext {
|
||||
ctx := parent
|
||||
ctx = context.WithValue(ctx, taskCtxKeyAlbum, album)
|
||||
ctx = context.WithValue(ctx, taskCtxKeyAlbumCache, cache)
|
||||
ctx = context.WithValue(ctx, taskCtxKeyDatabase, db.WithContext(ctx))
|
||||
|
||||
return TaskContext{
|
||||
ctx: ctx,
|
||||
}
|
||||
}
|
||||
|
||||
type taskCtxKeyType string
|
||||
|
||||
const (
|
||||
taskCtxKeyAlbum taskCtxKeyType = "task_album"
|
||||
taskCtxKeyAlbumCache taskCtxKeyType = "task_album_cache"
|
||||
taskCtxKeyDatabase taskCtxKeyType = "task_database"
|
||||
)
|
||||
|
||||
func (c TaskContext) GetAlbum() *models.Album {
|
||||
return c.ctx.Value(taskCtxKeyAlbum).(*models.Album)
|
||||
}
|
||||
|
||||
func (c TaskContext) GetCache() *scanner_cache.AlbumScannerCache {
|
||||
return c.ctx.Value(taskCtxKeyAlbumCache).(*scanner_cache.AlbumScannerCache)
|
||||
}
|
||||
|
||||
func (c TaskContext) GetDB() *gorm.DB {
|
||||
return c.ctx.Value(taskCtxKeyDatabase).(*gorm.DB)
|
||||
}
|
||||
|
||||
func (c TaskContext) WithValue(key, val interface{}) TaskContext {
|
||||
return TaskContext{
|
||||
ctx: context.WithValue(c.ctx, key, val),
|
||||
}
|
||||
}
|
||||
|
||||
func (c TaskContext) Value(key interface{}) interface{} {
|
||||
return c.ctx.Value(key)
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
package scanner_task
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
|
||||
"github.com/photoview/photoview/api/graphql/models"
|
||||
"github.com/photoview/photoview/api/scanner/media_encoding"
|
||||
)
|
||||
|
||||
// ScannerTaskBase provides a default "empty" implementation of ScannerTask,
|
||||
type ScannerTaskBase struct{}
|
||||
|
||||
func (t ScannerTaskBase) BeforeScanAlbum(ctx TaskContext) (TaskContext, error) {
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
func (t ScannerTaskBase) AfterScanAlbum(ctx TaskContext, albumHadChanges bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t ScannerTaskBase) MediaFound(ctx TaskContext, fileInfo fs.FileInfo, mediaPath string) (skip bool, err error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (t ScannerTaskBase) AfterMediaFound(ctx TaskContext, media *models.Media, newMedia bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t ScannerTaskBase) BeforeProcessMedia(ctx TaskContext, media *models.Media) (TaskContext, error) {
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
func (t ScannerTaskBase) ProcessMedia(ctx TaskContext, mediaData *media_encoding.EncodeMediaData, mediaCachePath string) (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (t ScannerTaskBase) AfterProcessMedia(ctx TaskContext, media *models.Media, didProcess bool, mediaIndex int, mediaTotal int) error {
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package scanner_tasks
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"log"
|
||||
|
||||
"github.com/photoview/photoview/api/scanner/scanner_task"
|
||||
ignore "github.com/sabhiram/go-gitignore"
|
||||
)
|
||||
|
||||
type IgnorefileTask struct {
|
||||
scanner_task.ScannerTaskBase
|
||||
}
|
||||
|
||||
type ignorefileTaskKey string
|
||||
|
||||
const albumIgnoreKey ignorefileTaskKey = "album_ignore_key"
|
||||
|
||||
func getAlbumIgnore(ctx scanner_task.TaskContext) *ignore.GitIgnore {
|
||||
return ctx.Value(albumIgnoreKey).(*ignore.GitIgnore)
|
||||
}
|
||||
|
||||
func (t IgnorefileTask) BeforeScanAlbum(ctx scanner_task.TaskContext) (scanner_task.TaskContext, error) {
|
||||
albumIgnore := ignore.CompileIgnoreLines(*ctx.GetCache().GetAlbumIgnore(ctx.GetAlbum().Path)...)
|
||||
return ctx.WithValue(albumIgnoreKey, albumIgnore), nil
|
||||
}
|
||||
|
||||
func (t IgnorefileTask) MediaFound(ctx scanner_task.TaskContext, fileInfo fs.FileInfo, mediaPath string) (bool, error) {
|
||||
|
||||
// Match file against ignore data
|
||||
if getAlbumIgnore(ctx).MatchesPath(fileInfo.Name()) {
|
||||
log.Printf("File %s ignored\n", fileInfo.Name())
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
package scanner_tasks
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/photoview/photoview/api/graphql/models"
|
||||
"github.com/photoview/photoview/api/graphql/notification"
|
||||
"github.com/photoview/photoview/api/scanner/scanner_task"
|
||||
"github.com/photoview/photoview/api/utils"
|
||||
)
|
||||
|
||||
type NotificationTask struct {
|
||||
scanner_task.ScannerTaskBase
|
||||
throttle utils.Throttle
|
||||
albumKey string
|
||||
}
|
||||
|
||||
func NewNotificationTask() NotificationTask {
|
||||
notifyThrottle := utils.NewThrottle(500 * time.Millisecond)
|
||||
notifyThrottle.Trigger(nil)
|
||||
|
||||
return NotificationTask{
|
||||
albumKey: utils.GenerateToken(),
|
||||
throttle: notifyThrottle,
|
||||
}
|
||||
}
|
||||
|
||||
func (t NotificationTask) AfterMediaFound(ctx scanner_task.TaskContext, media *models.Media, newMedia bool) error {
|
||||
if newMedia {
|
||||
t.throttle.Trigger(func() {
|
||||
notification.BroadcastNotification(&models.Notification{
|
||||
Key: t.albumKey,
|
||||
Type: models.NotificationTypeMessage,
|
||||
Header: fmt.Sprintf("Found new media in album '%s'", ctx.GetAlbum().Title),
|
||||
Content: fmt.Sprintf("Found %s", media.Path),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t NotificationTask) AfterProcessMedia(ctx scanner_task.TaskContext, media *models.Media, didProcess bool, mediaIndex int, mediaTotal int) error {
|
||||
if didProcess {
|
||||
progress := float64(mediaIndex) / float64(mediaTotal) * 100.0
|
||||
notification.BroadcastNotification(&models.Notification{
|
||||
Key: t.albumKey,
|
||||
Type: models.NotificationTypeProgress,
|
||||
Header: fmt.Sprintf("Processing media for album '%s'", ctx.GetAlbum().Title),
|
||||
Content: fmt.Sprintf("Processed media at %s", media.Path),
|
||||
Progress: &progress,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t NotificationTask) AfterScanAlbum(ctx scanner_task.TaskContext, albumHadChanges bool) error {
|
||||
if albumHadChanges {
|
||||
timeoutDelay := 2000
|
||||
notification.BroadcastNotification(&models.Notification{
|
||||
Key: t.albumKey,
|
||||
Type: models.NotificationTypeMessage,
|
||||
Positive: true,
|
||||
Header: fmt.Sprintf("Done processing media for album '%s'", ctx.GetAlbum().Title),
|
||||
Content: "All media have been processed",
|
||||
Timeout: &timeoutDelay,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,131 @@
|
|||
package scanner_tasks
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
|
||||
"github.com/photoview/photoview/api/graphql/models"
|
||||
"github.com/photoview/photoview/api/scanner/media_encoding"
|
||||
"github.com/photoview/photoview/api/scanner/scanner_task"
|
||||
)
|
||||
|
||||
var allTasks []scanner_task.ScannerTask = []scanner_task.ScannerTask{
|
||||
NotificationTask{},
|
||||
IgnorefileTask{},
|
||||
}
|
||||
|
||||
type scannerTasks struct {
|
||||
scanner_task.ScannerTaskBase
|
||||
}
|
||||
|
||||
var Tasks scannerTasks = scannerTasks{}
|
||||
|
||||
type scannerTasksKey string
|
||||
|
||||
const (
|
||||
tasksSubContextsGlobal scannerTasksKey = "tasks_sub_contexts_global"
|
||||
tasksSubContextsProcessing scannerTasksKey = "tasks_sub_contexts_processing"
|
||||
)
|
||||
|
||||
func getSubContextsGlobal(ctx scanner_task.TaskContext) []scanner_task.TaskContext {
|
||||
return ctx.Value(tasksSubContextsGlobal).([]scanner_task.TaskContext)
|
||||
}
|
||||
|
||||
func getSubContextsProcessing(ctx scanner_task.TaskContext) []scanner_task.TaskContext {
|
||||
return ctx.Value(tasksSubContextsGlobal).([]scanner_task.TaskContext)
|
||||
}
|
||||
|
||||
func (t scannerTasks) BeforeScanAlbum(ctx scanner_task.TaskContext) (scanner_task.TaskContext, error) {
|
||||
subContexts := make([]scanner_task.TaskContext, len(allTasks))
|
||||
|
||||
for i, task := range allTasks {
|
||||
var err error
|
||||
subContexts[i], err = task.BeforeScanAlbum(ctx)
|
||||
if err != nil {
|
||||
return ctx, err
|
||||
}
|
||||
}
|
||||
|
||||
return ctx.WithValue(tasksSubContextsGlobal, subContexts), nil
|
||||
}
|
||||
|
||||
func (t scannerTasks) MediaFound(ctx scanner_task.TaskContext, fileInfo fs.FileInfo, mediaPath string) (bool, error) {
|
||||
|
||||
subContexts := getSubContextsGlobal(ctx)
|
||||
|
||||
for i, task := range allTasks {
|
||||
skip, err := task.MediaFound(subContexts[i], fileInfo, mediaPath)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if skip {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (t scannerTasks) AfterScanAlbum(ctx scanner_task.TaskContext, albumHadChanges bool) error {
|
||||
subContexts := getSubContextsGlobal(ctx)
|
||||
for i, task := range allTasks {
|
||||
err := task.AfterScanAlbum(subContexts[i], albumHadChanges)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t scannerTasks) AfterMediaFound(ctx scanner_task.TaskContext, media *models.Media, newMedia bool) error {
|
||||
subContexts := getSubContextsGlobal(ctx)
|
||||
for i, task := range allTasks {
|
||||
err := task.AfterMediaFound(subContexts[i], media, newMedia)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t scannerTasks) BeforeProcessMedia(ctx scanner_task.TaskContext, media *models.Media) (scanner_task.TaskContext, error) {
|
||||
subContexts := make([]scanner_task.TaskContext, len(allTasks))
|
||||
|
||||
for i, task := range allTasks {
|
||||
var err error
|
||||
subContexts[i], err = task.BeforeProcessMedia(ctx, media)
|
||||
if err != nil {
|
||||
return ctx, err
|
||||
}
|
||||
}
|
||||
|
||||
return ctx.WithValue(tasksSubContextsProcessing, subContexts), nil
|
||||
}
|
||||
|
||||
func (t scannerTasks) ProcessMedia(ctx scanner_task.TaskContext, mediaData *media_encoding.EncodeMediaData, mediaCachePath string) (bool, error) {
|
||||
subContexts := getSubContextsProcessing(ctx)
|
||||
didProcess := false
|
||||
for i, task := range allTasks {
|
||||
singleDidProcess, err := task.ProcessMedia(subContexts[i], mediaData, mediaCachePath)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if singleDidProcess {
|
||||
didProcess = true
|
||||
}
|
||||
}
|
||||
return didProcess, nil
|
||||
}
|
||||
|
||||
func (t scannerTasks) AfterProcessMedia(ctx scanner_task.TaskContext, media *models.Media, didProcess bool, mediaIndex int, mediaTotal int) error {
|
||||
subContexts := getSubContextsProcessing(ctx)
|
||||
for i, task := range allTasks {
|
||||
err := task.AfterProcessMedia(subContexts[i], media, didProcess, mediaIndex, mediaTotal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -42,7 +42,7 @@ func getPhotoviewIgnore(ignorePath string) ([]string, error) {
|
|||
return photoviewIgnore, scanner.Err()
|
||||
}
|
||||
|
||||
func findAlbumsForUser(db *gorm.DB, user *models.User, album_cache *scanner_cache.AlbumScannerCache) ([]*models.Album, []error) {
|
||||
func FindAlbumsForUser(db *gorm.DB, user *models.User, album_cache *scanner_cache.AlbumScannerCache) ([]*models.Album, []error) {
|
||||
|
||||
if err := user.FillAlbums(db); err != nil {
|
||||
return nil, []error{err}
|
||||
|
|
|
@ -15,10 +15,11 @@ import (
|
|||
"github.com/photoview/photoview/api/graphql/auth"
|
||||
graphql_endpoint "github.com/photoview/photoview/api/graphql/endpoint"
|
||||
"github.com/photoview/photoview/api/routes"
|
||||
"github.com/photoview/photoview/api/scanner"
|
||||
"github.com/photoview/photoview/api/scanner/exif"
|
||||
"github.com/photoview/photoview/api/scanner/face_detection"
|
||||
"github.com/photoview/photoview/api/scanner/media_encoding/executable_worker"
|
||||
"github.com/photoview/photoview/api/scanner/periodic_scanner"
|
||||
"github.com/photoview/photoview/api/scanner/scanner_queue"
|
||||
"github.com/photoview/photoview/api/server"
|
||||
"github.com/photoview/photoview/api/utils"
|
||||
|
||||
|
@ -45,11 +46,11 @@ func main() {
|
|||
log.Panicf("Could not migrate database: %s\n", err)
|
||||
}
|
||||
|
||||
if err := scanner.InitializeScannerQueue(db); err != nil {
|
||||
if err := scanner_queue.InitializeScannerQueue(db); err != nil {
|
||||
log.Panicf("Could not initialize scanner queue: %s\n", err)
|
||||
}
|
||||
|
||||
if err := scanner.InitializePeriodicScanner(db); err != nil {
|
||||
if err := periodic_scanner.InitializePeriodicScanner(db); err != nil {
|
||||
log.Panicf("Could not initialize periodic scanner: %s", err)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue