re-wire logging on top of zerolog

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi
2021-01-13 17:38:16 -08:00
parent 49f0c0e149
commit e09723861f
19 changed files with 270 additions and 111 deletions

View File

@@ -11,6 +11,8 @@ import (
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"github.com/rs/zerolog/log"
// Cue
"cuelang.org/go/cue"
@@ -80,12 +82,12 @@ func NewClient(ctx context.Context, host, boot, bootdir string) (*Client, error)
}, nil
}
func (c *Client) LocalDirs() ([]string, error) {
func (c *Client) LocalDirs(ctx context.Context) ([]string, error) {
boot, err := c.BootScript()
if err != nil {
return nil, err
}
return boot.LocalDirs()
return boot.LocalDirs(ctx)
}
func (c *Client) BootScript() (*Script, error) {
@@ -102,6 +104,8 @@ func (c *Client) BootScript() (*Script, error) {
}
func (c *Client) Compute(ctx context.Context) (*Value, error) {
lg := log.Ctx(ctx)
cc := &Compiler{}
out, err := cc.EmptyStruct()
if err != nil {
@@ -140,7 +144,7 @@ func (c *Client) Compute(ctx context.Context) (*Value, error) {
})
eg.Go(func() error {
return c.dockerprintfn(dispCtx, eventsDockerPrint, os.Stderr)
return c.dockerprintfn(dispCtx, eventsDockerPrint, lg)
})
} else {
eg.Go(func() error {
@@ -157,6 +161,8 @@ func (c *Client) Compute(ctx context.Context) (*Value, error) {
}
func (c *Client) buildfn(ctx context.Context, ch chan *bk.SolveStatus, w io.WriteCloser) error {
lg := log.Ctx(ctx)
boot, err := c.BootScript()
if err != nil {
return errors.Wrap(err, "assemble boot script")
@@ -165,7 +171,7 @@ func (c *Client) buildfn(ctx context.Context, ch chan *bk.SolveStatus, w io.Writ
if err != nil {
return errors.Wrap(err, "serialize boot script")
}
debugf("client: assembled boot script: %s\n", bootSource)
lg.Debug().Bytes("bootSource", bootSource).Msg("assembled boot script")
// Setup solve options
opts := bk.SolveOpt{
FrontendAttrs: map[string]string{
@@ -184,7 +190,7 @@ func (c *Client) buildfn(ctx context.Context, ch chan *bk.SolveStatus, w io.Writ
},
}
// Connect local dirs
localdirs, err := c.LocalDirs()
localdirs, err := c.LocalDirs(ctx)
if err != nil {
return errors.Wrap(err, "connect local dirs")
}
@@ -199,14 +205,19 @@ func (c *Client) buildfn(ctx context.Context, ch chan *bk.SolveStatus, w io.Writ
}
for k, v := range resp.ExporterResponse {
// FIXME consume exporter response
fmt.Printf("exporter response: %s=%s\n", k, v)
lg.
Debug().
Str("key", k).
Str("value", v).
Msg("exporter response")
}
return nil
}
// Read tar export stream from buildkit Build(), and extract cue output
func (c *Client) outputfn(_ context.Context, r io.Reader, out *Value) error {
defer debugf("outputfn complete")
func (c *Client) outputfn(ctx context.Context, r io.Reader, out *Value) error {
lg := log.Ctx(ctx)
tr := tar.NewReader(r)
for {
h, err := tr.Next()
@@ -216,11 +227,17 @@ func (c *Client) outputfn(_ context.Context, r io.Reader, out *Value) error {
if err != nil {
return errors.Wrap(err, "read tar stream")
}
lg := lg.
With().
Str("file", h.Name).
Logger()
if !strings.HasSuffix(h.Name, ".cue") {
debugf("skipping non-cue file from exporter tar stream: %s", h.Name)
lg.Debug().Msg("skipping non-cue file from exporter tar stream")
continue
}
debugf("outputfn: compiling & merging %q", h.Name)
lg.Debug().Msg("outputfn: compiling & merging")
cc := out.Compiler()
v, err := cc.Compile(h.Name, tr)
@@ -230,7 +247,6 @@ func (c *Client) outputfn(_ context.Context, r io.Reader, out *Value) error {
if err := out.Fill(v); err != nil {
return errors.Wrap(err, h.Name)
}
debugf("outputfn: DONE: compiling & merging %q", h.Name)
}
return nil
}
@@ -254,7 +270,7 @@ func (n Node) ComponentPath() cue.Path {
return cue.MakePath(parts...)
}
func (n Node) Logf(msg string, args ...interface{}) {
func (n Node) Logf(ctx context.Context, msg string, args ...interface{}) {
componentPath := n.ComponentPath().String()
args = append([]interface{}{componentPath}, args...)
if msg != "" && !strings.HasSuffix(msg, "\n") {
@@ -263,34 +279,41 @@ func (n Node) Logf(msg string, args ...interface{}) {
fmt.Fprintf(os.Stderr, "[%s] "+msg, args...)
}
func (n Node) LogStream(nStream int, data []byte) {
var stream string
func (n Node) LogStream(ctx context.Context, nStream int, data []byte) {
lg := log.
Ctx(ctx).
With().
Str("path", n.ComponentPath().String()).
Logger()
switch nStream {
case 1:
stream = "stdout"
lg = lg.With().Str("stream", "stdout").Logger()
case 2:
stream = "stderr"
lg = lg.With().Str("stream", "stderr").Logger()
default:
stream = fmt.Sprintf("%d", nStream)
}
// FIXME: use bufio reader?
lines := strings.Split(string(data), "\n")
for _, line := range lines {
n.Logf("[%s] %s", stream, line)
lg = lg.With().Str("stream", fmt.Sprintf("%d", nStream)).Logger()
}
lg.Debug().Msg(string(data))
}
func (n Node) LogError(errmsg string) {
n.Logf("ERROR: %s", bkCleanError(errmsg))
func (n Node) LogError(ctx context.Context, errmsg string) {
log.
Ctx(ctx).
Error().
Str("path", n.ComponentPath().String()).
Msg(bkCleanError(errmsg))
}
func (c *Client) printfn(ctx context.Context, ch chan *bk.SolveStatus) error {
lg := log.Ctx(ctx)
// Node status mapped to buildkit vertex digest
nodesByDigest := map[string]*Node{}
// Node status mapped to cue path
nodesByPath := map[string]*Node{}
defer debugf("printfn complete")
for {
select {
case <-ctx.Done():
@@ -299,11 +322,13 @@ func (c *Client) printfn(ctx context.Context, ch chan *bk.SolveStatus) error {
if !ok {
return nil
}
debugf("status event: vertexes:%d statuses:%d logs:%d\n",
len(status.Vertexes),
len(status.Statuses),
len(status.Logs),
)
lg.
Debug().
Int("vertexes", len(status.Vertexes)).
Int("statuses", len(status.Statuses)).
Int("logs", len(status.Logs)).
Msg("status event")
for _, v := range status.Vertexes {
// FIXME: insert raw buildkit telemetry here (ie for debugging, etc.)
@@ -320,12 +345,12 @@ func (c *Client) printfn(ctx context.Context, ch chan *bk.SolveStatus) error {
nodesByPath[n.Path.String()] = n
nodesByDigest[n.Digest.String()] = n
if n.Error != "" {
n.LogError(n.Error)
n.LogError(ctx, n.Error)
}
}
for _, log := range status.Logs {
if n, ok := nodesByDigest[log.Vertex.String()]; ok {
n.LogStream(log.Stream, log.Data)
n.LogStream(ctx, log.Stream, log.Data)
}
}
// debugJSON(status)
@@ -351,7 +376,6 @@ func bkCleanError(msg string) string {
}
func (c *Client) dockerprintfn(ctx context.Context, ch chan *bk.SolveStatus, out io.Writer) error {
defer debugf("dockerprintfn complete")
var cons console.Console
// FIXME: use smarter writer from blr
return progressui.DisplaySolveStatus(ctx, "", cons, out, ch)

View File

@@ -11,6 +11,7 @@ import (
cueerrors "cuelang.org/go/cue/errors"
cueload "cuelang.org/go/cue/load"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
// Polyfill for a cue runtime
@@ -66,8 +67,8 @@ func (cc *Compiler) CompileScript(name string, src interface{}) (*Script, error)
// Build a cue configuration tree from the files in fs.
func (cc *Compiler) Build(ctx context.Context, fs FS, args ...string) (*Value, error) {
debugf("Compiler.Build")
defer debugf("COMPLETE: Compiler.Build")
lg := log.Ctx(ctx)
// The CUE overlay needs to be prefixed by a non-conflicting path with the
// local filesystem, otherwise Cue will merge the Overlay with whatever Cue
// files it finds locally.
@@ -80,7 +81,7 @@ func (cc *Compiler) Build(ctx context.Context, fs FS, args ...string) (*Value, e
buildArgs := args
err := fs.Walk(ctx, func(p string, f Stat) error {
debugf(" Compiler.Build: processing %q", p)
lg.Debug().Str("path", p).Msg("Compiler.Build: processing")
if f.IsDir() {
return nil
}

View File

@@ -67,10 +67,10 @@ func (c *Component) Execute(ctx context.Context, fs FS, out Fillable) (FS, error
return script.Execute(ctx, fs, out)
}
func (c *Component) Walk(fn func(*Op) error) error {
func (c *Component) Walk(ctx context.Context, fn func(*Op) error) error {
script, err := c.ComputeScript()
if err != nil {
return err
}
return script.Walk(fn)
return script.Walk(ctx, fn)
}

View File

@@ -1,6 +1,7 @@
package dagger
import (
"context"
"testing"
)
@@ -31,7 +32,7 @@ func TestValidateSimpleComponent(t *testing.T) {
t.Fatal(err)
}
n := 0
if err := s.Walk(func(op *Op) error {
if err := s.Walk(context.TODO(), func(op *Op) error {
n++
return nil
}); err != nil {

View File

@@ -6,17 +6,19 @@ import (
cueerrors "cuelang.org/go/cue/errors"
bkgw "github.com/moby/buildkit/frontend/gateway/client"
"github.com/rs/zerolog/log"
)
// Buildkit compute entrypoint (BK calls if "solve" or "build")
// Use by wrapping in a buildkit client Build call, or buildkit frontend.
func Compute(ctx context.Context, c bkgw.Client) (r *bkgw.Result, err error) {
lg := log.Ctx(ctx)
// FIXME: wrap errors to avoid crashing buildkit Build()
// with cue error types (why??)
defer func() {
if err != nil {
err = fmt.Errorf("%s", cueerrors.Details(err, nil))
debugf("execute returned an error. Wrapping...")
}
}()
// Retrieve boot script form client
@@ -24,12 +26,12 @@ func Compute(ctx context.Context, c bkgw.Client) (r *bkgw.Result, err error) {
if err != nil {
return nil, err
}
debugf("computing env")
lg.Debug().Msg("computing env")
// Compute output overlay
if err := env.Compute(ctx); err != nil {
return nil, err
}
debugf("exporting env")
lg.Debug().Msg("exporting env")
// Export env to a cue directory
outdir := NewSolver(c).Scratch()
outdir, err = env.Export(outdir)

View File

@@ -8,6 +8,7 @@ import (
"cuelang.org/go/cue"
cueflow "cuelang.org/go/tools/flow"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
type Env struct {
@@ -29,7 +30,14 @@ type Env struct {
// Initialize a new environment
func NewEnv(ctx context.Context, s Solver, bootsrc, inputsrc string) (*Env, error) {
debugf("NewEnv(boot=%q input=%q)", bootsrc, inputsrc)
lg := log.Ctx(ctx)
lg.
Debug().
Str("boot", bootsrc).
Str("input", inputsrc).
Msg("New Env")
cc := &Compiler{}
// 1. Compile & execute boot script
boot, err := cc.CompileScript("boot.cue", bootsrc)
@@ -42,13 +50,13 @@ func NewEnv(ctx context.Context, s Solver, bootsrc, inputsrc string) (*Env, erro
}
// 2. load cue files produced by boot script
// FIXME: BuildAll() to force all files (no required package..)
debugf("building cue configuration from boot state")
lg.Debug().Msg("building cue configuration from boot state")
base, err := cc.Build(ctx, bootfs)
if err != nil {
return nil, errors.Wrap(err, "load base config")
}
// 3. Compile & merge input overlay (user settings, input directories, secrets.)
debugf("Loading input overlay")
lg.Debug().Msg("loading input overlay")
input, err := cc.Compile("input.cue", inputsrc)
if err != nil {
return nil, err
@@ -58,7 +66,11 @@ func NewEnv(ctx context.Context, s Solver, bootsrc, inputsrc string) (*Env, erro
return nil, errors.Wrap(err, "merge base & input")
}
debugf("ENV: base=%q input=%q", base.JSON(), input.JSON())
lg.
Debug().
Str("base", base.JSON().String()).
Str("input", input.JSON().String()).
Msg("ENV")
return &Env{
base: base,
@@ -70,9 +82,18 @@ func NewEnv(ctx context.Context, s Solver, bootsrc, inputsrc string) (*Env, erro
// Compute missing values in env configuration, and write them to state.
func (env *Env) Compute(ctx context.Context) error {
debugf("Computing environment")
lg := log.Ctx(ctx)
output, err := env.Walk(ctx, func(c *Component, out Fillable) error {
debugf(" [Env.Compute] processing %s", c.Value().Path().String())
lg := lg.
With().
Str("path", c.Value().Path().String()).
Logger()
ctx := lg.WithContext(ctx)
lg.
Debug().
Msg("[Env.Compute] processing")
_, err := c.Compute(ctx, env.s, out)
return err
})
@@ -103,35 +124,51 @@ type EnvWalkFunc func(*Component, Fillable) error
// Walk components and return any computed values
func (env *Env) Walk(ctx context.Context, fn EnvWalkFunc) (*Value, error) {
debugf("Env.Walk")
defer debugf("COMPLETE: Env.Walk")
l := sync.Mutex{}
lg := log.Ctx(ctx)
// Cueflow cue instance
// FIXME: make this cleaner in *Value by keeping intermediary instances
flowInst, err := env.base.CueInst().Fill(env.input.CueInst().Value())
if err != nil {
return nil, err
}
debugf("walking: \n----\n%s\n----\n", env.cc.Wrap(flowInst.Value(), flowInst).JSON())
// Initialize empty output
lg.
Debug().
Str("value", env.cc.Wrap(flowInst.Value(), flowInst).JSON().String()).
Msg("walking")
// Initialize empty output
out, err := env.cc.EmptyStruct()
if err != nil {
return nil, err
}
l := sync.Mutex{}
// Cueflow config
flowCfg := &cueflow.Config{
UpdateFunc: func(c *cueflow.Controller, t *cueflow.Task) error {
l.Lock()
defer l.Unlock()
debugf("compute step")
if t == nil {
return nil
}
debugf("cueflow task %q: %s", t.Path().String(), t.State().String())
lg := lg.
With().
Str("path", t.Path().String()).
Str("state", t.State().String()).
Logger()
lg.
Debug().
Msg("cueflow task")
if t.State() != cueflow.Terminated {
return nil
}
debugf("cueflow task %q: filling result", t.Path().String())
lg.Debug().Msg("cueflow task: filling result")
// Merge task value into output
var err error
// FIXME: does cueflow.Task.Value() contain only filled values,
@@ -147,7 +184,13 @@ func (env *Env) Walk(ctx context.Context, fn EnvWalkFunc) (*Value, error) {
flowMatchFn := func(v cue.Value) (cueflow.Runner, error) {
l.Lock()
defer l.Unlock()
debugf("Env.Walk: processing %s", v.Path().String())
lg := lg.
With().
Str("path", v.Path().String()).
Logger()
lg.Debug().Msg("Env.Walk: processing")
val := env.cc.Wrap(v, flowInst)
c, err := val.Component()
if os.IsNotExist(err) {
@@ -160,6 +203,7 @@ func (env *Env) Walk(ctx context.Context, fn EnvWalkFunc) (*Value, error) {
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
l.Lock()
defer l.Unlock()
return fn(c, t)
}), nil
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/moby/buildkit/client/llb"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
type Op struct {
@@ -21,14 +22,15 @@ func (op *Op) Execute(ctx context.Context, fs FS, out Fillable) (FS, error) {
return action(ctx, fs, out)
}
func (op *Op) Walk(fn func(*Op) error) error {
debugf("Walk %v", op.v)
func (op *Op) Walk(ctx context.Context, fn func(*Op) error) error {
lg := log.Ctx(ctx)
lg.Debug().Interface("v", op.v).Msg("Op.Walk")
isCopy := (op.Validate("#Copy") == nil)
isLoad := (op.Validate("#Load") == nil)
if isCopy || isLoad {
debugf("MATCH %v", op.v)
if from, err := op.Get("from").Executable(); err == nil {
if err := from.Walk(fn); err != nil {
if err := from.Walk(ctx, fn); err != nil {
return err
}
}
@@ -37,7 +39,7 @@ func (op *Op) Walk(fn func(*Op) error) error {
if err := op.Validate("#Exec"); err == nil {
return op.Get("mount").RangeStruct(func(k string, v *Value) error {
if from, err := op.Get("from").Executable(); err == nil {
if err := from.Walk(fn); err != nil {
if err := from.Walk(ctx, fn); err != nil {
return err
}
}
@@ -75,7 +77,6 @@ func (op *Op) Action() (Action, error) {
}
for def, action := range actions {
if err := op.Validate(def); err == nil {
op.v.Debugf("OP MATCH: %s", def)
return action, nil
}
}

View File

@@ -1,10 +1,13 @@
package dagger
import (
"context"
"testing"
)
func TestLocalMatch(t *testing.T) {
ctx := context.TODO()
cc := &Compiler{}
src := `do: "local", dir: "foo"`
v, err := cc.Compile("", src)
@@ -16,7 +19,7 @@ func TestLocalMatch(t *testing.T) {
t.Fatal(err)
}
n := 0
err = op.Walk(func(op *Op) error {
err = op.Walk(ctx, func(op *Op) error {
n++
return nil
})
@@ -29,6 +32,8 @@ func TestLocalMatch(t *testing.T) {
}
func TestCopyMatch(t *testing.T) {
ctx := context.TODO()
cc := &Compiler{}
src := `do: "copy", from: [{do: "local", dir: "foo"}]`
v, err := cc.Compile("", src)
@@ -43,7 +48,7 @@ func TestCopyMatch(t *testing.T) {
t.Fatal(err)
}
n := 0
err = op.Walk(func(op *Op) error {
err = op.Walk(ctx, func(op *Op) error {
n++
return nil
})

View File

@@ -41,22 +41,22 @@ func (s *Script) Execute(ctx context.Context, fs FS, out Fillable) (FS, error) {
return fs, err
}
func (s *Script) Walk(fn func(op *Op) error) error {
func (s *Script) Walk(ctx context.Context, fn func(op *Op) error) error {
return s.v.RangeList(func(idx int, v *Value) error {
op, err := v.Op()
if err != nil {
return errors.Wrapf(err, "validate op %d/%d", idx+1, s.v.Len())
}
if err := op.Walk(fn); err != nil {
if err := op.Walk(ctx, fn); err != nil {
return err
}
return nil
})
}
func (s *Script) LocalDirs() ([]string, error) {
func (s *Script) LocalDirs(ctx context.Context) ([]string, error) {
var dirs []string
err := s.Walk(func(op *Op) error {
err := s.Walk(ctx, func(op *Op) error {
if err := op.Validate("#Local"); err != nil {
return nil
}

View File

@@ -1,6 +1,7 @@
package dagger
import (
"context"
"strings"
"testing"
)
@@ -17,6 +18,8 @@ func TestValidateEmptyValue(t *testing.T) {
}
func TestLocalScript(t *testing.T) {
ctx := context.TODO()
cc := &Compiler{}
src := `[{do: "local", dir: "foo"}]`
v, err := cc.Compile("", src)
@@ -28,7 +31,7 @@ func TestLocalScript(t *testing.T) {
t.Fatal(err)
}
n := 0
err = s.Walk(func(op *Op) error {
err = s.Walk(ctx, func(op *Op) error {
n++
return nil
})
@@ -41,6 +44,8 @@ func TestLocalScript(t *testing.T) {
}
func TestWalkBootScript(t *testing.T) {
ctx := context.TODO()
cc := &Compiler{}
cfg, err := cc.Compile("clientconfig.cue", defaultBootScript)
if err != nil {
@@ -50,7 +55,7 @@ func TestWalkBootScript(t *testing.T) {
if err != nil {
t.Fatal(err)
}
dirs, err := script.LocalDirs()
dirs, err := script.LocalDirs(ctx)
if err != nil {
t.Fatal(err)
}
@@ -64,6 +69,8 @@ func TestWalkBootScript(t *testing.T) {
func TestWalkBiggerScript(t *testing.T) {
t.Skip("FIXME")
ctx := context.TODO()
cc := &Compiler{}
script, err := cc.CompileScript("boot.cue", `
[
@@ -102,7 +109,7 @@ func TestWalkBiggerScript(t *testing.T) {
if err != nil {
t.Fatal(err)
}
dirs, err := script.LocalDirs()
dirs, err := script.LocalDirs(ctx)
if err != nil {
t.Fatal(err)
}

View File

@@ -1,10 +1,9 @@
package dagger
import (
"fmt"
"cuelang.org/go/cue"
cueerrors "cuelang.org/go/cue/errors"
"github.com/pkg/errors"
)
// Cue spec validator
@@ -18,8 +17,7 @@ func (s Spec) Validate(v *Value, defpath string) (err error) {
// FIXME: there is probably a cleaner way to do this.
defer func() {
if err != nil {
//debugf("ERROR while validating %v against %v err=%q", v, defpath, err)
err = fmt.Errorf("%s", cueerrors.Details(err, nil))
err = errors.New(cueerrors.Details(err, nil))
}
}()

View File

@@ -7,7 +7,7 @@ import (
// Implemented by Component, Script, Op
type Executable interface {
Execute(context.Context, FS, Fillable) (FS, error)
Walk(func(*Op) error) error
Walk(context.Context, func(*Op) error) error
}
// Something which can be filled in-place with a cue value

View File

@@ -3,24 +3,14 @@ package dagger
import (
"crypto/rand"
"fmt"
"os"
"strings"
"cuelang.org/go/cue"
cueerrors "cuelang.org/go/cue/errors"
"github.com/pkg/errors"
)
func cueErr(err error) error {
return fmt.Errorf("%s", cueerrors.Details(err, &cueerrors.Config{}))
}
func debugf(msg string, args ...interface{}) {
if !strings.HasSuffix(msg, "\n") {
msg += "\n"
}
if os.Getenv("DEBUG") != "" {
fmt.Fprintf(os.Stderr, msg, args...)
}
return errors.New(cueerrors.Details(err, &cueerrors.Config{}))
}
func randomID(size int) (string, error) {

View File

@@ -319,12 +319,6 @@ func (v *Value) Compiler() *Compiler {
return v.cc
}
func (v *Value) Debugf(msg string, args ...interface{}) {
prefix := v.Path().String()
args = append([]interface{}{prefix}, args...)
debugf("%s: "+msg, args...)
}
func (v *Value) Wrap(v2 cue.Value) *Value {
return wrapValue(v2, v.inst, v.cc)
}