From 60db93a24691cfe79b65a3dc3d0c97f6062650e3 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Thu, 8 Apr 2021 18:09:10 -0700 Subject: [PATCH] client: pass through the deployment directly we can't serialize Cue in a lossless way. The current architecture has the frontend serialize the state (plan, input, computed) to Cue files, then the client loads them back. We lose information by doing this, and therefore the merge fails. This change removes the abstraction so that we operate directly on the cue.Instance. Signed-off-by: Andrea Luzzardi --- cmd/dagger/cmd/common/common.go | 2 +- cmd/dagger/cmd/compute.go | 18 ++-- cmd/dagger/cmd/query.go | 7 +- dagger/client.go | 42 +++------ dagger/deployment.go | 54 ++++++++---- dagger/result.go | 150 -------------------------------- 6 files changed, 66 insertions(+), 207 deletions(-) delete mode 100644 dagger/result.go diff --git a/cmd/dagger/cmd/common/common.go b/cmd/dagger/cmd/common/common.go index fde69b17..83e17d70 100644 --- a/cmd/dagger/cmd/common/common.go +++ b/cmd/dagger/cmd/common/common.go @@ -60,7 +60,7 @@ func GetCurrentDeploymentState(ctx context.Context, store *dagger.Store) *dagger } // Re-compute a deployment (equivalent to `dagger up`). -func DeploymentUp(ctx context.Context, state *dagger.DeploymentState) *dagger.DeploymentResult { +func DeploymentUp(ctx context.Context, state *dagger.DeploymentState) *dagger.Deployment { lg := log.Ctx(ctx) c, err := dagger.NewClient(ctx, "") diff --git a/cmd/dagger/cmd/compute.go b/cmd/dagger/cmd/compute.go index 8a087c7e..f6624b4f 100644 --- a/cmd/dagger/cmd/compute.go +++ b/cmd/dagger/cmd/compute.go @@ -7,9 +7,11 @@ import ( "os" "strings" + "cuelang.org/go/cue" "dagger.io/go/cmd/dagger/cmd/common" "dagger.io/go/cmd/dagger/logger" "dagger.io/go/dagger" + "dagger.io/go/dagger/compiler" "go.mozilla.org/sops/v3" "go.mozilla.org/sops/v3/decrypt" @@ -128,14 +130,20 @@ var computeCmd = &cobra.Command{ } } - result := common.DeploymentUp(ctx, st) + deployment := common.DeploymentUp(ctx, st) - cueVal, err := result.Merge() - if err != nil { - lg.Fatal().Err(err).Msg("failed to merge result") + v := compiler.NewValue() + if err := v.FillPath(cue.MakePath(), deployment.Plan()); err != nil { + lg.Fatal().Err(err).Msg("failed to merge") + } + if err := v.FillPath(cue.MakePath(), deployment.Input()); err != nil { + lg.Fatal().Err(err).Msg("failed to merge") + } + if err := v.FillPath(cue.MakePath(), deployment.Computed()); err != nil { + lg.Fatal().Err(err).Msg("failed to merge") } - fmt.Println(cueVal.JSON()) + fmt.Println(v.JSON()) }, } diff --git a/cmd/dagger/cmd/query.go b/cmd/dagger/cmd/query.go index 5d22ee12..5680371f 100644 --- a/cmd/dagger/cmd/query.go +++ b/cmd/dagger/cmd/query.go @@ -51,7 +51,8 @@ var queryCmd = &cobra.Command{ if err != nil { lg.Fatal().Err(err).Msg("unable to create client") } - result, err := c.Do(ctx, state, nil) + + deployment, err := c.Do(ctx, state, nil) if err != nil { lg.Fatal().Err(err).Msg("failed to query deployment") } @@ -59,13 +60,13 @@ var queryCmd = &cobra.Command{ cueVal := compiler.NewValue() if !viper.GetBool("no-plan") { - if err := cueVal.FillPath(cue.MakePath(), result.Plan()); err != nil { + if err := cueVal.FillPath(cue.MakePath(), deployment.Plan()); err != nil { lg.Fatal().Err(err).Msg("failed to merge plan") } } if !viper.GetBool("no-input") { - if err := cueVal.FillPath(cue.MakePath(), result.Input()); err != nil { + if err := cueVal.FillPath(cue.MakePath(), deployment.Input()); err != nil { lg.Fatal().Err(err).Msg("failed to merge plan with output") } } diff --git a/dagger/client.go b/dagger/client.go index 84e2573b..0d5cc719 100644 --- a/dagger/client.go +++ b/dagger/client.go @@ -3,7 +3,6 @@ package dagger import ( "context" "fmt" - "io" "os" "path/filepath" "strings" @@ -18,6 +17,7 @@ import ( // buildkit bk "github.com/moby/buildkit/client" _ "github.com/moby/buildkit/client/connhelper/dockercontainer" // import the container connection driver + "github.com/moby/buildkit/client/llb" bkgw "github.com/moby/buildkit/frontend/gateway/client" // docker output @@ -60,7 +60,7 @@ func NewClient(ctx context.Context, host string) (*Client, error) { type ClientDoFunc func(context.Context, *Deployment, Solver) error // FIXME: return completed *Route, instead of *compiler.Value -func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc) (*DeploymentResult, error) { +func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc) (*Deployment, error) { lg := log.Ctx(ctx) eg, gctx := errgroup.WithContext(ctx) @@ -79,25 +79,14 @@ func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc }) // Spawn build function - outr, outw := io.Pipe() eg.Go(func() error { - defer outw.Close() - return c.buildfn(gctx, deployment, fn, events, outw) + return c.buildfn(gctx, deployment, fn, events) }) - // Spawn output retriever - var result *DeploymentResult - eg.Go(func() error { - defer outr.Close() - - result, err = ReadDeploymentResult(gctx, outr) - return err - }) - - return result, eg.Wait() + return deployment, eg.Wait() } -func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientDoFunc, ch chan *bk.SolveStatus, w io.WriteCloser) error { +func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientDoFunc, ch chan *bk.SolveStatus) error { lg := log.Ctx(ctx) // Scan local dirs to grant access @@ -113,15 +102,6 @@ func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientD // Setup solve options opts := bk.SolveOpt{ LocalDirs: localdirs, - // FIXME: catch output & return as cue value - Exports: []bk.ExportEntry{ - { - Type: bk.ExporterTar, - Output: func(m map[string]string) (io.WriteCloser, error) { - return w, nil - }, - }, - }, } // Call buildkit solver @@ -151,11 +131,13 @@ func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientD span, _ := opentracing.StartSpanFromContext(ctx, "Deployment.Export") defer span.Finish() - result := deployment.Result() - st, err := result.ToLLB() - if err != nil { - return nil, err - } + computed := deployment.Computed().JSON().PrettyString() + st := llb. + Scratch(). + File( + llb.Mkfile("computed.json", 0600, []byte(computed)), + llb.WithCustomName("[internal] serializing computed values"), + ) ref, err := s.Solve(ctx, st) if err != nil { diff --git a/dagger/deployment.go b/dagger/deployment.go index dbb6ba0a..603feb57 100644 --- a/dagger/deployment.go +++ b/dagger/deployment.go @@ -19,14 +19,25 @@ import ( ) type Deployment struct { - state *DeploymentState - result *DeploymentResult + state *DeploymentState + + // Layer 1: plan configuration + plan *compiler.Value + + // Layer 2: user inputs + input *compiler.Value + + // Layer 3: computed values + computed *compiler.Value } func NewDeployment(st *DeploymentState) (*Deployment, error) { d := &Deployment{ - state: st, - result: NewDeploymentResult(), + state: st, + + plan: compiler.NewValue(), + input: compiler.NewValue(), + computed: compiler.NewValue(), } // Prepare inputs @@ -36,9 +47,9 @@ func NewDeployment(st *DeploymentState) (*Deployment, error) { return nil, err } if input.Key == "" { - err = d.result.input.FillPath(cue.MakePath(), v) + err = d.input.FillPath(cue.MakePath(), v) } else { - err = d.result.input.FillPath(cue.ParsePath(input.Key), v) + err = d.input.FillPath(cue.ParsePath(input.Key), v) } if err != nil { return nil, err @@ -60,8 +71,16 @@ func (d *Deployment) PlanSource() Input { return d.state.PlanSource } -func (d *Deployment) Result() *DeploymentResult { - return d.result +func (d *Deployment) Plan() *compiler.Value { + return d.plan +} + +func (d *Deployment) Input() *compiler.Value { + return d.input +} + +func (d *Deployment) Computed() *compiler.Value { + return d.computed } // LoadPlan loads the plan @@ -89,7 +108,7 @@ func (d *Deployment) LoadPlan(ctx context.Context, s Solver) error { if err != nil { return fmt.Errorf("plan config: %w", err) } - d.result.plan = plan + d.plan = plan return nil } @@ -122,7 +141,7 @@ func (d *Deployment) LocalDirs() map[string]string { } // 1. Scan the deployment state // FIXME: use a common `flow` instance to avoid rescanning the tree. - src, err := d.result.Merge() + src, err := compiler.InstanceMerge(d.plan, d.input) if err != nil { panic(err) } @@ -151,10 +170,10 @@ func (d *Deployment) Up(ctx context.Context, s Solver) error { defer span.Finish() // Reset the computed values - d.result.computed = compiler.NewValue() + d.computed = compiler.NewValue() // Cueflow cue instance - src, err := d.result.Merge() + src, err := compiler.InstanceMerge(d.plan, d.input) if err != nil { return err } @@ -163,7 +182,7 @@ func (d *Deployment) Up(ctx context.Context, s Solver) error { flow := cueflow.New( &cueflow.Config{}, src.CueInst(), - newTaskFunc(src.CueInst(), newPipelineRunner(src.CueInst(), d.result, s)), + newTaskFunc(src.CueInst(), newPipelineRunner(src.CueInst(), d.computed, s)), ) if err := flow.Run(ctx); err != nil { return err @@ -195,7 +214,7 @@ func noOpRunner(t *cueflow.Task) error { return nil } -func newPipelineRunner(inst *cue.Instance, result *DeploymentResult, s Solver) cueflow.RunnerFunc { +func newPipelineRunner(inst *cue.Instance, computed *compiler.Value, s Solver) cueflow.RunnerFunc { return cueflow.RunnerFunc(func(t *cueflow.Task) error { ctx := t.Context() lg := log. @@ -243,12 +262,11 @@ func newPipelineRunner(inst *cue.Instance, result *DeploymentResult, s Solver) c } // Mirror the computed values in both `Task` and `Result` - computed := p.Computed() - if computed.IsEmptyStruct() { + if p.Computed().IsEmptyStruct() { return nil } - if err := t.Fill(computed.Cue()); err != nil { + if err := t.Fill(p.Computed().Cue()); err != nil { lg. Error(). Err(err). @@ -257,7 +275,7 @@ func newPipelineRunner(inst *cue.Instance, result *DeploymentResult, s Solver) c } // Merge task value into output - if err := result.computed.FillPath(t.Path(), computed); err != nil { + if err := computed.FillPath(t.Path(), p.Computed()); err != nil { lg. Error(). Err(err). diff --git a/dagger/result.go b/dagger/result.go deleted file mode 100644 index 25e6d87d..00000000 --- a/dagger/result.go +++ /dev/null @@ -1,150 +0,0 @@ -package dagger - -import ( - "archive/tar" - "context" - "errors" - "fmt" - "io" - "strings" - - "dagger.io/go/dagger/compiler" - "github.com/moby/buildkit/client/llb" - "github.com/rs/zerolog/log" -) - -const ( - planFile = "plan.cue" - inputFile = "input.cue" - computedFile = "computed.cue" -) - -// DeploymentResult represents the layers of a deployment run -type DeploymentResult struct { - // Layer 1: plan configuration - plan *compiler.Value - - // Layer 2: user inputs - input *compiler.Value - - // Layer 3: computed values - computed *compiler.Value -} - -func NewDeploymentResult() *DeploymentResult { - return &DeploymentResult{ - plan: compiler.NewValue(), - input: compiler.NewValue(), - computed: compiler.NewValue(), - } -} - -func (r *DeploymentResult) Plan() *compiler.Value { - return r.plan -} - -func (r *DeploymentResult) Input() *compiler.Value { - return r.input -} - -func (r *DeploymentResult) Computed() *compiler.Value { - return r.computed -} - -func (r *DeploymentResult) Merge() (*compiler.Value, error) { - return compiler.InstanceMerge( - r.plan, - r.input, - r.computed, - ) -} - -func (r *DeploymentResult) ToLLB() (llb.State, error) { - st := llb.Scratch() - - planSource, err := r.plan.Source() - if err != nil { - return st, compiler.Err(err) - } - - inputSource, err := r.input.Source() - if err != nil { - return st, compiler.Err(err) - } - - outputSource, err := r.computed.Source() - if err != nil { - return st, compiler.Err(err) - } - - st = st. - File( - llb.Mkfile(planFile, 0600, planSource), - llb.WithCustomName("[internal] serializing plan"), - ). - File( - llb.Mkfile(inputFile, 0600, inputSource), - llb.WithCustomName("[internal] serializing input"), - ). - File( - llb.Mkfile(computedFile, 0600, outputSource), - llb.WithCustomName("[internal] serializing output"), - ) - - return st, nil -} - -func ReadDeploymentResult(ctx context.Context, r io.Reader) (*DeploymentResult, error) { - lg := log.Ctx(ctx) - result := NewDeploymentResult() - tr := tar.NewReader(r) - - for { - h, err := tr.Next() - if errors.Is(err, io.EOF) { - break - } - if err != nil { - return nil, fmt.Errorf("read tar stream: %w", err) - } - - lg := lg. - With(). - Str("file", h.Name). - Logger() - - if !strings.HasSuffix(h.Name, ".cue") { - lg.Debug().Msg("skipping non-cue file from exporter tar stream") - continue - } - - lg.Debug().Msg("outputfn: compiling") - - src, err := io.ReadAll(tr) - if err != nil { - return nil, err - } - - v, err := compiler.Compile(h.Name, src) - if err != nil { - lg. - Debug(). - Err(compiler.Err(err)). - Bytes("src", src). - Msg("invalid result file") - return nil, fmt.Errorf("failed to compile result: %w", compiler.Err(err)) - } - - switch h.Name { - case planFile: - result.plan = v - case inputFile: - result.input = v - case computedFile: - result.computed = v - default: - lg.Warn().Msg("unexpected file") - } - } - return result, nil -}