Adding Initial action (#1)

Co-authored-by: kjuulh <contact@kjuulh.io>
Reviewed-on: https://git.front.kjuulh.io/kjuulh/kraken/pulls/1
This commit is contained in:
2022-09-12 22:12:02 +02:00
parent b3302bb3c6
commit 50228f0aff
23 changed files with 1155 additions and 59 deletions

16
internal/api/health.go Normal file
View File

@@ -0,0 +1,16 @@
package api
import (
"net/http"
"github.com/gin-gonic/gin"
)
func HealthRoute(app *gin.Engine) {
healthRoute := app.Group("/health")
healthRoute.GET("/ready", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"message": "healthy",
})
})
}

View File

@@ -0,0 +1,39 @@
package api
import (
"context"
"net/http"
"git.front.kjuulh.io/kjuulh/kraken/internal/commands"
"git.front.kjuulh.io/kjuulh/kraken/internal/serverdeps"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/jobs"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"go.uber.org/zap"
)
func CommandRoute(logger *zap.Logger, app *gin.Engine, deps *serverdeps.ServerDeps) {
commandRoute := app.Group("commands")
commandRoute.POST("processRepos", func(c *gin.Context) {
type processReposRequest struct {
RepositoryUrls []string `json:"repositoryUrls"`
}
var request processReposRequest
err := c.BindJSON(&request)
if err != nil {
logger.Info("could not bind request", zap.String("request", "processRepo"), zap.Error(err))
c.AbortWithStatus(http.StatusBadRequest)
return
}
jobId := uuid.New().String()
go func(repositoryUrls []string, jobId string) {
ctx := context.WithValue(context.Background(), jobs.JobId{}, jobId)
processRepos := commands.NewProcessRepos(logger, deps)
err = processRepos.Process(ctx, repositoryUrls)
}(request.RepositoryUrls, jobId)
c.Status(http.StatusAccepted)
})
}

12
internal/api/root.go Normal file
View File

@@ -0,0 +1,12 @@
package api
import (
"git.front.kjuulh.io/kjuulh/kraken/internal/serverdeps"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)
func BuildApi(logger *zap.Logger, app *gin.Engine, deps *serverdeps.ServerDeps) {
HealthRoute(app)
CommandRoute(logger, app, deps)
}

View File

@@ -0,0 +1,152 @@
package commands
import (
"context"
"fmt"
"io/fs"
"os"
"path"
"path/filepath"
"sync"
"time"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/actions"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/providers"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/storage"
"go.uber.org/zap"
)
type (
ProcessRepos struct {
logger *zap.Logger
storage *storage.Service
git *providers.Git
action *actions.Action
}
ProcessReposDeps interface {
GetStorageService() *storage.Service
GetGitProvider() *providers.Git
GetAction() *actions.Action
}
)
func NewProcessRepos(logger *zap.Logger, deps ProcessReposDeps) *ProcessRepos {
return &ProcessRepos{
logger: logger,
storage: deps.GetStorageService(),
git: deps.GetGitProvider(),
action: deps.GetAction(),
}
}
func (pr *ProcessRepos) Process(ctx context.Context, repositoryUrls []string) error {
// Clone repos
wg := sync.WaitGroup{}
wg.Add(len(repositoryUrls))
errChan := make(chan error, 1)
for _, repoUrl := range repositoryUrls {
go func(ctx context.Context, repoUrl string) {
defer func() {
wg.Done()
}()
pr.logger.Debug("Creating area", zap.String("repoUrl", repoUrl))
area, err := pr.storage.CreateArea(ctx)
if err != nil {
pr.logger.Error("failed to allocate area", zap.Error(err))
errChan <- err
return
}
defer func(ctx context.Context) {
pr.logger.Debug("Removing area", zap.String("path", area.Path), zap.String("repoUrl", repoUrl))
err = pr.storage.RemoveArea(ctx, area)
if err != nil {
errChan <- err
return
}
}(ctx)
pr.logger.Debug("Cloning repo", zap.String("path", area.Path), zap.String("repoUrl", repoUrl))
cloneCtx, _ := context.WithTimeout(ctx, time.Second*5)
repo, err := pr.git.Clone(cloneCtx, area, repoUrl)
if err != nil {
pr.logger.Error("could not clone repo", zap.Error(err))
errChan <- err
return
}
err = pr.git.CreateBranch(ctx, repo)
if err != nil {
pr.logger.Error("could not create branch", zap.Error(err))
errChan <- err
return
}
err = pr.action.Run(
ctx,
area,
func(_ context.Context, area *storage.Area) (bool, error) {
pr.logger.Debug("checking predicate", zap.String("area", area.Path))
contains := false
filepath.WalkDir(area.Path, func(path string, d fs.DirEntry, err error) error {
if d.Name() == "roadmap.md" {
contains = true
}
return nil
})
return contains, nil
},
func(_ context.Context, area *storage.Area) error {
pr.logger.Debug("running action", zap.String("area", area.Path))
readme := path.Join(area.Path, "README.md")
file, err := os.Create(readme)
if err != nil {
return fmt.Errorf("could not create readme: %w", err)
}
_, err = file.WriteString("# Readme")
if err != nil {
return fmt.Errorf("could not write readme: %w", err)
}
_, err = pr.git.Add(ctx, area, repo)
if err != nil {
return fmt.Errorf("could not add file: %w", err)
}
err = pr.git.Commit(ctx, repo)
if err != nil {
return fmt.Errorf("could not get diff: %w", err)
}
return nil
}, false)
if err != nil {
pr.logger.Error("could not run action", zap.Error(err))
errChan <- err
return
}
err = pr.git.Push(ctx, repo)
if err != nil {
pr.logger.Error("could not push to repo", zap.Error(err))
errChan <- err
return
}
pr.logger.Debug("processing done", zap.String("path", area.Path), zap.String("repoUrl", repoUrl))
}(ctx, repoUrl)
}
wg.Wait()
close(errChan)
pr.logger.Debug("finished processing all repos")
for err := range errChan {
return err
}
return nil
}

View File

@@ -4,46 +4,28 @@ import (
"context"
"errors"
"net/http"
"time"
"git.front.kjuulh.io/kjuulh/curre"
"git.front.kjuulh.io/kjuulh/kraken/internal/api"
"git.front.kjuulh.io/kjuulh/kraken/internal/serverdeps"
ginzap "github.com/gin-contrib/zap"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)
func NewHttpServer(deps *serverdeps.ServerDeps) curre.Component {
return curre.NewFunctionalComponent(&curre.FunctionalComponent{
StartFunc: func(_ *curre.FunctionalComponent, _ context.Context) error {
handler := http.NewServeMux()
handler.HandleFunc(
"/health/ready",
func(w http.ResponseWriter, _ *http.Request) {
w.Write([]byte("ready"))
w.WriteHeader(http.StatusOK)
})
http.ListenAndServe("127.0.0.1:3000", handler)
return nil
},
},
)
}
func NewGinHttpServer(_ *serverdeps.ServerDeps) curre.Component {
func NewGinHttpServer(logger *zap.Logger, deps *serverdeps.ServerDeps) curre.Component {
var app *gin.Engine
var server *http.Server
return curre.NewFunctionalComponent(&curre.FunctionalComponent{
InitFunc: func(_ *curre.FunctionalComponent, _ context.Context) error {
app = gin.Default()
app = gin.New()
app.UseH2C = true
app.Use(ginzap.Ginzap(logger, time.RFC3339, true))
app.Use(ginzap.RecoveryWithZap(logger, true))
healthRoute := app.Group("/health")
healthRoute.GET("/ready", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"message": "healthy",
})
})
api.BuildApi(logger, app, deps)
server = &http.Server{
Addr: "127.0.0.1:3000",
@@ -62,6 +44,7 @@ func NewGinHttpServer(_ *serverdeps.ServerDeps) curre.Component {
return nil
},
StopFunc: func(_ *curre.FunctionalComponent, ctx context.Context) error {
ctx, _ = context.WithTimeout(ctx, time.Second*10)
if server != nil {
server.Shutdown(ctx)
}

View File

@@ -5,6 +5,7 @@ import (
"git.front.kjuulh.io/kjuulh/curre"
"git.front.kjuulh.io/kjuulh/kraken/internal/serverdeps"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/signer"
"go.uber.org/zap"
)
@@ -14,6 +15,8 @@ func Start(logger *zap.Logger) error {
deps := serverdeps.NewServerDeps(logger)
return curre.NewManager().
Register(NewGinHttpServer(deps)).
Register(NewGinHttpServer(logger.With(zap.Namespace("ginHttpServer")), deps)).
Register(NewStorageServer(logger.With(zap.Namespace("storageServer")), deps)).
Register(signer.NewOpenPGPApp(deps.GetOpenPGP())).
Run(ctx)
}

View File

@@ -0,0 +1,28 @@
package server
import (
"context"
"time"
"git.front.kjuulh.io/kjuulh/curre"
"git.front.kjuulh.io/kjuulh/kraken/internal/serverdeps"
"go.uber.org/zap"
)
func NewStorageServer(logger *zap.Logger, deps *serverdeps.ServerDeps) curre.Component {
storage := deps.GetStorageService()
return curre.NewFunctionalComponent(&curre.FunctionalComponent{
InitFunc: func(_ *curre.FunctionalComponent, ctx context.Context) error {
logger.Debug("Initializing storage")
return storage.InitializeStorage(ctx)
},
StartFunc: func(fc *curre.FunctionalComponent, ctx context.Context) error {
return nil
},
StopFunc: func(_ *curre.FunctionalComponent, ctx context.Context) error {
logger.Debug("Cleaning up storage")
ctx, _ = context.WithTimeout(ctx, time.Second*10)
return storage.CleanupStorage(ctx)
},
})
}

View File

@@ -1,13 +1,64 @@
package serverdeps
import "go.uber.org/zap"
import (
"git.front.kjuulh.io/kjuulh/kraken/internal/services/actions"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/providers"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/signer"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/storage"
"go.uber.org/zap"
)
type ServerDeps struct {
logger *zap.Logger
storageConfig *storage.StorageConfig
gitCfg *providers.GitConfig
openPGP *signer.OpenPGP
}
func NewServerDeps(logger *zap.Logger) *ServerDeps {
return &ServerDeps{
logger: logger.With(zap.String("app", "serverdeps")),
deps := &ServerDeps{
logger: logger.With(zap.Namespace("serverdeps")),
}
if storageCfg, err := storage.NewDefaultStorageConfig(); err != nil {
panic(err)
} else {
deps.storageConfig = storageCfg
}
deps.gitCfg = &providers.GitConfig{
AuthOption: providers.GIT_AUTH_SSH,
User: "git",
Password: "",
AccessToken: "",
SshPublicKeyFilePath: "/Users/kah/.ssh/id_ed25519",
SshPrivateKeyPassword: "",
}
openPGPConfig := &signer.OpenPgpConfig{
PrivateKeyFilePath: "./example/testkey.private.pgp",
PrivateKeyPassword: "somepassword",
PrivateKeyIdentity: "kraken@kasperhermansen.com",
}
deps.openPGP = signer.NewOpenPGP(logger.With(zap.Namespace("openpgp")), openPGPConfig)
return deps
}
func (deps *ServerDeps) GetStorageService() *storage.Service {
return storage.NewService(deps.logger.With(zap.Namespace("storage")), deps.storageConfig)
}
func (deps *ServerDeps) GetGitProvider() *providers.Git {
return providers.NewGit(deps.logger.With(zap.Namespace("gitProvider")), deps.gitCfg, deps.openPGP)
}
func (deps *ServerDeps) GetAction() *actions.Action {
return actions.NewAction(deps.logger.With(zap.Namespace("action")))
}
func (deps *ServerDeps) GetOpenPGP() *signer.OpenPGP {
return deps.openPGP
}

View File

@@ -0,0 +1,43 @@
package actions
import (
"context"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/storage"
"go.uber.org/zap"
)
type Predicate func(ctx context.Context, area *storage.Area) (bool, error)
type ActionFunc func(ctx context.Context, area *storage.Area) error
type Action struct {
logger *zap.Logger
}
func NewAction(logger *zap.Logger) *Action {
return &Action{logger: logger}
}
func (a *Action) Run(ctx context.Context, area *storage.Area, predicate Predicate, action ActionFunc, dryrun bool) error {
matches, err := predicate(ctx, area)
if err != nil {
return err
}
if !matches {
a.logger.Debug("repo doesn't match, skipping", zap.String("path", area.Path))
return nil
}
if dryrun {
a.logger.Panic("dryrun selected, but not implemented yet")
return nil
}
err = action(ctx, area)
if err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,4 @@
package jobs
type JobId struct {
}

View File

@@ -0,0 +1,260 @@
package providers
import (
"context"
"fmt"
"time"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/signer"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/storage"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/config"
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/object"
"github.com/go-git/go-git/v5/plumbing/transport"
"github.com/go-git/go-git/v5/plumbing/transport/http"
"github.com/go-git/go-git/v5/plumbing/transport/ssh"
"go.uber.org/zap"
"go.uber.org/zap/zapio"
)
// Git is a native git provider, it can clone, pull
// , push and as in abstraction on native git operations
type Git struct {
logger *zap.Logger
gitConfig *GitConfig
openPGP *signer.OpenPGP
}
type GitRepo struct {
repo *git.Repository
}
type GitAuth string
const (
GIT_AUTH_SSH GitAuth = "ssh"
GIT_AUTH_USERNAME_PASSWORD GitAuth = "username_password"
GIT_AUTH_ACCESS_TOKEN GitAuth = "access_token"
GIT_AUTH_ANONYMOUS GitAuth = "anonymous"
GIT_AUTH_SSH_AGENT GitAuth = "ssh_agent"
)
type GitConfig struct {
AuthOption GitAuth
User string
Password string
AccessToken string
SshPublicKeyFilePath string
SshPrivateKeyPassword string
}
func NewGit(logger *zap.Logger, gitConfig *GitConfig, openPGP *signer.OpenPGP) *Git {
return &Git{logger: logger, gitConfig: gitConfig, openPGP: openPGP}
}
func (g *Git) Clone(ctx context.Context, storageArea *storage.Area, repoUrl string) (*GitRepo, error) {
g.logger.Debug(
"cloning repository",
zap.String("repoUrl", repoUrl),
zap.String("path", storageArea.Path),
)
auth, err := g.GetAuth()
if err != nil {
return nil, err
}
cloneOptions := git.CloneOptions{
URL: repoUrl,
Auth: auth,
RemoteName: "origin",
ReferenceName: "refs/heads/main",
SingleBranch: true,
NoCheckout: false,
Depth: 1,
RecurseSubmodules: 1,
Progress: g.getProgressWriter(),
Tags: 0,
InsecureSkipTLS: false,
CABundle: []byte{},
}
repo, err := git.PlainCloneContext(ctx, storageArea.Path, false, &cloneOptions)
if err != nil {
return nil, err
}
g.logger.Debug("done cloning repo")
return &GitRepo{repo: repo}, nil
}
func (g *Git) getProgressWriter() *zapio.Writer {
return &zapio.Writer{
Log: g.logger.With(zap.String("process", "go-git")),
Level: zap.DebugLevel,
}
}
func (g *Git) Add(ctx context.Context, storageArea *storage.Area, gitRepo *GitRepo) (*git.Worktree, error) {
worktree, err := gitRepo.repo.Worktree()
if err != nil {
return nil, err
}
err = worktree.AddWithOptions(&git.AddOptions{
All: true,
})
if err != nil {
return nil, err
}
status, err := worktree.Status()
if err != nil {
return nil, err
}
g.logger.Debug("git status", zap.String("status", status.String()))
return worktree, nil
}
func (g *Git) CreateBranch(ctx context.Context, gitRepo *GitRepo) error {
worktree, err := gitRepo.repo.Worktree()
if err != nil {
return err
}
refSpec := plumbing.NewBranchReferenceName("kraken-apply")
err = gitRepo.repo.CreateBranch(&config.Branch{
Name: "kraken-apply",
Remote: "origin",
Merge: refSpec,
Rebase: "",
})
if err != nil {
return fmt.Errorf("could not create branch: %w", err)
}
err = worktree.Checkout(&git.CheckoutOptions{
Branch: plumbing.ReferenceName(refSpec.String()),
Create: true,
Force: false,
Keep: false,
})
if err != nil {
return fmt.Errorf("could not checkout branch: %w", err)
}
remoteRef := plumbing.NewRemoteReferenceName("origin", "kraken-apply")
ref := plumbing.NewSymbolicReference(refSpec, remoteRef)
err = gitRepo.repo.Storer.SetReference(ref)
if err != nil {
return fmt.Errorf("could not set reference: %w", err)
}
auth, err := g.GetAuth()
if err != nil {
return err
}
err = worktree.PullContext(ctx, &git.PullOptions{
RemoteName: "origin",
ReferenceName: "refs/heads/main",
SingleBranch: true,
Depth: 1,
Auth: auth,
RecurseSubmodules: 1,
Progress: g.getProgressWriter(),
Force: true,
InsecureSkipTLS: false,
CABundle: []byte{},
})
if err != nil {
return fmt.Errorf("could not pull from origin: %w", err)
}
g.logger.Debug("done creating branches")
return nil
}
func (g *Git) Commit(ctx context.Context, gitRepo *GitRepo) error {
worktree, err := gitRepo.repo.Worktree()
if err != nil {
return err
}
_, err = worktree.Commit("some-commit", &git.CommitOptions{
All: true,
Author: &object.Signature{Name: "kraken", Email: "kraken@kasperhermansen.com", When: time.Now()},
Committer: &object.Signature{Name: "kraken", Email: "kraken@kasperhermansen.com", When: time.Now()},
SignKey: g.openPGP.SigningKey,
})
if err != nil {
return err
}
g.logger.Debug("done commiting objects")
return nil
}
func (g *Git) Push(ctx context.Context, gitRepo *GitRepo) error {
auth, err := g.GetAuth()
if err != nil {
return err
}
err = gitRepo.repo.PushContext(ctx, &git.PushOptions{
RemoteName: "origin",
RefSpecs: []config.RefSpec{},
Auth: auth,
Progress: g.getProgressWriter(),
Prune: false,
Force: false,
InsecureSkipTLS: false,
CABundle: []byte{},
RequireRemoteRefs: []config.RefSpec{},
})
if err != nil {
return err
}
g.logger.Debug("done pushing branch")
return nil
}
func (g *Git) GetAuth() (transport.AuthMethod, error) {
switch g.gitConfig.AuthOption {
case GIT_AUTH_SSH:
sshKey, err := ssh.NewPublicKeysFromFile(
g.gitConfig.User,
g.gitConfig.SshPublicKeyFilePath,
g.gitConfig.SshPrivateKeyPassword,
)
if err != nil {
return nil, err
}
return sshKey, nil
case GIT_AUTH_USERNAME_PASSWORD:
return &http.BasicAuth{
Username: g.gitConfig.User,
Password: g.gitConfig.Password,
}, nil
case GIT_AUTH_ACCESS_TOKEN:
return &http.BasicAuth{
Username: "required-username",
Password: g.gitConfig.AccessToken,
}, nil
case GIT_AUTH_ANONYMOUS:
return nil, nil
case GIT_AUTH_SSH_AGENT:
return ssh.NewSSHAgentAuth(g.gitConfig.User)
default:
return nil, nil
}
}

View File

@@ -0,0 +1,81 @@
package signer
import (
"context"
"errors"
"os"
"strings"
"git.front.kjuulh.io/kjuulh/curre"
"github.com/ProtonMail/go-crypto/openpgp"
"go.uber.org/zap"
)
type OpenPGP struct {
logger *zap.Logger
SigningKey *openpgp.Entity
config *OpenPgpConfig
}
type OpenPgpConfig struct {
PrivateKeyFilePath string
PrivateKeyPassword string
PrivateKeyIdentity string
}
func NewOpenPGP(logger *zap.Logger, config *OpenPgpConfig) *OpenPGP {
return &OpenPGP{
logger: logger,
config: config,
}
}
func NewOpenPGPApp(openPGP *OpenPGP) curre.Component {
return curre.NewFunctionalComponent(&curre.FunctionalComponent{
InitFunc: func(_ *curre.FunctionalComponent, ctx context.Context) error {
keyring, err := buildKeyring(ctx, openPGP)
if err != nil {
openPGP.logger.Panic("could not build keyring", zap.Error(err))
return err
}
openPGP.SigningKey = keyring
return nil
},
StartFunc: func(fc *curre.FunctionalComponent, ctx context.Context) error {
return nil
},
StopFunc: func(fc *curre.FunctionalComponent, ctx context.Context) error {
return nil
},
})
}
func buildKeyring(_ context.Context, openPGP *OpenPGP) (*openpgp.Entity, error) {
content, err := os.ReadFile(openPGP.config.PrivateKeyFilePath)
if err != nil {
return nil, err
}
reader := strings.NewReader(string(content))
es, err := openpgp.ReadArmoredKeyRing(reader)
if err != nil {
return nil, err
}
for _, key := range es {
for k := range key.Identities {
if strings.Contains(k, openPGP.config.PrivateKeyIdentity) {
err = key.PrivateKey.Decrypt([]byte(openPGP.config.PrivateKeyPassword))
if err != nil {
return nil, err
}
return key, nil
}
}
}
return nil, errors.New("could not find key matching identity")
}

View File

@@ -0,0 +1,7 @@
package storage
type (
Area struct {
Path string
}
)

View File

@@ -0,0 +1,77 @@
package storage
import (
"errors"
"os"
"path"
"go.uber.org/zap"
"golang.org/x/net/context"
)
// The idea behind storage is that we have file dir, with a git repo.
// This file repo can now take certain actions
type StorageConfig struct {
Path string
}
func NewDefaultStorageConfig() (*StorageConfig, error) {
tempDir, err := os.MkdirTemp(os.TempDir(), "")
if err != nil {
return nil, err
}
return &StorageConfig{
Path: path.Join(tempDir, "kraken"),
}, nil
}
type Service struct {
logger *zap.Logger
cfg *StorageConfig
}
func NewService(logger *zap.Logger, cfg *StorageConfig) *Service {
return &Service{logger: logger, cfg: cfg}
}
func (s *Service) getStoragePath(ctx context.Context) string {
return path.Join(s.cfg.Path, "storage")
}
func (s *Service) InitializeStorage(ctx context.Context) error {
return os.MkdirAll(s.getStoragePath(ctx), 0755)
}
func (s *Service) CleanupStorage(ctx context.Context) error {
doneRemovingChan := make(chan struct{}, 1)
go func(ctx context.Context) {
s.logger.Debug("Removing all temp storage")
os.RemoveAll(s.getStoragePath(ctx))
doneRemovingChan <- struct{}{}
}(ctx)
select {
case <-ctx.Done():
return errors.New("could not cleanup storage aborting")
case <-doneRemovingChan:
return nil
}
return nil
}
func (s *Service) CreateArea(ctx context.Context) (*Area, error) {
dir, err := os.MkdirTemp(s.getStoragePath(ctx), "*")
if err != nil {
return nil, err
}
return &Area{
Path: dir,
}, nil
}
func (s *Service) RemoveArea(ctx context.Context, area *Area) error {
return os.RemoveAll(area.Path)
}