From 167edf4a763661fda21929c75be20ed95cca1402 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Tue, 2 Feb 2021 11:03:12 -0800 Subject: [PATCH 1/4] simplify: remove compute.go Signed-off-by: Andrea Luzzardi --- dagger/client.go | 63 +++++++++++++++++++++++++---------------------- dagger/compute.go | 58 ------------------------------------------- 2 files changed, 34 insertions(+), 87 deletions(-) delete mode 100644 dagger/compute.go diff --git a/dagger/client.go b/dagger/client.go index 02267eb4..71dfaff9 100644 --- a/dagger/client.go +++ b/dagger/client.go @@ -20,6 +20,7 @@ import ( // buildkit bk "github.com/moby/buildkit/client" _ "github.com/moby/buildkit/client/connhelper/dockercontainer" // import the container connection driver + bkgw "github.com/moby/buildkit/frontend/gateway/client" // docker output "github.com/containerd/console" @@ -28,15 +29,11 @@ import ( const ( defaultBuildkitHost = "docker-container://buildkitd" - bkUpdaterKey = "updater" - bkInputKey = "input" ) // A dagger client type Client struct { c *bk.Client - - localdirs map[string]string } func NewClient(ctx context.Context, host string) (*Client, error) { @@ -64,19 +61,6 @@ func (c *Client) Compute(ctx context.Context, env *Env) (o *Value, err error) { err = cueErr(err) } }() - // Scan local dirs to grant access - localdirs, err := env.LocalDirs(ctx) - if err != nil { - return nil, errors.Wrap(err, "scan local dirs") - } - for label, dir := range localdirs { - abs, err := filepath.Abs(dir) - if err != nil { - return nil, err - } - localdirs[label] = abs - } - c.localdirs = localdirs // FIXME: merge this into env output. out, err := env.Compiler().EmptyStruct() @@ -136,22 +120,22 @@ func (c *Client) Compute(ctx context.Context, env *Env) (o *Value, err error) { func (c *Client) buildfn(ctx context.Context, env *Env, ch chan *bk.SolveStatus, w io.WriteCloser) error { lg := log.Ctx(ctx) - // Serialize input and updater - input, err := env.Input().SourceString() + // Scan local dirs to grant access + localdirs, err := env.LocalDirs(ctx) if err != nil { - return errors.Wrap(err, "serialize env input") + return errors.Wrap(err, "scan local dirs") } - updater, err := env.Updater().Value().SourceString() - if err != nil { - return errors.Wrap(err, "serialize updater script") + for label, dir := range localdirs { + abs, err := filepath.Abs(dir) + if err != nil { + return err + } + localdirs[label] = abs } + // Setup solve options opts := bk.SolveOpt{ - FrontendAttrs: map[string]string{ - bkInputKey: input, - bkUpdaterKey: updater, - }, - LocalDirs: c.localdirs, + LocalDirs: localdirs, // FIXME: catch output & return as cue value Exports: []bk.ExportEntry{ { @@ -162,12 +146,33 @@ func (c *Client) buildfn(ctx context.Context, env *Env, ch chan *bk.SolveStatus, }, }, } + // Call buildkit solver lg.Debug(). Interface("localdirs", opts.LocalDirs). Interface("attrs", opts.FrontendAttrs). Msg("spawning buildkit job") - resp, err := c.c.Build(ctx, opts, "", Compute, ch) + + resp, err := c.c.Build(ctx, opts, "", func(ctx context.Context, c bkgw.Client) (*bkgw.Result, error) { + s := NewSolver(c) + + if err := env.Update(ctx, s); err != nil { + return nil, err + } + lg.Debug().Msg("computing env") + // Compute output overlay + if err := env.Compute(ctx, s); err != nil { + return nil, err + } + lg.Debug().Msg("exporting env") + // Export env to a cue directory + outdir, err := env.Export(s.Scratch()) + if err != nil { + return nil, err + } + // Wrap cue directory in buildkit result + return outdir.Result(ctx) + }, ch) if err != nil { return errors.Wrap(bkCleanError(err), "buildkit solve") } diff --git a/dagger/compute.go b/dagger/compute.go deleted file mode 100644 index a8d30dec..00000000 --- a/dagger/compute.go +++ /dev/null @@ -1,58 +0,0 @@ -package dagger - -import ( - "context" - "fmt" - - cueerrors "cuelang.org/go/cue/errors" - bkgw "github.com/moby/buildkit/frontend/gateway/client" - "github.com/rs/zerolog/log" -) - -// Buildkit compute entrypoint (BK calls if "solve" or "build") -// Use by wrapping in a buildkit client Build call, or buildkit frontend. -func Compute(ctx context.Context, c bkgw.Client) (r *bkgw.Result, err error) { - lg := log.Ctx(ctx) - // FIXME: wrap errors to avoid crashing buildkit Build() - // with cue error types (why??) - defer func() { - if err != nil { - err = fmt.Errorf("%s", cueerrors.Details(err, nil)) - } - }() - - s := NewSolver(c) - // Retrieve updater script form client - var updater interface{} - if o, exists := c.BuildOpts().Opts[bkUpdaterKey]; exists { - updater = o - } - env, err := NewEnv() - if err != nil { - return nil, err - } - if err := env.SetUpdater(updater); err != nil { - return nil, err - } - if err := env.Update(ctx, s); err != nil { - return nil, err - } - if input, exists := c.BuildOpts().Opts["input"]; exists { - if err := env.SetInput(input); err != nil { - return nil, err - } - } - lg.Debug().Msg("computing env") - // Compute output overlay - if err := env.Compute(ctx, s); err != nil { - return nil, err - } - lg.Debug().Msg("exporting env") - // Export env to a cue directory - outdir, err := env.Export(s.Scratch()) - if err != nil { - return nil, err - } - // Wrap cue directory in buildkit result - return outdir.Result(ctx) -} From c9e0d0854d62967207ca15b63ea01267a910394f Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Tue, 2 Feb 2021 11:10:47 -0800 Subject: [PATCH 2/4] client: remove unused printer Signed-off-by: Andrea Luzzardi --- dagger/client.go | 139 +---------------------------------------------- 1 file changed, 3 insertions(+), 136 deletions(-) diff --git a/dagger/client.go b/dagger/client.go index 71dfaff9..e9e5f375 100644 --- a/dagger/client.go +++ b/dagger/client.go @@ -3,7 +3,6 @@ package dagger import ( "archive/tar" "context" - "fmt" "io" "os" "path/filepath" @@ -15,7 +14,6 @@ import ( "github.com/rs/zerolog/log" // Cue - "cuelang.org/go/cue" // buildkit bk "github.com/moby/buildkit/client" @@ -79,33 +77,11 @@ func (c *Client) Compute(ctx context.Context, env *Env) (o *Value, err error) { return c.buildfn(ctx, env, events, outw) }) - // Spawn print function(s) - dispCtx := context.TODO() + // Spawn print function if os.Getenv("DOCKER_OUTPUT") != "" { - // Multiplex events - eventsPrint := make(chan *bk.SolveStatus) - eventsDockerPrint := make(chan *bk.SolveStatus) eg.Go(func() error { - defer close(eventsPrint) - defer close(eventsDockerPrint) - - for e := range events { - eventsPrint <- e - eventsDockerPrint <- e - } - return nil - }) - - eg.Go(func() error { - return c.printfn(dispCtx, eventsPrint) - }) - - eg.Go(func() error { - return c.dockerprintfn(dispCtx, eventsDockerPrint, lg) - }) - } else { - eg.Go(func() error { - return c.printfn(dispCtx, events) + dispCtx := context.TODO() + return c.dockerprintfn(dispCtx, events, lg) }) } @@ -223,115 +199,6 @@ func (c *Client) outputfn(ctx context.Context, r io.Reader, out *Value, cc *Comp return nil } -// Status of a node in the config tree being computed -// Node may be a component, or a value within a component -// (eg. a script or individual operation in a script) -type Node struct { - Path cue.Path - *bk.Vertex -} - -func (n Node) ComponentPath() cue.Path { - parts := []cue.Selector{} - for _, sel := range n.Path.Selectors() { - if strings.HasPrefix(sel.String(), "#") { - break - } - parts = append(parts, sel) - } - return cue.MakePath(parts...) -} - -func (n Node) Logf(ctx context.Context, msg string, args ...interface{}) { - componentPath := n.ComponentPath().String() - args = append([]interface{}{componentPath}, args...) - if msg != "" && !strings.HasSuffix(msg, "\n") { - msg += "\n" - } - fmt.Fprintf(os.Stderr, "[%s] "+msg, args...) -} - -func (n Node) LogStream(ctx context.Context, nStream int, data []byte) { - lg := log. - Ctx(ctx). - With(). - Str("path", n.ComponentPath().String()). - Logger() - - switch nStream { - case 1: - lg = lg.With().Str("stream", "stdout").Logger() - case 2: - lg = lg.With().Str("stream", "stderr").Logger() - default: - lg = lg.With().Str("stream", fmt.Sprintf("%d", nStream)).Logger() - } - - lg.Debug().Msg(string(data)) -} - -func (n Node) LogError(ctx context.Context, errmsg string) { - log. - Ctx(ctx). - Error(). - Str("path", n.ComponentPath().String()). - Msg(errmsg) -} - -func (c *Client) printfn(ctx context.Context, ch chan *bk.SolveStatus) error { - lg := log.Ctx(ctx) - - // Node status mapped to buildkit vertex digest - nodesByDigest := map[string]*Node{} - // Node status mapped to cue path - nodesByPath := map[string]*Node{} - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case status, ok := <-ch: - if !ok { - return nil - } - lg. - Debug(). - Int("vertexes", len(status.Vertexes)). - Int("statuses", len(status.Statuses)). - Int("logs", len(status.Logs)). - Msg("status event") - - for _, v := range status.Vertexes { - // FIXME: insert raw buildkit telemetry here (ie for debugging, etc.) - - // IF a buildkit vertex has a valid cue path as name, extract additional info: - p := cue.ParsePath(v.Name) - if err := p.Err(); err != nil { - // Not a valid cue path: skip. - continue - } - n := &Node{ - Path: p, - Vertex: v, - } - nodesByPath[n.Path.String()] = n - nodesByDigest[n.Digest.String()] = n - if n.Error != "" { - n.LogError(ctx, n.Error) - } - } - for _, log := range status.Logs { - if n, ok := nodesByDigest[log.Vertex.String()]; ok { - n.LogStream(ctx, log.Stream, log.Data) - } - } - // debugJSON(status) - // FIXME: callbacks for extracting stream/result - // see proto 67 - } - } -} - func (c *Client) dockerprintfn(ctx context.Context, ch chan *bk.SolveStatus, out io.Writer) error { var cons console.Console // FIXME: use smarter writer from blr From cd0f21dbd290a1348b402c894c1e05acebe1e201 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Tue, 2 Feb 2021 11:32:35 -0800 Subject: [PATCH 3/4] client: simplify Compute logic Signed-off-by: Andrea Luzzardi --- dagger/client.go | 62 ++++++++++++++++++++++++------------------------ dagger/utils.go | 3 +++ 2 files changed, 34 insertions(+), 31 deletions(-) diff --git a/dagger/client.go b/dagger/client.go index e9e5f375..9c6c802c 100644 --- a/dagger/client.go +++ b/dagger/client.go @@ -51,46 +51,40 @@ func NewClient(ctx context.Context, host string) (*Client, error) { } // FIXME: return completed *Env, instead of *Value -func (c *Client) Compute(ctx context.Context, env *Env) (o *Value, err error) { +func (c *Client) Compute(ctx context.Context, env *Env) (*Value, error) { lg := log.Ctx(ctx) - defer func() { - if err != nil { - // Expand cue errors to get full details - err = cueErr(err) - } - }() - // FIXME: merge this into env output. - out, err := env.Compiler().EmptyStruct() - if err != nil { - return nil, err - } - - // Spawn Build() goroutine - eg, ctx := errgroup.WithContext(ctx) - events := make(chan *bk.SolveStatus) - outr, outw := io.Pipe() - - // Spawn build function - eg.Go(func() error { - defer outw.Close() - return c.buildfn(ctx, env, events, outw) - }) + eg, gctx := errgroup.WithContext(ctx) // Spawn print function + var events chan *bk.SolveStatus if os.Getenv("DOCKER_OUTPUT") != "" { + events = make(chan *bk.SolveStatus) eg.Go(func() error { dispCtx := context.TODO() return c.dockerprintfn(dispCtx, events, lg) }) } - // Retrieve output + // Spawn build function + outr, outw := io.Pipe() + eg.Go(func() error { + defer outw.Close() + return c.buildfn(gctx, env, events, outw) + }) + + // Spawn output retriever + var ( + out *Value + err error + ) eg.Go(func() error { defer outr.Close() - return c.outputfn(ctx, outr, out, env.cc) + out, err = c.outputfn(gctx, outr, env.cc) + return err }) - return out, eg.Wait() + + return out, cueErr(eg.Wait()) } func (c *Client) buildfn(ctx context.Context, env *Env, ch chan *bk.SolveStatus, w io.WriteCloser) error { @@ -164,9 +158,15 @@ func (c *Client) buildfn(ctx context.Context, env *Env, ch chan *bk.SolveStatus, } // Read tar export stream from buildkit Build(), and extract cue output -func (c *Client) outputfn(ctx context.Context, r io.Reader, out *Value, cc *Compiler) error { +func (c *Client) outputfn(ctx context.Context, r io.Reader, cc *Compiler) (*Value, error) { lg := log.Ctx(ctx) + // FIXME: merge this into env output. + out, err := cc.EmptyStruct() + if err != nil { + return nil, err + } + tr := tar.NewReader(r) for { h, err := tr.Next() @@ -174,7 +174,7 @@ func (c *Client) outputfn(ctx context.Context, r io.Reader, out *Value, cc *Comp break } if err != nil { - return errors.Wrap(err, "read tar stream") + return nil, errors.Wrap(err, "read tar stream") } lg := lg. @@ -190,13 +190,13 @@ func (c *Client) outputfn(ctx context.Context, r io.Reader, out *Value, cc *Comp v, err := cc.Compile(h.Name, tr) if err != nil { - return err + return nil, err } if err := out.Fill(v); err != nil { - return errors.Wrap(err, h.Name) + return nil, errors.Wrap(err, h.Name) } } - return nil + return out, nil } func (c *Client) dockerprintfn(ctx context.Context, ch chan *bk.SolveStatus, out io.Writer) error { diff --git a/dagger/utils.go b/dagger/utils.go index a6e517b7..cda26e20 100644 --- a/dagger/utils.go +++ b/dagger/utils.go @@ -10,6 +10,9 @@ import ( ) func cueErr(err error) error { + if err == nil { + return nil + } return errors.New(cueerrors.Details(err, &cueerrors.Config{})) } From 86aa031f9234e6ba37b6ebd102699a488df0fe2c Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Tue, 2 Feb 2021 13:52:52 -0800 Subject: [PATCH 4/4] fix concurrency issues in Compiler.Compile and Value.Source Signed-off-by: Andrea Luzzardi --- dagger/compiler.go | 3 +++ dagger/value.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/dagger/compiler.go b/dagger/compiler.go index 711ee8b6..ba84590f 100644 --- a/dagger/compiler.go +++ b/dagger/compiler.go @@ -48,6 +48,9 @@ func (cc *Compiler) EmptyStruct() (*Value, error) { } func (cc *Compiler) Compile(name string, src interface{}) (*Value, error) { + cc.Lock() + defer cc.Unlock() + inst, err := cc.Cue().Compile(name, src) if err != nil { // FIXME: cleaner way to unwrap cue error details? diff --git a/dagger/value.go b/dagger/value.go index d2823650..cf4056f5 100644 --- a/dagger/value.go +++ b/dagger/value.go @@ -255,6 +255,9 @@ func (v *Value) Validate(defs ...string) error { // Return cue source for this value func (v *Value) Source() ([]byte, error) { + v.cc.RLock() + defer v.cc.RUnlock() + return cueformat.Node(v.val.Eval().Syntax()) }