add basic leader election on top of postgres
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
@@ -21,5 +21,5 @@ func (a *App) Logger() *slog.Logger {
|
||||
}
|
||||
|
||||
func (a *App) Scheduler() *scheduler.Scheduler {
|
||||
return scheduler.NewScheduler(a.logger)
|
||||
return scheduler.NewScheduler(a.logger, Postgres())
|
||||
}
|
||||
|
23
internal/app/postgres.go
Normal file
23
internal/app/postgres.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.front.kjuulh.io/kjuulh/orbis/internal/utilities"
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
var Postgres = utilities.Singleton(func() (*pgx.Conn, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
|
||||
conn, err := pgx.Connect(ctx, os.Getenv("ORBIS_POSTGRES_DB"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to orbis postgres database: %w", err)
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
})
|
@@ -2,18 +2,24 @@ package scheduler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
type Scheduler struct {
|
||||
logger *slog.Logger
|
||||
db *pgx.Conn
|
||||
}
|
||||
|
||||
func NewScheduler(logger *slog.Logger) *Scheduler {
|
||||
func NewScheduler(logger *slog.Logger, db *pgx.Conn) *Scheduler {
|
||||
return &Scheduler{
|
||||
logger: logger,
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,8 +54,21 @@ func (s *Scheduler) acquireLeader(ctx context.Context) (bool, error) {
|
||||
return false, nil
|
||||
|
||||
default:
|
||||
// Attempt to acquire leader
|
||||
//
|
||||
var acquiredLock bool
|
||||
if err := s.db.QueryRow(ctx, "SELECT pg_try_advisory_lock(1234)").Scan(&acquiredLock); err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
if !acquiredLock {
|
||||
wait := time.Second * time.Duration(rand.Float32()*9+1)
|
||||
|
||||
s.logger.Debug("failed to acquire lock, parking non-elected scheduler", "wait_seconds", wait)
|
||||
time.Sleep(wait)
|
||||
continue
|
||||
}
|
||||
|
||||
return true, nil
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user