Add yt-downloader
This commit is contained in:
@@ -9,12 +9,11 @@ type Download struct {
|
||||
ID string `json:"id"`
|
||||
Status string `json:"status"`
|
||||
Link string `json:"link"`
|
||||
Data string `json:"data"`
|
||||
}
|
||||
|
||||
func NewDownload(link string, data string) func(uuidGen uuid.Gen) (*Download, error) {
|
||||
func NewDownload(link string) func(uuidGen uuid.Gen) (*Download, error) {
|
||||
return func(uuidGen uuid.Gen) (*Download, error) {
|
||||
if link == "" || data == "" {
|
||||
if link == "" {
|
||||
return nil, errors.New("A field was not valid")
|
||||
}
|
||||
|
||||
@@ -22,7 +21,6 @@ func NewDownload(link string, data string) func(uuidGen uuid.Gen) (*Download, er
|
||||
ID: uuidGen.Create(),
|
||||
Status: "scheduled",
|
||||
Link: link,
|
||||
Data: data,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
@@ -1,110 +0,0 @@
|
||||
package download_request
|
||||
|
||||
import (
|
||||
"downloader/internal/core/entities"
|
||||
"downloader/pkg/common/uuid"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Repository
|
||||
type inMemoryRepository struct {
|
||||
collection map[string]*entities.Download
|
||||
}
|
||||
|
||||
func NewInMemoryRepository() Repository {
|
||||
return &inMemoryRepository{collection: make(map[string]*entities.Download)}
|
||||
}
|
||||
|
||||
func (i *inMemoryRepository) Create(download *entities.Download) (*entities.Download, error) {
|
||||
if doc := i.collection[download.ID]; doc != nil {
|
||||
return nil, errors.New("download request already exists")
|
||||
}
|
||||
|
||||
i.collection[download.ID] = download
|
||||
|
||||
return download, nil
|
||||
}
|
||||
|
||||
func (i *inMemoryRepository) Update(download *entities.Download) error {
|
||||
|
||||
if doc := i.collection[download.ID]; doc == nil {
|
||||
return errors.New("download request doesn't exist exists")
|
||||
}
|
||||
|
||||
i.collection[download.ID] = download
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *inMemoryRepository) GetById(id string) (*entities.Download, error) {
|
||||
if download := i.collection[id]; download != nil {
|
||||
return download, nil
|
||||
} else {
|
||||
return nil, errors.New("download was not found in the database")
|
||||
}
|
||||
}
|
||||
|
||||
// Service
|
||||
type localService struct {
|
||||
repository Repository
|
||||
uuidGen uuid.Gen
|
||||
BackgroundService BackgroundService
|
||||
}
|
||||
|
||||
func NewLocalService(repository Repository, uuidGen uuid.Gen, backgroundService BackgroundService) Service {
|
||||
return &localService{
|
||||
repository: repository,
|
||||
uuidGen: uuidGen,
|
||||
BackgroundService: backgroundService,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *localService) Schedule(provider string, link string) (*entities.Download, error) {
|
||||
download, err := entities.NewDownload(link, provider)(l.uuidGen)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
persistedDownloadRequest, uploadErr := l.repository.Create(download)
|
||||
if uploadErr != nil {
|
||||
return nil, uploadErr
|
||||
}
|
||||
|
||||
err = l.BackgroundService.Run(persistedDownloadRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return persistedDownloadRequest, nil
|
||||
}
|
||||
|
||||
func (l *localService) Get(id string) (*entities.Download, error) {
|
||||
return l.repository.GetById(id)
|
||||
}
|
||||
|
||||
// Background service
|
||||
type localBackgroundService struct {
|
||||
repository Repository
|
||||
}
|
||||
|
||||
func NewLocalBackgroundService(repository Repository) BackgroundService {
|
||||
return &localBackgroundService{repository: repository}
|
||||
}
|
||||
|
||||
func (l localBackgroundService) Run(download *entities.Download) error {
|
||||
go func() {
|
||||
time.Sleep(time.Second * 5)
|
||||
download.Status = "done"
|
||||
err := l.repository.Update(download)
|
||||
if err != nil {
|
||||
fmt.Printf("Download request: %s failed\n", download.ID)
|
||||
panic(err)
|
||||
} else {
|
||||
fmt.Printf("Download request: %s done\n", download.ID)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
@@ -0,0 +1,56 @@
|
||||
package in_memory
|
||||
|
||||
import (
|
||||
"downloader/internal/core/entities"
|
||||
"downloader/internal/core/ports/download_request"
|
||||
"errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type inMemoryRepository struct {
|
||||
collection map[string]*entities.Download
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
func NewInMemoryRepository(logger *zap.SugaredLogger) download_request.Repository {
|
||||
return &inMemoryRepository{collection: make(map[string]*entities.Download), logger: logger}
|
||||
}
|
||||
|
||||
func (i *inMemoryRepository) Create(download *entities.Download) (*entities.Download, error) {
|
||||
logger := i.logger.With("downloadId", download.ID)
|
||||
|
||||
if doc := i.collection[download.ID]; doc != nil {
|
||||
logger.Warn("create: download already exists")
|
||||
return nil, errors.New("download request already exists")
|
||||
}
|
||||
|
||||
i.collection[download.ID] = download
|
||||
logger.Info("added download to database")
|
||||
|
||||
return download, nil
|
||||
}
|
||||
|
||||
func (i *inMemoryRepository) Update(download *entities.Download) error {
|
||||
logger := i.logger.With("downloadId", download.ID)
|
||||
|
||||
if doc := i.collection[download.ID]; doc == nil {
|
||||
logger.Warn("update: download doesnt exist")
|
||||
return errors.New("download request doesn't exist exists")
|
||||
}
|
||||
|
||||
i.collection[download.ID] = download
|
||||
logger.Info("updated download to database")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *inMemoryRepository) GetById(id string) (*entities.Download, error) {
|
||||
logger := i.logger.With("downloadId", id)
|
||||
|
||||
if download := i.collection[id]; download != nil {
|
||||
return download, nil
|
||||
} else {
|
||||
logger.Warn("download was not found in the database")
|
||||
return nil, errors.New("download was not found in the database")
|
||||
}
|
||||
}
|
@@ -2,17 +2,8 @@ package download_request
|
||||
|
||||
import "downloader/internal/core/entities"
|
||||
|
||||
type Service interface {
|
||||
Schedule(provider string, link string) (*entities.Download, error)
|
||||
Get(id string) (*entities.Download, error)
|
||||
}
|
||||
|
||||
type Repository interface {
|
||||
Create(download *entities.Download) (*entities.Download, error)
|
||||
GetById(id string) (*entities.Download, error)
|
||||
Update(download *entities.Download) error
|
||||
}
|
||||
|
||||
type BackgroundService interface {
|
||||
Run(download *entities.Download) error
|
||||
}
|
5
api/internal/core/ports/downloader/downloader.go
Normal file
5
api/internal/core/ports/downloader/downloader.go
Normal file
@@ -0,0 +1,5 @@
|
||||
package downloader
|
||||
|
||||
type Downloader interface {
|
||||
Download(link string, updateEvent func(progress string)) error
|
||||
}
|
130
api/internal/core/ports/downloader/yt_downloader/yt.go
Normal file
130
api/internal/core/ports/downloader/yt_downloader/yt.go
Normal file
@@ -0,0 +1,130 @@
|
||||
package yt_downloader
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"downloader/internal/core/ports/downloader"
|
||||
"downloader/pkg/files"
|
||||
"fmt"
|
||||
"go.uber.org/zap"
|
||||
"io"
|
||||
"io/fs"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type YtDownloader struct {
|
||||
outputDirectory string
|
||||
tempDirectory string
|
||||
checkFrequencyMs time.Duration
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
func New(logger *zap.SugaredLogger) downloader.Downloader {
|
||||
return &YtDownloader{
|
||||
outputDirectory: "/home/hermansen/Downloads/yt",
|
||||
tempDirectory: "/tmp/downloader",
|
||||
checkFrequencyMs: 5000,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
_, err := exec.Command("youtube-dl", "--version").Output()
|
||||
if err != nil {
|
||||
log.Fatal("Youtube download (youtube-dl) isn't installed on the device")
|
||||
}
|
||||
}
|
||||
|
||||
func (y *YtDownloader) Download(link string, updateEvent func(progress string)) error {
|
||||
baseDir := fmt.Sprintf("%s/%x",
|
||||
y.tempDirectory,
|
||||
sha256.Sum256([]byte(link)))
|
||||
err := os.MkdirAll(baseDir, os.ModePerm)
|
||||
err = os.MkdirAll(y.outputDirectory, os.ModePerm)
|
||||
if err != nil {
|
||||
y.logger.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
filePath := fmt.Sprintf("%s/%s", baseDir, "%(title)s-%(id)s.%(ext)s")
|
||||
|
||||
command := exec.Command("youtube-dl",
|
||||
"-R 3",
|
||||
"-o",
|
||||
filePath,
|
||||
link,
|
||||
)
|
||||
|
||||
var stdout io.ReadCloser
|
||||
stdout, err = command.StdoutPipe()
|
||||
|
||||
go func() {
|
||||
for true {
|
||||
bytes := make([]byte, 1024)
|
||||
_, err = stdout.Read(bytes)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
output := string(bytes)
|
||||
|
||||
compile := regexp.MustCompile(`[a-z\[\]\s]+([\d\.]+).[a-z\s]+([\d\.]+)([a-zA-Z]+)[a-z\s]+`)
|
||||
if err != nil {
|
||||
y.logger.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
res := compile.FindAllStringSubmatch(output, -1)
|
||||
if len(res) != 0 && strings.Contains(res[0][0], "download") && len(res[0]) >= 2 {
|
||||
progress := res[0][1]
|
||||
|
||||
y.logger.Debugw("progress",
|
||||
"percentage", progress,
|
||||
"link", link)
|
||||
updateEvent(progress)
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond * y.checkFrequencyMs)
|
||||
}
|
||||
}()
|
||||
|
||||
err = command.Start()
|
||||
|
||||
if err != nil {
|
||||
y.logger.Warn(err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = command.Wait()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var dir []fs.DirEntry
|
||||
dir, err = os.ReadDir(baseDir)
|
||||
if err != nil {
|
||||
y.logger.Error("Could not read directory")
|
||||
return err
|
||||
}
|
||||
|
||||
for _, fileInfo := range dir {
|
||||
oldPath := fmt.Sprintf("%s/%s", baseDir, fileInfo.Name())
|
||||
newPath := fmt.Sprintf("%s/%s", y.outputDirectory, fileInfo.Name())
|
||||
|
||||
if err := files.MoveFile(oldPath, newPath); err != nil {
|
||||
return err
|
||||
} else {
|
||||
y.logger.Infow("moved file",
|
||||
"fileName", fileInfo.Name())
|
||||
if err := os.Remove(baseDir); err != nil {
|
||||
y.logger.Warn("could not cleanup",
|
||||
"path", oldPath)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
7
api/internal/core/services/download/background.go
Normal file
7
api/internal/core/services/download/background.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package download
|
||||
|
||||
import "downloader/internal/core/entities"
|
||||
|
||||
type BackgroundService interface {
|
||||
Run(download *entities.Download) error
|
||||
}
|
55
api/internal/core/services/download/default/background.go
Normal file
55
api/internal/core/services/download/default/background.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package _default
|
||||
|
||||
import (
|
||||
"downloader/internal/core/entities"
|
||||
"downloader/internal/core/ports/download_request"
|
||||
"downloader/internal/core/ports/downloader"
|
||||
"downloader/internal/core/services/download"
|
||||
"fmt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type localBackgroundService struct {
|
||||
repository download_request.Repository
|
||||
logger *zap.SugaredLogger
|
||||
downloader downloader.Downloader
|
||||
}
|
||||
|
||||
func NewLocalBackgroundService(repository download_request.Repository, logger *zap.SugaredLogger, downloader downloader.Downloader) download.BackgroundService {
|
||||
return &localBackgroundService{repository: repository, logger: logger, downloader: downloader}
|
||||
}
|
||||
|
||||
func (l localBackgroundService) Run(download *entities.Download) error {
|
||||
logger := l.logger.With("downloadId", download.ID)
|
||||
|
||||
go func() {
|
||||
download.Status = "started"
|
||||
_ = l.repository.Update(download)
|
||||
|
||||
err := l.downloader.Download(download.Link, func(progress string) {
|
||||
download.Status = fmt.Sprintf("in-progress: %s", progress)
|
||||
_ = l.repository.Update(download)
|
||||
})
|
||||
download.Status = "done"
|
||||
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
download.Status = "failed"
|
||||
}
|
||||
|
||||
err = l.repository.Update(download)
|
||||
if err != nil {
|
||||
logger.Errorw("download request failed",
|
||||
"downloadLink", download.Link)
|
||||
download.Status = "failed"
|
||||
|
||||
if updateErr := l.repository.Update(download); updateErr != nil {
|
||||
panic(updateErr)
|
||||
}
|
||||
} else {
|
||||
logger.Info("download request done")
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
53
api/internal/core/services/download/default/service.go
Normal file
53
api/internal/core/services/download/default/service.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package _default
|
||||
|
||||
import (
|
||||
"downloader/internal/core/entities"
|
||||
"downloader/internal/core/ports/download_request"
|
||||
"downloader/internal/core/services/download"
|
||||
"downloader/pkg/common/uuid"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type localService struct {
|
||||
repository download_request.Repository
|
||||
uuidGen uuid.Gen
|
||||
BackgroundService download.BackgroundService
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
func NewLocalService(repository download_request.Repository, uuidGen uuid.Gen, backgroundService download.BackgroundService, logger *zap.SugaredLogger) download.Service {
|
||||
return &localService{
|
||||
repository: repository,
|
||||
uuidGen: uuidGen,
|
||||
BackgroundService: backgroundService,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *localService) Schedule(link string) (*entities.Download, error) {
|
||||
download, err := entities.NewDownload(link)(l.uuidGen)
|
||||
if err != nil {
|
||||
l.logger.Warn("Could not parse download")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logger := l.logger.With("downloadId", download.ID)
|
||||
|
||||
persistedDownloadRequest, uploadErr := l.repository.Create(download)
|
||||
if uploadErr != nil {
|
||||
logger.Error("failed to insert download request")
|
||||
return nil, uploadErr
|
||||
}
|
||||
|
||||
err = l.BackgroundService.Run(persistedDownloadRequest)
|
||||
if err != nil {
|
||||
logger.Error("failed to run download request")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return persistedDownloadRequest, nil
|
||||
}
|
||||
|
||||
func (l *localService) Get(id string) (*entities.Download, error) {
|
||||
return l.repository.GetById(id)
|
||||
}
|
8
api/internal/core/services/download/service.go
Normal file
8
api/internal/core/services/download/service.go
Normal file
@@ -0,0 +1,8 @@
|
||||
package download
|
||||
|
||||
import "downloader/internal/core/entities"
|
||||
|
||||
type Service interface {
|
||||
Schedule(link string) (*entities.Download, error)
|
||||
Get(id string) (*entities.Download, error)
|
||||
}
|
Reference in New Issue
Block a user