// photoview/api/scanner/queue.go

package scanner
import (
	"database/sql"
	"log"
	"sync"

	"github.com/viktorstrate/photoview/api/graphql/models"
)
type ScannerJob struct {
2020-06-22 23:52:41 +02:00
album *models.Album
cache *AlbumScannerCache
}
// Run executes the job: it passes the job's album and cache, along with the
// supplied database handle, to scanAlbum.
func (job *ScannerJob) Run(db *sql.DB) {
	album, cache := job.album, job.cache
	scanAlbum(album, cache, db)
}
// ScannerQueueSettings holds the configuration of a ScannerQueue.
type ScannerQueueSettings struct {
// max_concurrent_tasks is the limit the background worker compares the number
// of in-progress jobs against before starting another job.
max_concurrent_tasks int
}
type ScannerQueue struct {
mutex sync.Mutex
idle_chan chan bool
in_progress []ScannerJob
up_next []ScannerJob
db *sql.DB
2020-06-22 23:52:41 +02:00
settings ScannerQueueSettings
}
// global_scanner_queue is the package-level scanner queue. It holds the zero
// value until InitializeScannerQueue assigns a configured queue to it.
var global_scanner_queue ScannerQueue
func InitializeScannerQueue(db *sql.DB) {
global_scanner_queue = ScannerQueue{
idle_chan: make(chan bool, 1),
in_progress: make([]ScannerJob, 0),
up_next: make([]ScannerJob, 0),
db: db,
2020-06-22 23:52:41 +02:00
settings: ScannerQueueSettings{max_concurrent_tasks: 3},
}
2020-06-22 23:52:41 +02:00
go global_scanner_queue.startBackgroundWorker()
}
func (queue *ScannerQueue) startBackgroundWorker() {
for {
<-queue.idle_chan
queue.mutex.Lock()
defer queue.mutex.Unlock()
2020-06-22 23:52:41 +02:00
for len(queue.in_progress) < queue.settings.max_concurrent_tasks && len(queue.up_next) > 0 {
nextJob := queue.up_next[0]
queue.up_next = queue.up_next[1:]
queue.in_progress = append(queue.in_progress, nextJob)
go func() {
nextJob.Run(queue.db)
queue.mutex.Lock()
defer queue.mutex.Unlock()
// Delete finished job from queue
for i, x := range queue.in_progress {
if x == nextJob {
queue.in_progress[i] = queue.in_progress[len(queue.in_progress)-1]
queue.in_progress = queue.in_progress[0 : len(queue.in_progress)-1]
break
}
}
2020-06-22 23:52:41 +02:00
queue.Notify()
}()
}
}
}
// Notify signals the background worker that the set of jobs has changed.
// The send never blocks: the result is true when the signal was placed on
// idle_chan, and false when idle_chan could not accept it immediately.
func (queue *ScannerQueue) Notify() bool {
	delivered := false
	select {
	case queue.idle_chan <- true:
		delivered = true
	default:
		// The channel is not ready to take a value; give up instead of waiting.
	}
	return delivered
}
func (queue *ScannerQueue) ScanUser(user *models.User) {
album_cache := MakeAlbumCache()
albums, album_errors := findAlbumsForUser(queue.db, user, album_cache)
for _, err := range album_errors {
log.Printf("User scanner error: %s", err)
}
2020-06-22 23:52:41 +02:00
queue.mutex.Lock()
for _, album := range albums {
queue.addJob(&ScannerJob{
album: album,
cache: album_cache,
})
}
queue.mutex.Unlock()
}
// Queue should be locked prior to calling this function
func (queue *ScannerQueue) addJob(job *ScannerJob) error {
if exists, err := queue.jobOnQueue(job); exists || err != nil {
return err
}
queue.up_next = append(queue.up_next, *job)
queue.Notify()
2020-06-22 23:52:41 +02:00
return nil
}
// Queue should be locked prior to calling this function
func (queue *ScannerQueue) jobOnQueue(job *ScannerJob) (bool, error) {
2020-06-22 23:52:41 +02:00
scannerJobs := append(queue.in_progress, queue.up_next...)
2020-06-22 23:52:41 +02:00
for _, scannerJob := range scannerJobs {
if scannerJob.album.AlbumID == job.album.AlbumID {
return true, nil
}
}
return false, nil
}