Refactor scanner
- Move scanner cache to its own file - Move album scanner related code to its own file
This commit is contained in:
parent
2f4212eeab
commit
666ecef4b3
|
@ -0,0 +1,59 @@
|
|||
package scanner
|
||||
|
||||
import "path"
|
||||
|
||||
type ScannerCache struct {
|
||||
cache map[string]interface{}
|
||||
photo_paths_scanned []interface{}
|
||||
album_paths_scanned []interface{}
|
||||
}
|
||||
|
||||
func MakeScannerCache() ScannerCache {
|
||||
return ScannerCache{
|
||||
cache: make(map[string]interface{}),
|
||||
photo_paths_scanned: make([]interface{}, 0),
|
||||
album_paths_scanned: make([]interface{}, 0),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ScannerCache) insert_photo_type(path string, content_type ImageType) {
|
||||
(c.cache)["photo_type//"+path] = content_type
|
||||
}
|
||||
|
||||
func (c *ScannerCache) get_photo_type(path string) *string {
|
||||
result, found := (c.cache)["photo_type//"+path].(string)
|
||||
if found {
|
||||
// log.Printf("Image cache hit: %s\n", path)
|
||||
return &result
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Insert single album directory in cache
|
||||
func (c *ScannerCache) insert_album_path(path string, contains_photo bool) {
|
||||
(c.cache)["album_path//"+path] = contains_photo
|
||||
}
|
||||
|
||||
// Insert album path and all parent directories up to the given root directory in cache
|
||||
func (c *ScannerCache) insert_album_paths(end_path string, root string, contains_photo bool) {
|
||||
curr_path := path.Clean(end_path)
|
||||
root_path := path.Clean(root)
|
||||
|
||||
for curr_path != root_path || curr_path == "." {
|
||||
|
||||
c.insert_album_path(curr_path, contains_photo)
|
||||
|
||||
curr_path = path.Dir(curr_path)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ScannerCache) album_contains_photo(path string) *bool {
|
||||
contains_photo, found := (c.cache)["album_path//"+path].(bool)
|
||||
if found {
|
||||
// log.Printf("Album cache hit: %s\n", path)
|
||||
return &contains_photo
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -234,7 +234,7 @@ func getImageType(path string) (*ImageType, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func isPathImage(path string, cache *scanner_cache) bool {
|
||||
func isPathImage(path string, cache *ScannerCache) bool {
|
||||
if cache.get_photo_type(path) != nil {
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -37,6 +37,10 @@ func (job *ScannerJob) modelAsAlbum() (*models.Album, error) {
|
|||
return &album, nil
|
||||
}
|
||||
|
||||
func (job *ScannerJob) Run() {
|
||||
// TODO: Not implemented
|
||||
}
|
||||
|
||||
type ScannerQueue struct {
|
||||
mutex sync.Mutex
|
||||
idle_chan chan bool
|
||||
|
@ -56,11 +60,18 @@ func InitializeScannerQueue(db *sql.DB) {
|
|||
}
|
||||
}
|
||||
|
||||
func (job *ScannerJob) Run() {
|
||||
// TODO: Not implemented
|
||||
func (queue *ScannerQueue) startBackgroundWorker() {
|
||||
for {
|
||||
<-queue.idle_chan
|
||||
queue.mutex.Lock()
|
||||
defer queue.mutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (queue *ScannerQueue) AddJob(job *ScannerJob) error {
|
||||
queue.mutex.Lock()
|
||||
defer queue.mutex.Unlock()
|
||||
|
||||
if exists, err := queue.jobOnQueue(job); exists || err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -80,8 +91,6 @@ func (queue *ScannerQueue) Notify() bool {
|
|||
}
|
||||
|
||||
func (queue *ScannerQueue) jobOnQueue(job *ScannerJob) (bool, error) {
|
||||
queue.mutex.Lock()
|
||||
defer queue.mutex.Unlock()
|
||||
|
||||
scannerJobs := append(queue.in_progress, queue.up_next...)
|
||||
|
|
@ -0,0 +1,107 @@
|
|||
package scanner
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/viktorstrate/photoview/api/graphql/models"
|
||||
)
|
||||
|
||||
func TestScannerQueue_AddJob(t *testing.T) {
|
||||
|
||||
scannerJobs := []ScannerJob{
|
||||
{scope: JOB_SCAN_ALBUM, model: models.Album{AlbumID: 100, OwnerID: 123}},
|
||||
{scope: JOB_SCAN_USER, model: models.User{UserID: 20}},
|
||||
}
|
||||
|
||||
mockScannerQueue := ScannerQueue{
|
||||
idle_chan: make(chan bool, 1),
|
||||
in_progress: make([]ScannerJob, 0),
|
||||
up_next: scannerJobs,
|
||||
db: nil,
|
||||
}
|
||||
|
||||
t.Run("add new job to scanner queue", func(t *testing.T) {
|
||||
newJob := ScannerJob{
|
||||
scope: JOB_SCAN_USER,
|
||||
model: models.User{UserID: 253},
|
||||
}
|
||||
|
||||
startingJobs := len(mockScannerQueue.up_next)
|
||||
|
||||
err := mockScannerQueue.AddJob(&newJob)
|
||||
if err != nil {
|
||||
t.Errorf(".AddJob() returned an unexpected error: %s", err)
|
||||
}
|
||||
|
||||
if len(mockScannerQueue.up_next) != startingJobs+1 {
|
||||
t.Errorf("Expected scanner queue length to be %d but got %d", startingJobs+1, len(mockScannerQueue.up_next))
|
||||
} else if mockScannerQueue.up_next[len(mockScannerQueue.up_next)-1] != newJob {
|
||||
t.Errorf("Expected scanner queue to contain the job that was added: %+v", newJob)
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
t.Run("add existing job to scanner queue", func(t *testing.T) {
|
||||
startingJobs := len(mockScannerQueue.up_next)
|
||||
|
||||
err := mockScannerQueue.AddJob(&ScannerJob{
|
||||
scope: JOB_SCAN_USER,
|
||||
model: models.User{UserID: 20},
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf(".AddJob() returned an unexpected error: %s", err)
|
||||
}
|
||||
|
||||
if len(mockScannerQueue.up_next) != startingJobs {
|
||||
t.Errorf("Expected scanner queue length not to change: start length %d, new length %d", startingJobs, len(mockScannerQueue.up_next))
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestScannerQueue_JobOnQueue(t *testing.T) {
|
||||
|
||||
scannerJobs := []ScannerJob{
|
||||
{scope: JOB_SCAN_ALBUM, model: models.Album{AlbumID: 100, OwnerID: 123}},
|
||||
{scope: JOB_SCAN_USER, model: models.User{UserID: 20}},
|
||||
}
|
||||
|
||||
mockScannerQueue := ScannerQueue{
|
||||
idle_chan: make(chan bool, 1),
|
||||
in_progress: make([]ScannerJob, 0),
|
||||
up_next: scannerJobs,
|
||||
db: nil,
|
||||
}
|
||||
|
||||
onQueueTests := []struct {
|
||||
string
|
||||
bool
|
||||
ScannerJob
|
||||
}{
|
||||
{"user that is already on the queue", true, ScannerJob{
|
||||
scope: JOB_SCAN_USER,
|
||||
model: models.User{UserID: 20},
|
||||
}},
|
||||
{"album which owner is already on the queue", true, ScannerJob{
|
||||
scope: JOB_SCAN_ALBUM,
|
||||
model: models.Album{AlbumID: 40, OwnerID: 20},
|
||||
}},
|
||||
{"album that is not on the queue", false, ScannerJob{
|
||||
scope: JOB_SCAN_ALBUM,
|
||||
model: models.Album{AlbumID: 321, OwnerID: 11},
|
||||
}},
|
||||
}
|
||||
|
||||
for _, test := range onQueueTests {
|
||||
t.Run(test.string, func(t *testing.T) {
|
||||
onQueue, err := mockScannerQueue.jobOnQueue(&test.ScannerJob)
|
||||
if err != nil {
|
||||
t.Error("Expected jobOnQueue not to return an error")
|
||||
} else if onQueue != test.bool {
|
||||
t.Fail()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
package scanner
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"io/ioutil"
|
||||
"path"
|
||||
|
||||
"github.com/viktorstrate/photoview/api/graphql/models"
|
||||
)
|
||||
|
||||
func findPhotosForAlbum(album *models.Album, cache *ScannerCache, db *sql.DB, onScanPhoto func(photo *models.Photo, newPhoto bool)) ([]*models.Photo, error) {
|
||||
|
||||
newPhotos := make([]*models.Photo, 0)
|
||||
|
||||
dirContent, err := ioutil.ReadDir(album.Path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, item := range dirContent {
|
||||
photoPath := path.Join(album.Path, item.Name())
|
||||
|
||||
if !item.IsDir() && isPathImage(photoPath, cache) {
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
ScannerError("Could not begin database transaction for image %s: %s\n", photoPath, err)
|
||||
continue
|
||||
}
|
||||
|
||||
cache.photo_paths_scanned = append(cache.photo_paths_scanned, photoPath)
|
||||
|
||||
photo, isNewPhoto, err := ScanPhoto(tx, photoPath, album.AlbumID)
|
||||
if err != nil {
|
||||
ScannerError("Scanning image %s: %s", photoPath, err)
|
||||
tx.Rollback()
|
||||
continue
|
||||
}
|
||||
|
||||
onScanPhoto(photo, isNewPhoto)
|
||||
|
||||
if isNewPhoto {
|
||||
newPhotos = append(newPhotos, photo)
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
}
|
||||
}
|
||||
|
||||
return newPhotos, nil
|
||||
}
|
|
@ -8,7 +8,7 @@ import (
|
|||
"github.com/viktorstrate/photoview/api/graphql/models"
|
||||
)
|
||||
|
||||
func ScanPhoto(tx *sql.Tx, photoPath string, albumId int, notificationKey string) (*models.Photo, bool, error) {
|
||||
func ScanPhoto(tx *sql.Tx, photoPath string, albumId int) (*models.Photo, bool, error) {
|
||||
|
||||
log.Printf("Scanning image: %s\n", photoPath)
|
||||
|
|
@ -1,53 +0,0 @@
|
|||
package scanner
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/viktorstrate/photoview/api/graphql/models"
|
||||
)
|
||||
|
||||
func TestScannerQueue_JobOnQueue(t *testing.T) {
|
||||
|
||||
scannerJobs := []ScannerJob{
|
||||
{scope: JOB_SCAN_ALBUM, model: models.Album{AlbumID: 100, OwnerID: 123}},
|
||||
{scope: JOB_SCAN_USER, model: models.User{UserID: 20}},
|
||||
}
|
||||
|
||||
mockScannerQueue := ScannerQueue{
|
||||
idle_chan: make(chan bool, 1),
|
||||
in_progress: make([]ScannerJob, 0),
|
||||
up_next: scannerJobs,
|
||||
db: nil,
|
||||
}
|
||||
|
||||
onQueueTests := []struct {
|
||||
string
|
||||
bool
|
||||
ScannerJob
|
||||
}{
|
||||
{"user that is already on the queue", true, ScannerJob{
|
||||
scope: JOB_SCAN_USER,
|
||||
model: models.User{UserID: 20},
|
||||
}},
|
||||
{"album which owner is already on the queue", true, ScannerJob{
|
||||
scope: JOB_SCAN_ALBUM,
|
||||
model: models.Album{AlbumID: 40, OwnerID: 20},
|
||||
}},
|
||||
{"album that is not on the queue", false, ScannerJob{
|
||||
scope: JOB_SCAN_ALBUM,
|
||||
model: models.Album{AlbumID: 321, OwnerID: 11},
|
||||
}},
|
||||
}
|
||||
|
||||
for _, test := range onQueueTests {
|
||||
t.Run(test.string, func(t *testing.T) {
|
||||
onQueue, err := mockScannerQueue.jobOnQueue(&test.ScannerJob)
|
||||
if err != nil {
|
||||
t.Error("Expected jobOnQueue not to return an error")
|
||||
} else if onQueue != test.bool {
|
||||
t.Fail()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
|
@ -17,50 +17,6 @@ import (
|
|||
"github.com/viktorstrate/photoview/api/utils"
|
||||
)
|
||||
|
||||
type scanner_cache map[string]interface{}
|
||||
|
||||
func (cache *scanner_cache) insert_photo_type(path string, content_type ImageType) {
|
||||
(*cache)["photo_type//"+path] = content_type
|
||||
}
|
||||
|
||||
func (cache *scanner_cache) get_photo_type(path string) *string {
|
||||
result, found := (*cache)["photo_type//"+path].(string)
|
||||
if found {
|
||||
// log.Printf("Image cache hit: %s\n", path)
|
||||
return &result
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Insert single album directory in cache
|
||||
func (cache *scanner_cache) insert_album_path(path string, contains_photo bool) {
|
||||
(*cache)["album_path//"+path] = contains_photo
|
||||
}
|
||||
|
||||
// Insert album path and all parent directories up to the given root directory in cache
|
||||
func (cache *scanner_cache) insert_album_paths(end_path string, root string, contains_photo bool) {
|
||||
curr_path := path.Clean(end_path)
|
||||
root_path := path.Clean(root)
|
||||
|
||||
for curr_path != root_path || curr_path == "." {
|
||||
|
||||
cache.insert_album_path(curr_path, contains_photo)
|
||||
|
||||
curr_path = path.Dir(curr_path)
|
||||
}
|
||||
}
|
||||
|
||||
func (cache *scanner_cache) album_contains_photo(path string) *bool {
|
||||
contains_photo, found := (*cache)["album_path//"+path].(bool)
|
||||
if found {
|
||||
// log.Printf("Album cache hit: %s\n", path)
|
||||
return &contains_photo
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func ScanAll(database *sql.DB) error {
|
||||
rows, err := database.Query("SELECT * FROM user")
|
||||
if err != nil {
|
||||
|
@ -123,9 +79,7 @@ func scan(database *sql.DB, user *models.User) {
|
|||
})
|
||||
|
||||
// Start scanning
|
||||
scanner_cache := make(scanner_cache)
|
||||
album_paths_scanned := make([]interface{}, 0)
|
||||
photo_paths_scanned := make([]interface{}, 0)
|
||||
cache := MakeScannerCache()
|
||||
|
||||
type scanInfo struct {
|
||||
path string
|
||||
|
@ -138,7 +92,7 @@ func scan(database *sql.DB, user *models.User) {
|
|||
parentId: nil,
|
||||
})
|
||||
|
||||
newPhotos := list.New()
|
||||
newPhotos := make([]*models.Photo, 0)
|
||||
|
||||
for scanQueue.Front() != nil {
|
||||
albumInfo := scanQueue.Front().Value.(scanInfo)
|
||||
|
@ -147,7 +101,7 @@ func scan(database *sql.DB, user *models.User) {
|
|||
albumPath := albumInfo.path
|
||||
albumParentId := albumInfo.parentId
|
||||
|
||||
album_paths_scanned = append(album_paths_scanned, albumPath)
|
||||
cache.album_paths_scanned = append(cache.album_paths_scanned, albumPath)
|
||||
|
||||
// Read path
|
||||
dirContent, err := ioutil.ReadDir(albumPath)
|
||||
|
@ -173,10 +127,10 @@ func scan(database *sql.DB, user *models.User) {
|
|||
continue
|
||||
}
|
||||
|
||||
row := tx.QueryRow("SELECT album_id FROM album WHERE path = ?", albumPath)
|
||||
var albumId int
|
||||
if err := row.Scan(&albumId); err != nil {
|
||||
ScannerError("Could not get id of album: %s\n", err)
|
||||
row := tx.QueryRow("SELECT * FROM album WHERE path = ?", albumPath)
|
||||
album, err := models.NewAlbumFromRow(row)
|
||||
if err != nil {
|
||||
ScannerError("Could not get album: %s\n", err)
|
||||
tx.Rollback()
|
||||
return
|
||||
}
|
||||
|
@ -188,41 +142,20 @@ func scan(database *sql.DB, user *models.User) {
|
|||
}
|
||||
|
||||
// Scan for photos
|
||||
for _, item := range dirContent {
|
||||
photoPath := path.Join(albumPath, item.Name())
|
||||
|
||||
if !item.IsDir() && isPathImage(photoPath, &scanner_cache) {
|
||||
tx, err := database.Begin()
|
||||
if err != nil {
|
||||
ScannerError("Could not begin database transaction for image %s: %s\n", photoPath, err)
|
||||
continue
|
||||
}
|
||||
|
||||
photo_paths_scanned = append(photo_paths_scanned, photoPath)
|
||||
|
||||
photo, isNewPhoto, err := ScanPhoto(tx, photoPath, albumId, processKey)
|
||||
if err != nil {
|
||||
ScannerError("Scanning image %s: %s", photoPath, err)
|
||||
tx.Rollback()
|
||||
continue
|
||||
}
|
||||
|
||||
if isNewPhoto {
|
||||
newPhotos.PushBack(photo)
|
||||
|
||||
notifyThrottle.Trigger(func() {
|
||||
notification.BroadcastNotification(&models.Notification{
|
||||
Key: processKey,
|
||||
Type: models.NotificationTypeMessage,
|
||||
Header: fmt.Sprintf("Scanning photo for user '%s'", user.Username),
|
||||
Content: fmt.Sprintf("Scanning image at %s", photoPath),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
}
|
||||
newFoundPhotos, err := findPhotosForAlbum(album, &cache, database, func(photo *models.Photo, newPhoto bool) {
|
||||
notifyThrottle.Trigger(func() {
|
||||
notification.BroadcastNotification(&models.Notification{
|
||||
Key: processKey,
|
||||
Type: models.NotificationTypeMessage,
|
||||
Header: fmt.Sprintf("Scanning photo for user '%s'", user.Username),
|
||||
Content: fmt.Sprintf("Scanning image at %s", photo.Path),
|
||||
})
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
ScannerError("Failed to scan album for new photos (album_id %d)", album.AlbumID)
|
||||
}
|
||||
newPhotos = append(newPhotos, newFoundPhotos...)
|
||||
|
||||
// Scan for sub-albums
|
||||
for _, item := range dirContent {
|
||||
|
@ -233,18 +166,18 @@ func scan(database *sql.DB, user *models.User) {
|
|||
continue
|
||||
}
|
||||
|
||||
if item.IsDir() && directoryContainsPhotos(subalbumPath, &scanner_cache) {
|
||||
if item.IsDir() && directoryContainsPhotos(subalbumPath, &cache) {
|
||||
scanQueue.PushBack(scanInfo{
|
||||
path: subalbumPath,
|
||||
parentId: &albumId,
|
||||
parentId: &album.AlbumID,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
completeMessage := "No new photos were found"
|
||||
if newPhotos.Len() > 0 {
|
||||
completeMessage = fmt.Sprintf("%d new photos were found", newPhotos.Len())
|
||||
if len(newPhotos) > 0 {
|
||||
completeMessage = fmt.Sprintf("%d new photos were found", len(newPhotos))
|
||||
}
|
||||
|
||||
notification.BroadcastNotification(&models.Notification{
|
||||
|
@ -255,7 +188,7 @@ func scan(database *sql.DB, user *models.User) {
|
|||
Positive: true,
|
||||
})
|
||||
|
||||
cleanupCache(database, album_paths_scanned, photo_paths_scanned, user)
|
||||
cleanupCache(database, &cache, user)
|
||||
|
||||
err := processUnprocessedPhotos(database, user, notifyKey)
|
||||
if err != nil {
|
||||
|
@ -265,7 +198,7 @@ func scan(database *sql.DB, user *models.User) {
|
|||
log.Printf("Done scanning user '%s'\n", user.Username)
|
||||
}
|
||||
|
||||
func directoryContainsPhotos(rootPath string, cache *scanner_cache) bool {
|
||||
func directoryContainsPhotos(rootPath string, cache *ScannerCache) bool {
|
||||
|
||||
if contains_image := cache.album_contains_photo(rootPath); contains_image != nil {
|
||||
return *contains_image
|
||||
|
@ -390,17 +323,17 @@ func processUnprocessedPhotos(database *sql.DB, user *models.User, notifyKey str
|
|||
return nil
|
||||
}
|
||||
|
||||
func cleanupCache(database *sql.DB, scanned_albums []interface{}, scanned_photos []interface{}, user *models.User) {
|
||||
if len(scanned_albums) == 0 {
|
||||
func cleanupCache(database *sql.DB, cache *ScannerCache, user *models.User) {
|
||||
if len(cache.album_paths_scanned) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Delete old albums
|
||||
album_args := make([]interface{}, 0)
|
||||
album_args = append(album_args, user.UserID)
|
||||
album_args = append(album_args, scanned_albums...)
|
||||
album_args = append(album_args, cache.album_paths_scanned...)
|
||||
|
||||
albums_questions := strings.Repeat("?,", len(scanned_albums))[:len(scanned_albums)*2-1]
|
||||
albums_questions := strings.Repeat("?,", len(cache.album_paths_scanned))[:len(cache.album_paths_scanned)*2-1]
|
||||
rows, err := database.Query("SELECT album_id FROM album WHERE album.owner_id = ? AND path NOT IN ("+albums_questions+")", album_args...)
|
||||
if err != nil {
|
||||
ScannerError("Could not get albums from database: %s\n", err)
|
||||
|
@ -434,9 +367,9 @@ func cleanupCache(database *sql.DB, scanned_albums []interface{}, scanned_photos
|
|||
// Delete old photos
|
||||
photo_args := make([]interface{}, 0)
|
||||
photo_args = append(photo_args, user.UserID)
|
||||
photo_args = append(photo_args, scanned_photos...)
|
||||
photo_args = append(photo_args, cache.photo_paths_scanned...)
|
||||
|
||||
photo_questions := strings.Repeat("?,", len(scanned_photos))[:len(scanned_photos)*2-1]
|
||||
photo_questions := strings.Repeat("?,", len(cache.photo_paths_scanned))[:len(cache.photo_paths_scanned)*2-1]
|
||||
|
||||
rows, err = database.Query(`
|
||||
SELECT photo.photo_id as photo_id, album.album_id as album_id FROM photo JOIN album ON photo.album_id = album.album_id
|
Loading…
Reference in New Issue