Add db
This commit is contained in:
@@ -29,13 +29,14 @@ func (dr *requestDownloadRequest) Bind(r *http.Request) error {
|
||||
}
|
||||
|
||||
func (a *api) requestDownload(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
data := &requestDownloadRequest{}
|
||||
if err := render.Bind(r, data); err != nil {
|
||||
_ = render.Render(w, r, responses.ErrInvalidRequest(err))
|
||||
return
|
||||
}
|
||||
|
||||
download, err := a.drService.Schedule(data.Link)
|
||||
download, err := a.drService.Schedule(ctx, data.Link)
|
||||
if err != nil {
|
||||
_ = render.Render(w, r, responses.ErrInvalidRequest(err))
|
||||
return
|
||||
@@ -46,8 +47,9 @@ func (a *api) requestDownload(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (a *api) getDownloads(writer http.ResponseWriter, request *http.Request) {
|
||||
ctx := request.Context()
|
||||
active := request.URL.Query().Get("active") == "true"
|
||||
downloads, err := a.drService.GetAll(active)
|
||||
downloads, err := a.drService.GetAll(ctx, active)
|
||||
if err != nil {
|
||||
_ = render.Render(writer, request, responses.ErrInvalidRequest(err))
|
||||
return
|
||||
@@ -61,9 +63,10 @@ func (a *api) getDownloads(writer http.ResponseWriter, request *http.Request) {
|
||||
}
|
||||
|
||||
func (a *api) getDownloadById(w http.ResponseWriter, r *http.Request) {
|
||||
downloadId := r.Context().Value("downloadId").(string)
|
||||
ctx := r.Context()
|
||||
downloadId := ctx.Value("downloadId").(string)
|
||||
|
||||
download, err := a.drService.Get(downloadId)
|
||||
download, err := a.drService.Get(ctx, downloadId)
|
||||
if err != nil {
|
||||
_ = render.Render(w, r, responses.ErrNotFound())
|
||||
return
|
||||
@@ -80,9 +83,14 @@ func newRequestDownloadResponse(download *entities.Download) *requestDownloadRes
|
||||
}
|
||||
|
||||
func newDownloadsResponse(downloads []*entities.Download) []render.Renderer {
|
||||
list := []render.Renderer{}
|
||||
var list []render.Renderer
|
||||
for _, download := range downloads {
|
||||
list = append(list, newRequestDownloadResponse(download))
|
||||
}
|
||||
|
||||
if len(list) == 0 {
|
||||
return []render.Renderer{}
|
||||
}
|
||||
|
||||
return list
|
||||
}
|
||||
|
29
api/internal/app/persistence/database.go
Normal file
29
api/internal/app/persistence/database.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package persistence
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"downloader/internal/app/persistence/migrations"
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/dialect/pgdialect"
|
||||
"github.com/uptrace/bun/driver/pgdriver"
|
||||
"github.com/uptrace/bun/extra/bundebug"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
)
|
||||
|
||||
func NewPostgresDB() *bun.DB {
|
||||
dsn := "postgres://downloader:downloadersecret@localhost:5432/downloader?sslmode=disable"
|
||||
sqldb := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(dsn)))
|
||||
|
||||
db := bun.NewDB(sqldb, pgdialect.New())
|
||||
db.AddQueryHook(bundebug.NewQueryHook(
|
||||
bundebug.WithVerbose(true),
|
||||
bundebug.FromEnv("BUNDEBUG")))
|
||||
|
||||
migrator := migrate.NewMigrator(db, migrations.Migrations)
|
||||
bgCtx := context.Background()
|
||||
migrator.Init(bgCtx)
|
||||
migrator.Migrate(bgCtx)
|
||||
|
||||
return db
|
||||
}
|
@@ -0,0 +1,27 @@
|
||||
package migrations
|
||||
|
||||
import (
|
||||
"context"
|
||||
"downloader/internal/core/ports/download_request/sql"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
func init() {
|
||||
Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
|
||||
_, err := db.Exec(`CREATE EXTENSION IF NOT EXISTS "uuid-ossp";`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
db.RegisterModel((*sql.Download)(nil))
|
||||
_, err = db.NewCreateTable().
|
||||
Model((*sql.Download)(nil)).
|
||||
Exec(ctx)
|
||||
return err
|
||||
}, func(ctx context.Context, db *bun.DB) error {
|
||||
_, err := db.NewDropTable().
|
||||
Model((*sql.Download)(nil)).
|
||||
Exec(ctx)
|
||||
return err
|
||||
})
|
||||
}
|
17
api/internal/app/persistence/migrations/main.go
Normal file
17
api/internal/app/persistence/migrations/main.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package migrations
|
||||
|
||||
import (
|
||||
"embed"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
)
|
||||
|
||||
var Migrations = migrate.NewMigrations()
|
||||
|
||||
// go:embed *.sql
|
||||
var sqlMigrations embed.FS
|
||||
|
||||
func init() {
|
||||
if err := Migrations.Discover(sqlMigrations); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
@@ -3,7 +3,8 @@ package router
|
||||
import (
|
||||
"downloader/internal/app/api/download"
|
||||
"downloader/internal/app/infrastructure/logger"
|
||||
"downloader/internal/core/ports/download_request/in_memory"
|
||||
"downloader/internal/app/persistence"
|
||||
"downloader/internal/core/ports/download_request/sql"
|
||||
"downloader/internal/core/ports/downloadhandler"
|
||||
"downloader/internal/core/ports/filehandler/mover/local"
|
||||
"downloader/internal/core/ports/fileorchestrator"
|
||||
@@ -64,7 +65,9 @@ func setupDownloadRoute(router *router) {
|
||||
destinationHandler := destinationhandler.New(mover)
|
||||
fileOrchestrator := fileorchestrator.New(newLogger, sourceHandler, destinationHandler)
|
||||
|
||||
drRepository := in_memory.NewInMemoryRepository(newLogger)
|
||||
db := persistence.NewPostgresDB()
|
||||
drRepository := sql.NewDownloadRequestSqlRepository(db, newLogger)
|
||||
//drRepository := in_memory.NewInMemoryRepository(newLogger)
|
||||
//dlHandler := downloadhandler.NewYoutubeDlDownloader(newLogger)
|
||||
dlHandler := downloadhandler.NewYtDlpDownloader(newLogger)
|
||||
ondlHandler := handlers.New(drRepository, newLogger)
|
||||
|
@@ -1,10 +1,13 @@
|
||||
package download_request
|
||||
|
||||
import "downloader/internal/core/entities"
|
||||
import (
|
||||
"context"
|
||||
"downloader/internal/core/entities"
|
||||
)
|
||||
|
||||
type Repository interface {
|
||||
Create(download *entities.Download) (*entities.Download, error)
|
||||
GetById(id string) (*entities.Download, error)
|
||||
Update(download *entities.Download) error
|
||||
Get(active bool) ([]*entities.Download, error)
|
||||
Create(ctx context.Context, download *entities.Download) (*entities.Download, error)
|
||||
GetById(ctx context.Context, id string) (*entities.Download, error)
|
||||
Update(ctx context.Context, download *entities.Download) error
|
||||
Get(ctx context.Context, active bool) ([]*entities.Download, error)
|
||||
}
|
||||
|
29
api/internal/core/ports/download_request/sql/download.go
Normal file
29
api/internal/core/ports/download_request/sql/download.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/uptrace/bun"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Download struct {
|
||||
bun.BaseModel `bun:"table:downloads,alias:d"`
|
||||
ID string `bun:"id,pk,type:uuid,default:uuid_generate_v4()"`
|
||||
Status string `bun:"status,notnull"`
|
||||
Link string `bun:"link,notnull"`
|
||||
CreatedAt time.Time `bun:"created_at,nullzero,notnull,default:current_timestamp"`
|
||||
UpdatedAt time.Time `bun:"updated_at,nullzero,notnull,default:current_timestamp"`
|
||||
}
|
||||
|
||||
var _ bun.BeforeAppendModelHook = (*Download)(nil)
|
||||
|
||||
func (u *Download) BeforeAppendModel(ctx context.Context, query bun.Query) error {
|
||||
switch query.(type) {
|
||||
case *bun.InsertQuery:
|
||||
u.CreatedAt = time.Now()
|
||||
u.UpdatedAt = time.Now()
|
||||
case *bun.UpdateQuery:
|
||||
u.UpdatedAt = time.Now()
|
||||
}
|
||||
return nil
|
||||
}
|
102
api/internal/core/ports/download_request/sql/repository.go
Normal file
102
api/internal/core/ports/download_request/sql/repository.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"downloader/internal/core/entities"
|
||||
"github.com/uptrace/bun"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type repository struct {
|
||||
db *bun.DB
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
func NewDownloadRequestSqlRepository(db *bun.DB, logger *zap.SugaredLogger) *repository {
|
||||
return &repository{
|
||||
db: db,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (r repository) Create(ctx context.Context, download *entities.Download) (*entities.Download, error) {
|
||||
insertDownload := &Download{
|
||||
ID: download.ID,
|
||||
Status: download.Status,
|
||||
Link: download.Link,
|
||||
}
|
||||
|
||||
_, err := r.db.NewInsert().
|
||||
Model(insertDownload).
|
||||
Returning("*").
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &entities.Download{
|
||||
ID: insertDownload.ID,
|
||||
Status: insertDownload.Status,
|
||||
Link: insertDownload.Link,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r repository) GetById(ctx context.Context, id string) (*entities.Download, error) {
|
||||
download := &Download{
|
||||
ID: id,
|
||||
}
|
||||
|
||||
err := r.db.NewSelect().
|
||||
Model(download).
|
||||
WherePK().
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &entities.Download{
|
||||
ID: download.ID,
|
||||
Status: download.Status,
|
||||
Link: download.Link,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r repository) Update(ctx context.Context, download *entities.Download) error {
|
||||
updateDownload := &Download{
|
||||
ID: download.ID,
|
||||
Status: download.Status,
|
||||
Link: download.Link,
|
||||
}
|
||||
|
||||
_, err := r.db.NewUpdate().
|
||||
Model(updateDownload).
|
||||
ExcludeColumn("created_at").
|
||||
WherePK().
|
||||
Exec(ctx)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (r repository) Get(ctx context.Context, active bool) ([]*entities.Download, error) {
|
||||
var downloads []Download
|
||||
err := r.db.NewSelect().
|
||||
Model(&downloads).
|
||||
Column("id", "status", "link").
|
||||
Limit(20).
|
||||
Order("created_at ASC").
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var responseDownloads []*entities.Download
|
||||
for _, download := range downloads {
|
||||
responseDownloads = append(responseDownloads, &entities.Download{
|
||||
ID: download.ID,
|
||||
Status: download.Status,
|
||||
Link: download.Link,
|
||||
})
|
||||
}
|
||||
|
||||
return responseDownloads, nil
|
||||
}
|
@@ -1,7 +1,10 @@
|
||||
package download
|
||||
|
||||
import "downloader/internal/core/entities"
|
||||
import (
|
||||
"context"
|
||||
"downloader/internal/core/entities"
|
||||
)
|
||||
|
||||
type BackgroundService interface {
|
||||
Run(download *entities.Download) error
|
||||
Run(ctx context.Context, download *entities.Download) error
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package _default
|
||||
|
||||
import (
|
||||
"context"
|
||||
"downloader/internal/core/entities"
|
||||
"downloader/internal/core/ports/download_request"
|
||||
"downloader/internal/core/services/download"
|
||||
@@ -18,12 +19,13 @@ func NewLocalBackgroundService(repository download_request.Repository, logger *z
|
||||
return &localBackgroundService{repository: repository, logger: logger, downloader: downloader}
|
||||
}
|
||||
|
||||
func (l localBackgroundService) Run(download *entities.Download) error {
|
||||
func (l localBackgroundService) Run(ctx context.Context, download *entities.Download) error {
|
||||
logger := l.logger.With("downloadId", download.ID)
|
||||
|
||||
go func() {
|
||||
longRunningCtx := context.TODO()
|
||||
download.Status = "started"
|
||||
_ = l.repository.Update(download)
|
||||
_ = l.repository.Update(longRunningCtx, download)
|
||||
|
||||
err := l.downloader.Download(download.Link, download.ID)
|
||||
download.Status = "done"
|
||||
@@ -33,13 +35,13 @@ func (l localBackgroundService) Run(download *entities.Download) error {
|
||||
download.Status = "failed"
|
||||
}
|
||||
|
||||
err = l.repository.Update(download)
|
||||
err = l.repository.Update(longRunningCtx, download)
|
||||
if err != nil {
|
||||
logger.Errorw("download request failed",
|
||||
"downloadLink", download.Link)
|
||||
download.Status = "failed"
|
||||
|
||||
if updateErr := l.repository.Update(download); updateErr != nil {
|
||||
if updateErr := l.repository.Update(longRunningCtx, download); updateErr != nil {
|
||||
panic(updateErr)
|
||||
}
|
||||
} else {
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package _default
|
||||
|
||||
import (
|
||||
"context"
|
||||
"downloader/internal/core/entities"
|
||||
"downloader/internal/core/ports/download_request"
|
||||
"downloader/internal/core/services/download"
|
||||
@@ -24,7 +25,7 @@ func NewLocalService(repository download_request.Repository, uuidGen uuid.Gen, b
|
||||
}
|
||||
}
|
||||
|
||||
func (l *localService) Schedule(link string) (*entities.Download, error) {
|
||||
func (l *localService) Schedule(ctx context.Context, link string) (*entities.Download, error) {
|
||||
download, err := entities.NewDownload(link)(l.uuidGen)
|
||||
if err != nil {
|
||||
l.logger.Warn("Could not parse download")
|
||||
@@ -33,13 +34,13 @@ func (l *localService) Schedule(link string) (*entities.Download, error) {
|
||||
|
||||
logger := l.logger.With("downloadId", download.ID)
|
||||
|
||||
persistedDownloadRequest, uploadErr := l.repository.Create(download)
|
||||
persistedDownloadRequest, uploadErr := l.repository.Create(ctx, download)
|
||||
if uploadErr != nil {
|
||||
logger.Error("failed to insert download request")
|
||||
return nil, uploadErr
|
||||
}
|
||||
|
||||
err = l.BackgroundService.Run(persistedDownloadRequest)
|
||||
err = l.BackgroundService.Run(ctx, persistedDownloadRequest)
|
||||
if err != nil {
|
||||
logger.Error("failed to run download request")
|
||||
return nil, err
|
||||
@@ -48,10 +49,10 @@ func (l *localService) Schedule(link string) (*entities.Download, error) {
|
||||
return persistedDownloadRequest, nil
|
||||
}
|
||||
|
||||
func (l *localService) Get(id string) (*entities.Download, error) {
|
||||
return l.repository.GetById(id)
|
||||
func (l *localService) Get(ctx context.Context, id string) (*entities.Download, error) {
|
||||
return l.repository.GetById(ctx, id)
|
||||
}
|
||||
|
||||
func (l *localService) GetAll(active bool) ([]*entities.Download, error) {
|
||||
return l.repository.Get(active)
|
||||
func (l *localService) GetAll(ctx context.Context, active bool) ([]*entities.Download, error) {
|
||||
return l.repository.Get(ctx, active)
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"downloader/internal/core/ports/download_request"
|
||||
"downloader/internal/core/ports/downloadhandler"
|
||||
"fmt"
|
||||
@@ -20,7 +21,7 @@ func New(repository download_request.Repository, logger *zap.SugaredLogger) down
|
||||
}
|
||||
|
||||
func (o *onDownloadEventHandler) OnTickEvent(downloadId string, progress string) {
|
||||
download, err := o.repository.GetById(downloadId)
|
||||
download, err := o.repository.GetById(context.TODO(), downloadId)
|
||||
if err != nil {
|
||||
o.logger.Warnw("could not finish updating progress as not download id available",
|
||||
"downloadId", downloadId,
|
||||
@@ -28,5 +29,5 @@ func (o *onDownloadEventHandler) OnTickEvent(downloadId string, progress string)
|
||||
return
|
||||
}
|
||||
download.Status = fmt.Sprintf("in-progress: %s", progress)
|
||||
_ = o.repository.Update(download)
|
||||
_ = o.repository.Update(context.TODO(), download)
|
||||
}
|
||||
|
@@ -1,9 +1,12 @@
|
||||
package download
|
||||
|
||||
import "downloader/internal/core/entities"
|
||||
import (
|
||||
"context"
|
||||
"downloader/internal/core/entities"
|
||||
)
|
||||
|
||||
type Service interface {
|
||||
Schedule(link string) (*entities.Download, error)
|
||||
Get(id string) (*entities.Download, error)
|
||||
GetAll(active bool) ([]*entities.Download, error)
|
||||
Schedule(ctx context.Context, link string) (*entities.Download, error)
|
||||
Get(ctx context.Context, id string) (*entities.Download, error)
|
||||
GetAll(ctx context.Context, active bool) ([]*entities.Download, error)
|
||||
}
|
||||
|
Reference in New Issue
Block a user