1
Fork 0

Make scanner tasks share ctx

This commit is contained in:
viktorstrate 2022-07-07 22:00:05 +02:00
parent f512db6c32
commit a5d152f0c0
No known key found for this signature in database
GPG Key ID: 3F855605109C1E8A
3 changed files with 25 additions and 54 deletions

View File

@ -270,7 +270,6 @@ func GetExtensionMediaType(ext string) (MediaType, bool) {
} }
func GetMediaType(path string) (*MediaType, error) { func GetMediaType(path string) (*MediaType, error) {
ext := filepath.Ext(path) ext := filepath.Ext(path)
fileExtType, found := GetExtensionMediaType(ext) fileExtType, found := GetExtensionMediaType(ext)

View File

@ -159,7 +159,6 @@ func findMediaForAlbum(ctx scanner_task.TaskContext) ([]*models.Media, error) {
} }
if !item.IsDir() && !isDirSymlink && ctx.GetCache().IsPathMedia(mediaPath) { if !item.IsDir() && !isDirSymlink && ctx.GetCache().IsPathMedia(mediaPath) {
skip, err := scanner_tasks.Tasks.MediaFound(ctx, item, mediaPath) skip, err := scanner_tasks.Tasks.MediaFound(ctx, item, mediaPath)
if err != nil { if err != nil {
return nil, err return nil, err
@ -169,7 +168,6 @@ func findMediaForAlbum(ctx scanner_task.TaskContext) ([]*models.Media, error) {
} }
err = ctx.DatabaseTransaction(func(ctx scanner_task.TaskContext) error { err = ctx.DatabaseTransaction(func(ctx scanner_task.TaskContext) error {
media, isNewMedia, err := ScanMedia(ctx.GetDB(), mediaPath, ctx.GetAlbum().ID, ctx.GetCache()) media, isNewMedia, err := ScanMedia(ctx.GetDB(), mediaPath, ctx.GetAlbum().ID, ctx.GetCache())
if err != nil { if err != nil {
return errors.Wrapf(err, "scanning media error (%s)", mediaPath) return errors.Wrapf(err, "scanning media error (%s)", mediaPath)
@ -188,8 +186,8 @@ func findMediaForAlbum(ctx scanner_task.TaskContext) ([]*models.Media, error) {
scanner_utils.ScannerError("Error scanning media for album (%d): %s\n", ctx.GetAlbum().ID, err) scanner_utils.ScannerError("Error scanning media for album (%d): %s\n", ctx.GetAlbum().ID, err)
continue continue
} }
} }
} }
return albumMedia, nil return albumMedia, nil

View File

@ -29,31 +29,15 @@ type scannerTasks struct {
var Tasks scannerTasks = scannerTasks{} var Tasks scannerTasks = scannerTasks{}
type scannerTasksKey string func simpleCombinedTasks(ctx scanner_task.TaskContext, doTask func(ctx scanner_task.TaskContext, task scanner_task.ScannerTask) error) error {
for _, task := range allTasks {
const (
tasksSubContextsGlobal scannerTasksKey = "tasks_sub_contexts_global"
tasksSubContextsProcessing scannerTasksKey = "tasks_sub_contexts_processing"
)
func getSubContextsGlobal(ctx scanner_task.TaskContext) []scanner_task.TaskContext {
return ctx.Value(tasksSubContextsGlobal).([]scanner_task.TaskContext)
}
func getSubContextsProcessing(ctx scanner_task.TaskContext) []scanner_task.TaskContext {
return ctx.Value(tasksSubContextsGlobal).([]scanner_task.TaskContext)
}
func simpleCombinedTasks(subContexts []scanner_task.TaskContext, doTask func(ctx scanner_task.TaskContext, task scanner_task.ScannerTask) error) error {
for i, task := range allTasks {
subCtx := subContexts[i]
select { select {
case <-subCtx.Done(): case <-ctx.Done():
continue return ctx.Err()
default: default:
} }
err := doTask(subCtx, task) err := doTask(ctx, task)
if err != nil { if err != nil {
return err return err
} }
@ -62,11 +46,9 @@ func simpleCombinedTasks(subContexts []scanner_task.TaskContext, doTask func(ctx
} }
func (t scannerTasks) BeforeScanAlbum(ctx scanner_task.TaskContext) (scanner_task.TaskContext, error) { func (t scannerTasks) BeforeScanAlbum(ctx scanner_task.TaskContext) (scanner_task.TaskContext, error) {
subContexts := make([]scanner_task.TaskContext, len(allTasks)) for _, task := range allTasks {
for i, task := range allTasks {
var err error var err error
subContexts[i], err = task.BeforeScanAlbum(ctx) ctx, err = task.BeforeScanAlbum(ctx)
if err != nil { if err != nil {
return ctx, err return ctx, err
} }
@ -78,22 +60,18 @@ func (t scannerTasks) BeforeScanAlbum(ctx scanner_task.TaskContext) (scanner_tas
} }
} }
return ctx.WithValue(tasksSubContextsGlobal, subContexts), nil return ctx, nil
} }
func (t scannerTasks) MediaFound(ctx scanner_task.TaskContext, fileInfo fs.FileInfo, mediaPath string) (bool, error) { func (t scannerTasks) MediaFound(ctx scanner_task.TaskContext, fileInfo fs.FileInfo, mediaPath string) (bool, error) {
for _, task := range allTasks {
subContexts := getSubContextsGlobal(ctx)
for i, task := range allTasks {
subCtx := subContexts[i]
select { select {
case <-subCtx.Done(): case <-ctx.Done():
continue return false, ctx.Err()
default: default:
} }
skip, err := task.MediaFound(subCtx, fileInfo, mediaPath) skip, err := task.MediaFound(ctx, fileInfo, mediaPath)
if err != nil { if err != nil {
return false, err return false, err
@ -108,50 +86,46 @@ func (t scannerTasks) MediaFound(ctx scanner_task.TaskContext, fileInfo fs.FileI
} }
func (t scannerTasks) AfterScanAlbum(ctx scanner_task.TaskContext, changedMedia []*models.Media, albumMedia []*models.Media) error { func (t scannerTasks) AfterScanAlbum(ctx scanner_task.TaskContext, changedMedia []*models.Media, albumMedia []*models.Media) error {
return simpleCombinedTasks(getSubContextsGlobal(ctx), func(ctx scanner_task.TaskContext, task scanner_task.ScannerTask) error { return simpleCombinedTasks(ctx, func(ctx scanner_task.TaskContext, task scanner_task.ScannerTask) error {
return task.AfterScanAlbum(ctx, changedMedia, albumMedia) return task.AfterScanAlbum(ctx, changedMedia, albumMedia)
}) })
} }
func (t scannerTasks) AfterMediaFound(ctx scanner_task.TaskContext, media *models.Media, newMedia bool) error { func (t scannerTasks) AfterMediaFound(ctx scanner_task.TaskContext, media *models.Media, newMedia bool) error {
return simpleCombinedTasks(getSubContextsGlobal(ctx), func(ctx scanner_task.TaskContext, task scanner_task.ScannerTask) error { return simpleCombinedTasks(ctx, func(ctx scanner_task.TaskContext, task scanner_task.ScannerTask) error {
return task.AfterMediaFound(ctx, media, newMedia) return task.AfterMediaFound(ctx, media, newMedia)
}) })
} }
func (t scannerTasks) BeforeProcessMedia(ctx scanner_task.TaskContext, mediaData *media_encoding.EncodeMediaData) (scanner_task.TaskContext, error) { func (t scannerTasks) BeforeProcessMedia(ctx scanner_task.TaskContext, mediaData *media_encoding.EncodeMediaData) (scanner_task.TaskContext, error) {
globalSubContexts := getSubContextsGlobal(ctx) for _, task := range allTasks {
processSubContexts := make([]scanner_task.TaskContext, len(allTasks))
for i, task := range allTasks {
select { select {
case <-globalSubContexts[i].Done(): case <-ctx.Done():
continue return ctx, ctx.Err()
default: default:
} }
var err error var err error
processSubContexts[i], err = task.BeforeProcessMedia(ctx, mediaData) ctx, err = task.BeforeProcessMedia(ctx, mediaData)
if err != nil { if err != nil {
return ctx, err return ctx, err
} }
} }
return ctx.WithValue(tasksSubContextsProcessing, processSubContexts), nil return ctx, nil
} }
func (t scannerTasks) ProcessMedia(ctx scanner_task.TaskContext, mediaData *media_encoding.EncodeMediaData, mediaCachePath string) ([]*models.MediaURL, error) { func (t scannerTasks) ProcessMedia(ctx scanner_task.TaskContext, mediaData *media_encoding.EncodeMediaData, mediaCachePath string) ([]*models.MediaURL, error) {
subContexts := getSubContextsProcessing(ctx)
allNewMedia := make([]*models.MediaURL, 0) allNewMedia := make([]*models.MediaURL, 0)
for i, task := range allTasks { for _, task := range allTasks {
select { select {
case <-subContexts[i].Done(): case <-ctx.Done():
continue return nil, ctx.Err()
default: default:
} }
newMedia, err := task.ProcessMedia(subContexts[i], mediaData, mediaCachePath) newMedia, err := task.ProcessMedia(ctx, mediaData, mediaCachePath)
if err != nil { if err != nil {
return []*models.MediaURL{}, err return []*models.MediaURL{}, err
} }
@ -163,7 +137,7 @@ func (t scannerTasks) ProcessMedia(ctx scanner_task.TaskContext, mediaData *medi
} }
func (t scannerTasks) AfterProcessMedia(ctx scanner_task.TaskContext, mediaData *media_encoding.EncodeMediaData, updatedURLs []*models.MediaURL, mediaIndex int, mediaTotal int) error { func (t scannerTasks) AfterProcessMedia(ctx scanner_task.TaskContext, mediaData *media_encoding.EncodeMediaData, updatedURLs []*models.MediaURL, mediaIndex int, mediaTotal int) error {
return simpleCombinedTasks(getSubContextsProcessing(ctx), func(ctx scanner_task.TaskContext, task scanner_task.ScannerTask) error { return simpleCombinedTasks(ctx, func(ctx scanner_task.TaskContext, task scanner_task.ScannerTask) error {
return task.AfterProcessMedia(ctx, mediaData, updatedURLs, mediaIndex, mediaTotal) return task.AfterProcessMedia(ctx, mediaData, updatedURLs, mediaIndex, mediaTotal)
}) })
} }