From 5381d0bfe1e43ce797dde074973c07a74b27467f Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Tue, 6 Apr 2021 17:43:12 -0700 Subject: [PATCH 1/9] up: store outputs and merge in query Signed-off-by: Andrea Luzzardi --- cmd/dagger/cmd/common/common.go | 12 +- cmd/dagger/cmd/compute.go | 10 +- cmd/dagger/cmd/new.go | 2 +- cmd/dagger/cmd/query.go | 34 +++++- cmd/dagger/cmd/up.go | 6 +- dagger/client.go | 61 ++-------- dagger/deployment.go | 190 +++++++------------------------- dagger/result.go | 161 +++++++++++++++++++++++++++ dagger/state.go | 56 ++++++++++ 9 files changed, 314 insertions(+), 218 deletions(-) create mode 100644 dagger/result.go create mode 100644 dagger/state.go diff --git a/cmd/dagger/cmd/common/common.go b/cmd/dagger/cmd/common/common.go index 2014c224..fde69b17 100644 --- a/cmd/dagger/cmd/common/common.go +++ b/cmd/dagger/cmd/common/common.go @@ -2,7 +2,6 @@ package common import ( "context" - "fmt" "os" "dagger.io/go/dagger" @@ -61,22 +60,19 @@ func GetCurrentDeploymentState(ctx context.Context, store *dagger.Store) *dagger } // Re-compute a deployment (equivalent to `dagger up`). -// If printOutput is true, print the JSON-encoded computed state to standard output -func DeploymentUp(ctx context.Context, state *dagger.DeploymentState, printOutput bool) { +func DeploymentUp(ctx context.Context, state *dagger.DeploymentState) *dagger.DeploymentResult { lg := log.Ctx(ctx) c, err := dagger.NewClient(ctx, "") if err != nil { lg.Fatal().Err(err).Msg("unable to create client") } - output, err := c.Do(ctx, state, func(ctx context.Context, deployment *dagger.Deployment, s dagger.Solver) error { + result, err := c.Do(ctx, state, func(ctx context.Context, deployment *dagger.Deployment, s dagger.Solver) error { log.Ctx(ctx).Debug().Msg("bringing deployment up") - return deployment.Up(ctx, s, nil) + return deployment.Up(ctx, s) }) if err != nil { lg.Fatal().Err(err).Msg("failed to up deployment") } - if printOutput { - fmt.Println(output.JSON()) - } + return result } diff --git a/cmd/dagger/cmd/compute.go b/cmd/dagger/cmd/compute.go index 033e5f38..8a087c7e 100644 --- a/cmd/dagger/cmd/compute.go +++ b/cmd/dagger/cmd/compute.go @@ -3,6 +3,7 @@ package cmd import ( "encoding/json" "errors" + "fmt" "os" "strings" @@ -127,7 +128,14 @@ var computeCmd = &cobra.Command{ } } - common.DeploymentUp(ctx, st, true) + result := common.DeploymentUp(ctx, st) + + cueVal, err := result.Merge() + if err != nil { + lg.Fatal().Err(err).Msg("failed to merge result") + } + + fmt.Println(cueVal.JSON()) }, } diff --git a/cmd/dagger/cmd/new.go b/cmd/dagger/cmd/new.go index 9c3e3966..90d68468 100644 --- a/cmd/dagger/cmd/new.go +++ b/cmd/dagger/cmd/new.go @@ -63,7 +63,7 @@ var newCmd = &cobra.Command{ Msg("deployment created") if viper.GetBool("up") { - common.DeploymentUp(ctx, st, false) + common.DeploymentUp(ctx, st) } }, } diff --git a/cmd/dagger/cmd/query.go b/cmd/dagger/cmd/query.go index 2e4a153d..7dc71e10 100644 --- a/cmd/dagger/cmd/query.go +++ b/cmd/dagger/cmd/query.go @@ -51,12 +51,36 @@ var queryCmd = &cobra.Command{ if err != nil { lg.Fatal().Err(err).Msg("unable to create client") } - output, err := c.Do(ctx, state, nil) + result, err := c.Do(ctx, state, nil) if err != nil { lg.Fatal().Err(err).Msg("failed to query deployment") } - cueVal := output.LookupPath(cuePath) + cueVal := compiler.EmptyStruct() + + if !viper.GetBool("no-plan") { + if err := cueVal.FillPath(cue.MakePath(), result.Plan()); err != nil { + lg.Fatal().Err(err).Msg("failed to merge plan") + } + } + + if !viper.GetBool("no-input") { + if err := cueVal.FillPath(cue.MakePath(), result.Input()); err != nil { + lg.Fatal().Err(err).Msg("failed to merge plan with output") + } + } + + if !viper.GetBool("no-computed") && state.Computed != "" { + computed, err := compiler.DecodeJSON("", []byte(state.Computed)) + if err != nil { + lg.Fatal().Err(err).Msg("failed to decode json") + } + if err := cueVal.FillPath(cue.MakePath(), computed); err != nil { + lg.Fatal().Err(err).Msg("failed to merge plan with computed") + } + } + + cueVal = cueVal.LookupPath(cuePath) if viper.GetBool("concrete") { if err := cueVal.IsConcreteR(); err != nil { @@ -116,9 +140,9 @@ func init() { // FIXME: implement the flags below // queryCmd.Flags().String("revision", "latest", "Query a specific version of the deployment") queryCmd.Flags().StringP("format", "f", "json", "Output format (json|yaml|cue|text|env)") - // queryCmd.Flags().BoolP("no-input", "I", false, "Exclude inputs from query") - // queryCmd.Flags().BoolP("no-output", "O", false, "Exclude outputs from query") - // queryCmd.Flags().BoolP("no-plan", "P", false, "Exclude outputs from query") + queryCmd.Flags().BoolP("no-plan", "P", false, "Exclude plan from query") + queryCmd.Flags().BoolP("no-input", "I", false, "Exclude inputs from query") + queryCmd.Flags().BoolP("no-computed", "C", false, "Exclude computed values from query") if err := viper.BindPFlags(queryCmd.Flags()); err != nil { panic(err) diff --git a/cmd/dagger/cmd/up.go b/cmd/dagger/cmd/up.go index 8d75cc9f..7d6175e8 100644 --- a/cmd/dagger/cmd/up.go +++ b/cmd/dagger/cmd/up.go @@ -31,7 +31,11 @@ var upCmd = &cobra.Command{ state := common.GetCurrentDeploymentState(ctx, store) // TODO: Implement options: --no-cache - common.DeploymentUp(ctx, state, true) + result := common.DeploymentUp(ctx, state) + state.Computed = result.Computed().JSON().String() + if err := store.UpdateDeployment(ctx, state, nil); err != nil { + lg.Fatal().Err(err).Msg("failed to update deployment") + } }, } diff --git a/dagger/client.go b/dagger/client.go index 68a44a1c..a073d9e5 100644 --- a/dagger/client.go +++ b/dagger/client.go @@ -1,16 +1,13 @@ package dagger import ( - "archive/tar" "context" - "errors" "fmt" "io" "os" "path/filepath" "strings" - "cuelang.org/go/cue" "golang.org/x/sync/errgroup" "github.com/opentracing/opentracing-go" @@ -21,7 +18,6 @@ import ( // buildkit bk "github.com/moby/buildkit/client" _ "github.com/moby/buildkit/client/connhelper/dockercontainer" // import the container connection driver - "github.com/moby/buildkit/client/llb" bkgw "github.com/moby/buildkit/frontend/gateway/client" // docker output @@ -64,7 +60,7 @@ func NewClient(ctx context.Context, host string) (*Client, error) { type ClientDoFunc func(context.Context, *Deployment, Solver) error // FIXME: return completed *Route, instead of *compiler.Value -func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc) (*compiler.Value, error) { +func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc) (*DeploymentResult, error) { lg := log.Ctx(ctx) eg, gctx := errgroup.WithContext(ctx) @@ -90,14 +86,15 @@ func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc }) // Spawn output retriever - var out *compiler.Value + var result *DeploymentResult eg.Go(func() error { defer outr.Close() - out, err = c.outputfn(gctx, outr) + + result, err = DeploymentResultFromTar(gctx, outr) return err }) - return out, eg.Wait() + return result, eg.Wait() } func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientDoFunc, ch chan *bk.SolveStatus, w io.WriteCloser) error { @@ -154,15 +151,12 @@ func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientD span, _ := opentracing.StartSpanFromContext(ctx, "Deployment.Export") defer span.Finish() - stateSource, err := deployment.State().Source() + result := deployment.Result() + st, err := result.ToLLB() if err != nil { - return nil, compiler.Err(err) + return nil, err } - st := llb.Scratch().File( - llb.Mkfile("state.cue", 0600, stateSource), - llb.WithCustomName("[internal] serializing state to CUE"), - ) ref, err := s.Solve(ctx, st) if err != nil { return nil, err @@ -185,45 +179,6 @@ func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientD return nil } -// Read tar export stream from buildkit Build(), and extract cue output -func (c *Client) outputfn(ctx context.Context, r io.Reader) (*compiler.Value, error) { - lg := log.Ctx(ctx) - - // FIXME: merge this into deployment output. - out := compiler.EmptyStruct() - - tr := tar.NewReader(r) - for { - h, err := tr.Next() - if errors.Is(err, io.EOF) { - break - } - if err != nil { - return nil, fmt.Errorf("read tar stream: %w", err) - } - - lg := lg. - With(). - Str("file", h.Name). - Logger() - - if !strings.HasSuffix(h.Name, ".cue") { - lg.Debug().Msg("skipping non-cue file from exporter tar stream") - continue - } - lg.Debug().Msg("outputfn: compiling & merging") - - v, err := compiler.Compile(h.Name, tr) - if err != nil { - return nil, err - } - if err := out.FillPath(cue.MakePath(), v); err != nil { - return nil, fmt.Errorf("%s: %w", h.Name, compiler.Err(err)) - } - } - return out, nil -} - func (c *Client) logSolveStatus(ctx context.Context, ch chan *bk.SolveStatus) error { parseName := func(v *bk.Vertex) (string, string) { // Pattern: `@name@ message`. Minimal length is len("@X@ ") diff --git a/dagger/deployment.go b/dagger/deployment.go index 5dfd5dbd..4a1a85cd 100644 --- a/dagger/deployment.go +++ b/dagger/deployment.go @@ -18,80 +18,15 @@ import ( "github.com/rs/zerolog/log" ) -// Contents of a deployment serialized to a file -type DeploymentState struct { - // Globally unique deployment ID - ID string `json:"id,omitempty"` - - // Human-friendly deployment name. - // A deployment may have more than one name. - // FIXME: store multiple names? - Name string `json:"name,omitempty"` - - // Cue module containing the deployment plan - // The input's top-level artifact is used as a module directory. - PlanSource Input `json:"plan,omitempty"` - - Inputs []inputKV `json:"inputs,omitempty"` -} - -type inputKV struct { - Key string `json:"key,omitempty"` - Value Input `json:"value,omitempty"` -} - -func (s *DeploymentState) SetInput(key string, value Input) error { - for i, inp := range s.Inputs { - if inp.Key != key { - continue - } - // Remove existing inputs with the same key - s.Inputs = append(s.Inputs[:i], s.Inputs[i+1:]...) - } - - s.Inputs = append(s.Inputs, inputKV{Key: key, Value: value}) - return nil -} - -// Remove all inputs at the given key, including sub-keys. -// For example RemoveInputs("foo.bar") will remove all inputs -// at foo.bar, foo.bar.baz, etc. -func (s *DeploymentState) RemoveInputs(key string) error { - newInputs := make([]inputKV, 0, len(s.Inputs)) - for _, i := range s.Inputs { - if i.Key == key { - continue - } - newInputs = append(newInputs, i) - } - s.Inputs = newInputs - - return nil -} - type Deployment struct { - st *DeploymentState - - // Layer 1: plan configuration - plan *compiler.Value - - // Layer 2: user inputs - input *compiler.Value - - // Layer 3: computed values - output *compiler.Value - - // All layers merged together: plan + input + output - state *compiler.Value + state *DeploymentState + result *DeploymentResult } func NewDeployment(st *DeploymentState) (*Deployment, error) { - empty := compiler.EmptyStruct() d := &Deployment{ - st: st, - plan: empty, - input: empty, - output: empty, + state: st, + result: NewDeploymentResult(), } // Prepare inputs @@ -101,47 +36,32 @@ func NewDeployment(st *DeploymentState) (*Deployment, error) { return nil, err } if input.Key == "" { - err = d.input.FillPath(cue.MakePath(), v) + err = d.result.input.FillPath(cue.MakePath(), v) } else { - err = d.input.FillPath(cue.ParsePath(input.Key), v) + err = d.result.input.FillPath(cue.ParsePath(input.Key), v) } if err != nil { return nil, err } } - if err := d.mergeState(); err != nil { - return nil, err - } return d, nil } func (d *Deployment) ID() string { - return d.st.ID + return d.state.ID } func (d *Deployment) Name() string { - return d.st.Name + return d.state.Name } func (d *Deployment) PlanSource() Input { - return d.st.PlanSource + return d.state.PlanSource } -func (d *Deployment) Plan() *compiler.Value { - return d.plan -} - -func (d *Deployment) Input() *compiler.Value { - return d.input -} - -func (d *Deployment) Output() *compiler.Value { - return d.output -} - -func (d *Deployment) State() *compiler.Value { - return d.state +func (d *Deployment) Result() *DeploymentResult { + return d.result } // LoadPlan loads the plan @@ -149,7 +69,7 @@ func (d *Deployment) LoadPlan(ctx context.Context, s Solver) error { span, ctx := opentracing.StartSpanFromContext(ctx, "deployment.LoadPlan") defer span.Finish() - planSource, err := d.st.PlanSource.Compile() + planSource, err := d.state.PlanSource.Compile() if err != nil { return err } @@ -169,10 +89,9 @@ func (d *Deployment) LoadPlan(ctx context.Context, s Solver) error { if err != nil { return fmt.Errorf("plan config: %w", err) } - d.plan = plan + d.result.plan = plan - // Commit - return d.mergeState() + return nil } // Scan all scripts in the deployment for references to local directories (do:"local"), @@ -203,15 +122,22 @@ func (d *Deployment) LocalDirs() map[string]string { } // 1. Scan the deployment state // FIXME: use a common `flow` instance to avoid rescanning the tree. - inst := d.state.CueInst() - flow := cueflow.New(&cueflow.Config{}, inst, newTaskFunc(inst, noOpRunner)) + src, err := d.result.Merge() + if err != nil { + panic(err) + } + flow := cueflow.New( + &cueflow.Config{}, + src.CueInst(), + newTaskFunc(src.CueInst(), noOpRunner), + ) for _, t := range flow.Tasks() { - v := compiler.Wrap(t.Value(), inst) + v := compiler.Wrap(t.Value(), src.CueInst()) localdirs(v.Lookup("#up")) } // 2. Scan the plan - plan, err := d.st.PlanSource.Compile() + plan, err := d.state.PlanSource.Compile() if err != nil { panic(err) } @@ -219,54 +145,21 @@ func (d *Deployment) LocalDirs() map[string]string { return dirs } -// FIXME: this is just a 3-way merge. Add var args to compiler.Value.Merge. -func (d *Deployment) mergeState() error { - // FIXME: make this cleaner in *compiler.Value by keeping intermediary instances - // FIXME: state.CueInst() must return an instance with the same - // contents as state.v, for the purposes of cueflow. - // That is not currently how *compiler.Value works, so we prepare the cue - // instance manually. - // --> refactor the compiler.Value API to do this for us. - var ( - state = compiler.EmptyStruct() - stateInst = state.CueInst() - err error - ) - - stateInst, err = stateInst.Fill(d.plan.Cue()) - if err != nil { - return fmt.Errorf("merge base & input: %w", err) - } - stateInst, err = stateInst.Fill(d.input.Cue()) - if err != nil { - return fmt.Errorf("merge base & input: %w", err) - } - stateInst, err = stateInst.Fill(d.output.Cue()) - if err != nil { - return fmt.Errorf("merge output with base & input: %w", err) - } - - state = compiler.Wrap(stateInst.Value(), stateInst) - - // commit - d.state = state - return nil -} - -type UpOpts struct{} - // Up missing values in deployment configuration, and write them to state. -func (d *Deployment) Up(ctx context.Context, s Solver, _ *UpOpts) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "r.Compute") +func (d *Deployment) Up(ctx context.Context, s Solver) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "deployment.Up") defer span.Finish() lg := log.Ctx(ctx) - // Cueflow cue instance - inst := d.state.CueInst() + // Reset the computed values + d.result.computed = compiler.EmptyStruct() - // Reset the output - d.output = compiler.EmptyStruct() + // Cueflow cue instance + src, err := d.result.Merge() + if err != nil { + return err + } // Cueflow config flowCfg := &cueflow.Config{ @@ -285,7 +178,7 @@ func (d *Deployment) Up(ctx context.Context, s Solver, _ *UpOpts) error { return nil } // Merge task value into output - err := d.output.FillPath(t.Path(), t.Value()) + err := d.result.computed.FillPath(t.Path(), t.Value()) if err != nil { lg. Error(). @@ -297,17 +190,16 @@ func (d *Deployment) Up(ctx context.Context, s Solver, _ *UpOpts) error { }, } // Orchestrate execution with cueflow - flow := cueflow.New(flowCfg, inst, newTaskFunc(inst, newPipelineRunner(inst, s))) + flow := cueflow.New( + flowCfg, + src.CueInst(), + newTaskFunc(src.CueInst(), newPipelineRunner(src.CueInst(), s)), + ) if err := flow.Run(ctx); err != nil { return err } - { - span, _ := opentracing.StartSpanFromContext(ctx, "merge state") - defer span.Finish() - - return d.mergeState() - } + return nil } type DownOpts struct{} diff --git a/dagger/result.go b/dagger/result.go new file mode 100644 index 00000000..905c3f52 --- /dev/null +++ b/dagger/result.go @@ -0,0 +1,161 @@ +package dagger + +import ( + "archive/tar" + "context" + "errors" + "fmt" + "io" + "strings" + + "dagger.io/go/dagger/compiler" + "github.com/moby/buildkit/client/llb" + "github.com/rs/zerolog/log" +) + +const ( + planFile = "plan.cue" + inputFile = "input.cue" + computedFile = "computed.cue" +) + +// DeploymentResult represents the layers of a deployment run +type DeploymentResult struct { + // Layer 1: plan configuration + plan *compiler.Value + + // Layer 2: user inputs + input *compiler.Value + + // Layer 3: computed values + computed *compiler.Value +} + +func NewDeploymentResult() *DeploymentResult { + return &DeploymentResult{ + plan: compiler.EmptyStruct(), + input: compiler.EmptyStruct(), + computed: compiler.EmptyStruct(), + } +} + +func (r *DeploymentResult) Plan() *compiler.Value { + return r.plan +} + +func (r *DeploymentResult) Input() *compiler.Value { + return r.input +} + +func (r *DeploymentResult) Computed() *compiler.Value { + return r.computed +} + +func (r *DeploymentResult) Merge() (*compiler.Value, error) { + // FIXME: v.CueInst() must return an instance with the same + // contents as v, for the purposes of cueflow. + // That is not currently how *compiler.Value works, so we prepare the cue + // instance manually. + // --> refactor the compiler.Value API to do this for us. + var ( + v = compiler.EmptyStruct() + inst = v.CueInst() + err error + ) + + inst, err = inst.Fill(r.plan.Cue()) + if err != nil { + return nil, fmt.Errorf("merge plan: %w", err) + } + inst, err = inst.Fill(r.input.Cue()) + if err != nil { + return nil, fmt.Errorf("merge input: %w", err) + } + inst, err = inst.Fill(r.computed.Cue()) + if err != nil { + return nil, fmt.Errorf("merge computed: %w", err) + } + + v = compiler.Wrap(inst.Value(), inst) + return v, nil +} + +func (r *DeploymentResult) ToLLB() (llb.State, error) { + st := llb.Scratch() + + planSource, err := r.plan.Source() + if err != nil { + return st, compiler.Err(err) + } + + inputSource, err := r.input.Source() + if err != nil { + return st, compiler.Err(err) + } + + outputSource, err := r.computed.Source() + if err != nil { + return st, compiler.Err(err) + } + + st = st. + File( + llb.Mkfile(planFile, 0600, planSource), + llb.WithCustomName("[internal] serializing plan"), + ). + File( + llb.Mkfile(inputFile, 0600, inputSource), + llb.WithCustomName("[internal] serializing input"), + ). + File( + llb.Mkfile(computedFile, 0600, outputSource), + llb.WithCustomName("[internal] serializing output"), + ) + + return st, nil +} + +func DeploymentResultFromTar(ctx context.Context, r io.Reader) (*DeploymentResult, error) { + lg := log.Ctx(ctx) + result := NewDeploymentResult() + tr := tar.NewReader(r) + + for { + h, err := tr.Next() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, fmt.Errorf("read tar stream: %w", err) + } + + lg := lg. + With(). + Str("file", h.Name). + Logger() + + if !strings.HasSuffix(h.Name, ".cue") { + lg.Debug().Msg("skipping non-cue file from exporter tar stream") + continue + } + + lg.Debug().Msg("outputfn: compiling") + + v, err := compiler.Compile(h.Name, tr) + if err != nil { + return nil, err + } + + switch h.Name { + case planFile: + result.plan = v + case inputFile: + result.input = v + case computedFile: + result.computed = v + default: + lg.Warn().Msg("unexpected file") + } + } + return result, nil +} diff --git a/dagger/state.go b/dagger/state.go new file mode 100644 index 00000000..debe95a9 --- /dev/null +++ b/dagger/state.go @@ -0,0 +1,56 @@ +package dagger + +// Contents of a deployment serialized to a file +type DeploymentState struct { + // Globally unique deployment ID + ID string `json:"id,omitempty"` + + // Human-friendly deployment name. + // A deployment may have more than one name. + // FIXME: store multiple names? + Name string `json:"name,omitempty"` + + // Cue module containing the deployment plan + // The input's top-level artifact is used as a module directory. + PlanSource Input `json:"plan,omitempty"` + + // User Inputs + Inputs []inputKV `json:"inputs,omitempty"` + + // Computed values + Computed string `json:"output,omitempty"` +} + +type inputKV struct { + Key string `json:"key,omitempty"` + Value Input `json:"value,omitempty"` +} + +func (s *DeploymentState) SetInput(key string, value Input) error { + for i, inp := range s.Inputs { + if inp.Key != key { + continue + } + // Remove existing inputs with the same key + s.Inputs = append(s.Inputs[:i], s.Inputs[i+1:]...) + } + + s.Inputs = append(s.Inputs, inputKV{Key: key, Value: value}) + return nil +} + +// Remove all inputs at the given key, including sub-keys. +// For example RemoveInputs("foo.bar") will remove all inputs +// at foo.bar, foo.bar.baz, etc. +func (s *DeploymentState) RemoveInputs(key string) error { + newInputs := make([]inputKV, 0, len(s.Inputs)) + for _, i := range s.Inputs { + if i.Key == key { + continue + } + newInputs = append(newInputs, i) + } + s.Inputs = newInputs + + return nil +} From e54f1b0c3ab3aaab72e5b4011a0ba7a4d630b885 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Wed, 7 Apr 2021 18:41:44 -0700 Subject: [PATCH 2/9] store only computed values in compute layer Signed-off-by: Andrea Luzzardi --- cmd/dagger/cmd/query.go | 2 +- dagger/client.go | 2 +- dagger/compiler/compiler.go | 15 ++++++--- dagger/deployment.go | 66 +++++++++++++++++-------------------- dagger/pipeline.go | 39 ++++++++++++---------- dagger/result.go | 22 +++++++++---- dagger/types.go | 28 ---------------- 7 files changed, 80 insertions(+), 94 deletions(-) delete mode 100644 dagger/types.go diff --git a/cmd/dagger/cmd/query.go b/cmd/dagger/cmd/query.go index 7dc71e10..5d22ee12 100644 --- a/cmd/dagger/cmd/query.go +++ b/cmd/dagger/cmd/query.go @@ -56,7 +56,7 @@ var queryCmd = &cobra.Command{ lg.Fatal().Err(err).Msg("failed to query deployment") } - cueVal := compiler.EmptyStruct() + cueVal := compiler.NewValue() if !viper.GetBool("no-plan") { if err := cueVal.FillPath(cue.MakePath(), result.Plan()); err != nil { diff --git a/dagger/client.go b/dagger/client.go index a073d9e5..84e2573b 100644 --- a/dagger/client.go +++ b/dagger/client.go @@ -90,7 +90,7 @@ func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc eg.Go(func() error { defer outr.Close() - result, err = DeploymentResultFromTar(gctx, outr) + result, err = ReadDeploymentResult(gctx, outr) return err }) diff --git a/dagger/compiler/compiler.go b/dagger/compiler/compiler.go index 4d265e23..26ea2156 100644 --- a/dagger/compiler/compiler.go +++ b/dagger/compiler/compiler.go @@ -19,8 +19,8 @@ func Compile(name string, src interface{}) (*Value, error) { return DefaultCompiler.Compile(name, src) } -func EmptyStruct() *Value { - return DefaultCompiler.EmptyStruct() +func NewValue() *Value { + return DefaultCompiler.NewValue() } // FIXME can be refactored away now? @@ -72,9 +72,14 @@ func (c *Compiler) Cue() *cue.Runtime { return &(c.Runtime) } -// Compile an empty struct -func (c *Compiler) EmptyStruct() *Value { - empty, err := c.Compile("", "") +// Compile an empty value +func (c *Compiler) NewValue() *Value { + empty, err := c.Compile("", ` + { + ... + _ + } + `) if err != nil { panic(err) } diff --git a/dagger/deployment.go b/dagger/deployment.go index 4a1a85cd..dbb6ba0a 100644 --- a/dagger/deployment.go +++ b/dagger/deployment.go @@ -74,7 +74,7 @@ func (d *Deployment) LoadPlan(ctx context.Context, s Solver) error { return err } - p := NewPipeline("[internal] source", s, nil) + p := NewPipeline("[internal] source", s) // execute updater script if err := p.Do(ctx, planSource); err != nil { return err @@ -150,10 +150,8 @@ func (d *Deployment) Up(ctx context.Context, s Solver) error { span, ctx := opentracing.StartSpanFromContext(ctx, "deployment.Up") defer span.Finish() - lg := log.Ctx(ctx) - // Reset the computed values - d.result.computed = compiler.EmptyStruct() + d.result.computed = compiler.NewValue() // Cueflow cue instance src, err := d.result.Merge() @@ -161,39 +159,11 @@ func (d *Deployment) Up(ctx context.Context, s Solver) error { return err } - // Cueflow config - flowCfg := &cueflow.Config{ - UpdateFunc: func(c *cueflow.Controller, t *cueflow.Task) error { - if t == nil { - return nil - } - - lg := lg. - With(). - Str("component", t.Path().String()). - Str("state", t.State().String()). - Logger() - - if t.State() != cueflow.Terminated { - return nil - } - // Merge task value into output - err := d.result.computed.FillPath(t.Path(), t.Value()) - if err != nil { - lg. - Error(). - Err(err). - Msg("failed to fill task result") - return err - } - return nil - }, - } // Orchestrate execution with cueflow flow := cueflow.New( - flowCfg, + &cueflow.Config{}, src.CueInst(), - newTaskFunc(src.CueInst(), newPipelineRunner(src.CueInst(), s)), + newTaskFunc(src.CueInst(), newPipelineRunner(src.CueInst(), d.result, s)), ) if err := flow.Run(ctx); err != nil { return err @@ -225,7 +195,7 @@ func noOpRunner(t *cueflow.Task) error { return nil } -func newPipelineRunner(inst *cue.Instance, s Solver) cueflow.RunnerFunc { +func newPipelineRunner(inst *cue.Instance, result *DeploymentResult, s Solver) cueflow.RunnerFunc { return cueflow.RunnerFunc(func(t *cueflow.Task) error { ctx := t.Context() lg := log. @@ -250,7 +220,7 @@ func newPipelineRunner(inst *cue.Instance, s Solver) cueflow.RunnerFunc { Msg("dependency detected") } v := compiler.Wrap(t.Value(), inst) - p := NewPipeline(t.Path().String(), s, NewFillable(t)) + p := NewPipeline(t.Path().String(), s) err := p.Do(ctx, v) if err != nil { span.LogFields(otlog.String("error", err.Error())) @@ -271,6 +241,30 @@ func newPipelineRunner(inst *cue.Instance, s Solver) cueflow.RunnerFunc { Msg("failed") return err } + + // Mirror the computed values in both `Task` and `Result` + computed := p.Computed() + if computed.IsEmptyStruct() { + return nil + } + + if err := t.Fill(computed.Cue()); err != nil { + lg. + Error(). + Err(err). + Msg("failed to fill task") + return err + } + + // Merge task value into output + if err := result.computed.FillPath(t.Path(), computed); err != nil { + lg. + Error(). + Err(err). + Msg("failed to fill task result") + return err + } + lg. Info(). Dur("duration", time.Since(start)). diff --git a/dagger/pipeline.go b/dagger/pipeline.go index c3e35e9d..9dd40ca3 100644 --- a/dagger/pipeline.go +++ b/dagger/pipeline.go @@ -10,6 +10,7 @@ import ( "path" "strings" + "cuelang.org/go/cue" "github.com/docker/distribution/reference" bk "github.com/moby/buildkit/client" "github.com/moby/buildkit/client/llb" @@ -29,19 +30,19 @@ const ( // An execution pipeline type Pipeline struct { - name string - s Solver - state llb.State - result bkgw.Reference - out *Fillable + name string + s Solver + state llb.State + result bkgw.Reference + computed *compiler.Value } -func NewPipeline(name string, s Solver, out *Fillable) *Pipeline { +func NewPipeline(name string, s Solver) *Pipeline { return &Pipeline{ - name: name, - s: s, - state: llb.Scratch(), - out: out, + name: name, + s: s, + state: llb.Scratch(), + computed: compiler.NewValue(), } } @@ -60,6 +61,10 @@ func (p *Pipeline) FS() fs.FS { return NewBuildkitFS(p.result) } +func (p *Pipeline) Computed() *compiler.Value { + return p.computed +} + func isComponent(v *compiler.Value) bool { return v.Lookup("#up").Exists() } @@ -246,7 +251,7 @@ func (p *Pipeline) Copy(ctx context.Context, op *compiler.Value, st llb.State) ( return st, err } // Execute 'from' in a tmp pipeline, and use the resulting fs - from := NewPipeline(op.Lookup("from").Path().String(), p.s, nil) + from := NewPipeline(op.Lookup("from").Path().String(), p.s) if err := from.Do(ctx, op.Lookup("from")); err != nil { return st, err } @@ -446,7 +451,7 @@ func (p *Pipeline) mount(ctx context.Context, dest string, mnt *compiler.Value) } } // eg. mount: "/foo": { from: www.source } - from := NewPipeline(mnt.Lookup("from").Path().String(), p.s, nil) + from := NewPipeline(mnt.Lookup("from").Path().String(), p.s) if err := from.Do(ctx, mnt.Lookup("from")); err != nil { return nil, err } @@ -488,7 +493,7 @@ func (p *Pipeline) Export(ctx context.Context, op *compiler.Value, st llb.State) Bytes("contents", contents). Msg("exporting string") - if err := p.out.Fill(string(contents)); err != nil { + if err := p.computed.FillPath(cue.MakePath(), string(contents)); err != nil { return st, err } case "json": @@ -504,7 +509,7 @@ func (p *Pipeline) Export(ctx context.Context, op *compiler.Value, st llb.State) Interface("contents", o). Msg("exporting json") - if err := p.out.Fill(o); err != nil { + if err := p.computed.FillPath(cue.MakePath(), o); err != nil { return st, err } case "yaml": @@ -520,7 +525,7 @@ func (p *Pipeline) Export(ctx context.Context, op *compiler.Value, st llb.State) Interface("contents", o). Msg("exporting yaml") - if err := p.out.Fill(o); err != nil { + if err := p.computed.FillPath(cue.MakePath(), o); err != nil { return st, err } default: @@ -550,7 +555,7 @@ func unmarshalAnything(data []byte, fn unmarshaller) (interface{}, error) { func (p *Pipeline) Load(ctx context.Context, op *compiler.Value, st llb.State) (llb.State, error) { // Execute 'from' in a tmp pipeline, and use the resulting fs - from := NewPipeline(op.Lookup("from").Path().String(), p.s, nil) + from := NewPipeline(op.Lookup("from").Path().String(), p.s) if err := from.Do(ctx, op.Lookup("from")); err != nil { return st, err } @@ -683,7 +688,7 @@ func (p *Pipeline) DockerBuild(ctx context.Context, op *compiler.Value, st llb.S // docker build context. This can come from another component, so we need to // compute it first. if context.Exists() { - from := NewPipeline(op.Lookup("context").Path().String(), p.s, nil) + from := NewPipeline(op.Lookup("context").Path().String(), p.s) if err := from.Do(ctx, context); err != nil { return st, err } diff --git a/dagger/result.go b/dagger/result.go index 905c3f52..0918b854 100644 --- a/dagger/result.go +++ b/dagger/result.go @@ -33,9 +33,9 @@ type DeploymentResult struct { func NewDeploymentResult() *DeploymentResult { return &DeploymentResult{ - plan: compiler.EmptyStruct(), - input: compiler.EmptyStruct(), - computed: compiler.EmptyStruct(), + plan: compiler.NewValue(), + input: compiler.NewValue(), + computed: compiler.NewValue(), } } @@ -58,7 +58,7 @@ func (r *DeploymentResult) Merge() (*compiler.Value, error) { // instance manually. // --> refactor the compiler.Value API to do this for us. var ( - v = compiler.EmptyStruct() + v = compiler.NewValue() inst = v.CueInst() err error ) @@ -115,7 +115,7 @@ func (r *DeploymentResult) ToLLB() (llb.State, error) { return st, nil } -func DeploymentResultFromTar(ctx context.Context, r io.Reader) (*DeploymentResult, error) { +func ReadDeploymentResult(ctx context.Context, r io.Reader) (*DeploymentResult, error) { lg := log.Ctx(ctx) result := NewDeploymentResult() tr := tar.NewReader(r) @@ -141,11 +141,21 @@ func DeploymentResultFromTar(ctx context.Context, r io.Reader) (*DeploymentResul lg.Debug().Msg("outputfn: compiling") - v, err := compiler.Compile(h.Name, tr) + src, err := io.ReadAll(tr) if err != nil { return nil, err } + v, err := compiler.Compile(h.Name, src) + if err != nil { + lg. + Debug(). + Err(compiler.Err(err)). + Bytes("src", src). + Msg("invalid result file") + return nil, fmt.Errorf("failed to compile result: %w", compiler.Err(err)) + } + switch h.Name { case planFile: result.plan = v diff --git a/dagger/types.go b/dagger/types.go deleted file mode 100644 index 3c2a50df..00000000 --- a/dagger/types.go +++ /dev/null @@ -1,28 +0,0 @@ -package dagger - -import ( - "os" - - cueflow "cuelang.org/go/tools/flow" -) - -var ErrNotExist = os.ErrNotExist - -// Something which can be filled in-place with a cue value -type Fillable struct { - t *cueflow.Task -} - -func NewFillable(t *cueflow.Task) *Fillable { - return &Fillable{ - t: t, - } -} - -func (f *Fillable) Fill(x interface{}) error { - // Use a nil pointer receiver to discard all values - if f == nil { - return nil - } - return f.t.Fill(x) -} From b4c530653ce4b0ba7f0564fddedc1c2061ab696f Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Thu, 8 Apr 2021 16:30:55 -0700 Subject: [PATCH 3/9] compiler: fix data race issue Signed-off-by: Andrea Luzzardi --- dagger/compiler/compiler.go | 46 ++++++++++++++++++++++++++++++++----- dagger/result.go | 29 ++++------------------- 2 files changed, 44 insertions(+), 31 deletions(-) diff --git a/dagger/compiler/compiler.go b/dagger/compiler/compiler.go index 26ea2156..eb4eaec1 100644 --- a/dagger/compiler/compiler.go +++ b/dagger/compiler/compiler.go @@ -2,6 +2,7 @@ package compiler import ( "errors" + "fmt" "sync" "cuelang.org/go/cue" @@ -28,6 +29,10 @@ func Wrap(v cue.Value, inst *cue.Instance) *Value { return DefaultCompiler.Wrap(v, inst) } +func InstanceMerge(src ...interface{}) (*Value, error) { + return DefaultCompiler.InstanceMerge(src...) +} + func Cue() *cue.Runtime { return DefaultCompiler.Cue() } @@ -74,12 +79,7 @@ func (c *Compiler) Cue() *cue.Runtime { // Compile an empty value func (c *Compiler) NewValue() *Value { - empty, err := c.Compile("", ` - { - ... - _ - } - `) + empty, err := c.Compile("", "_") if err != nil { panic(err) } @@ -98,6 +98,40 @@ func (c *Compiler) Compile(name string, src interface{}) (*Value, error) { return c.Wrap(inst.Value(), inst), nil } +// InstanceMerge merges multiple values and mirrors the value in the cue.Instance. +// FIXME: AVOID THIS AT ALL COST +// Special case: we must return an instance with the same +// contents as v, for the purposes of cueflow. +func (c *Compiler) InstanceMerge(src ...interface{}) (*Value, error) { + var ( + v = c.NewValue() + inst = v.CueInst() + err error + ) + + c.lock() + defer c.unlock() + + for _, s := range src { + // If calling Fill() with a Value, we want to use the underlying + // cue.Value to fill. + if val, ok := s.(*Value); ok { + inst, err = inst.Fill(val.val) + if err != nil { + return nil, fmt.Errorf("merge failed: %w", err) + } + } else { + inst, err = inst.Fill(s) + if err != nil { + return nil, fmt.Errorf("merge failed: %w", err) + } + } + } + + v = c.Wrap(inst.Value(), inst) + return v, nil +} + func (c *Compiler) DecodeJSON(path string, data []byte) (*Value, error) { inst, err := cuejson.Decode(c.Cue(), path, data) if err != nil { diff --git a/dagger/result.go b/dagger/result.go index 0918b854..25e6d87d 100644 --- a/dagger/result.go +++ b/dagger/result.go @@ -52,32 +52,11 @@ func (r *DeploymentResult) Computed() *compiler.Value { } func (r *DeploymentResult) Merge() (*compiler.Value, error) { - // FIXME: v.CueInst() must return an instance with the same - // contents as v, for the purposes of cueflow. - // That is not currently how *compiler.Value works, so we prepare the cue - // instance manually. - // --> refactor the compiler.Value API to do this for us. - var ( - v = compiler.NewValue() - inst = v.CueInst() - err error + return compiler.InstanceMerge( + r.plan, + r.input, + r.computed, ) - - inst, err = inst.Fill(r.plan.Cue()) - if err != nil { - return nil, fmt.Errorf("merge plan: %w", err) - } - inst, err = inst.Fill(r.input.Cue()) - if err != nil { - return nil, fmt.Errorf("merge input: %w", err) - } - inst, err = inst.Fill(r.computed.Cue()) - if err != nil { - return nil, fmt.Errorf("merge computed: %w", err) - } - - v = compiler.Wrap(inst.Value(), inst) - return v, nil } func (r *DeploymentResult) ToLLB() (llb.State, error) { From 10e676923c9bd9f7832fae618da15ca3f7f7f8ea Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Thu, 8 Apr 2021 16:31:37 -0700 Subject: [PATCH 4/9] logger: fix concurrency issue Signed-off-by: Andrea Luzzardi --- cmd/dagger/logger/console.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cmd/dagger/logger/console.go b/cmd/dagger/logger/console.go index 39e1476a..7e1aaccd 100644 --- a/cmd/dagger/logger/console.go +++ b/cmd/dagger/logger/console.go @@ -7,6 +7,7 @@ import ( "hash/adler32" "io" "strings" + "sync" "time" "github.com/mitchellh/colorstring" @@ -21,6 +22,7 @@ var colorize = colorstring.Colorize{ type Console struct { Out io.Writer maxLength int + l sync.Mutex } func (c *Console) Write(p []byte) (n int, err error) { @@ -31,9 +33,12 @@ func (c *Console) Write(p []byte) (n int, err error) { } source := c.parseSource(event) + + c.l.Lock() if len(source) > c.maxLength { c.maxLength = len(source) } + c.l.Unlock() return fmt.Fprintln(c.Out, colorize.Color(fmt.Sprintf("%s %s %s%s%s", From 8b0eea6e7fa1e0402c38cdb40728159df1aa9339 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Thu, 8 Apr 2021 17:22:25 -0700 Subject: [PATCH 5/9] compiler: simplify InstanceMerge Signed-off-by: Andrea Luzzardi --- dagger/compiler/compiler.go | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/dagger/compiler/compiler.go b/dagger/compiler/compiler.go index eb4eaec1..f750136c 100644 --- a/dagger/compiler/compiler.go +++ b/dagger/compiler/compiler.go @@ -29,7 +29,7 @@ func Wrap(v cue.Value, inst *cue.Instance) *Value { return DefaultCompiler.Wrap(v, inst) } -func InstanceMerge(src ...interface{}) (*Value, error) { +func InstanceMerge(src ...*Value) (*Value, error) { return DefaultCompiler.InstanceMerge(src...) } @@ -102,7 +102,7 @@ func (c *Compiler) Compile(name string, src interface{}) (*Value, error) { // FIXME: AVOID THIS AT ALL COST // Special case: we must return an instance with the same // contents as v, for the purposes of cueflow. -func (c *Compiler) InstanceMerge(src ...interface{}) (*Value, error) { +func (c *Compiler) InstanceMerge(src ...*Value) (*Value, error) { var ( v = c.NewValue() inst = v.CueInst() @@ -113,18 +113,12 @@ func (c *Compiler) InstanceMerge(src ...interface{}) (*Value, error) { defer c.unlock() for _, s := range src { - // If calling Fill() with a Value, we want to use the underlying - // cue.Value to fill. - if val, ok := s.(*Value); ok { - inst, err = inst.Fill(val.val) - if err != nil { - return nil, fmt.Errorf("merge failed: %w", err) - } - } else { - inst, err = inst.Fill(s) - if err != nil { - return nil, fmt.Errorf("merge failed: %w", err) - } + inst, err = inst.Fill(s.val) + if err != nil { + return nil, fmt.Errorf("merge failed: %w", err) + } + if err := inst.Value().Err(); err != nil { + return nil, err } } From 60db93a24691cfe79b65a3dc3d0c97f6062650e3 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Thu, 8 Apr 2021 18:09:10 -0700 Subject: [PATCH 6/9] client: pass through the deployment directly we can't serialize Cue in a lossless way. The current architecture has the frontend serialize the state (plan, input, computed) to Cue files, then the client loads them back. We lose information by doing this, and therefore the merge fails. This change removes the abstraction so that we operate directly on the cue.Instance. Signed-off-by: Andrea Luzzardi --- cmd/dagger/cmd/common/common.go | 2 +- cmd/dagger/cmd/compute.go | 18 ++-- cmd/dagger/cmd/query.go | 7 +- dagger/client.go | 42 +++------ dagger/deployment.go | 54 ++++++++---- dagger/result.go | 150 -------------------------------- 6 files changed, 66 insertions(+), 207 deletions(-) delete mode 100644 dagger/result.go diff --git a/cmd/dagger/cmd/common/common.go b/cmd/dagger/cmd/common/common.go index fde69b17..83e17d70 100644 --- a/cmd/dagger/cmd/common/common.go +++ b/cmd/dagger/cmd/common/common.go @@ -60,7 +60,7 @@ func GetCurrentDeploymentState(ctx context.Context, store *dagger.Store) *dagger } // Re-compute a deployment (equivalent to `dagger up`). -func DeploymentUp(ctx context.Context, state *dagger.DeploymentState) *dagger.DeploymentResult { +func DeploymentUp(ctx context.Context, state *dagger.DeploymentState) *dagger.Deployment { lg := log.Ctx(ctx) c, err := dagger.NewClient(ctx, "") diff --git a/cmd/dagger/cmd/compute.go b/cmd/dagger/cmd/compute.go index 8a087c7e..f6624b4f 100644 --- a/cmd/dagger/cmd/compute.go +++ b/cmd/dagger/cmd/compute.go @@ -7,9 +7,11 @@ import ( "os" "strings" + "cuelang.org/go/cue" "dagger.io/go/cmd/dagger/cmd/common" "dagger.io/go/cmd/dagger/logger" "dagger.io/go/dagger" + "dagger.io/go/dagger/compiler" "go.mozilla.org/sops/v3" "go.mozilla.org/sops/v3/decrypt" @@ -128,14 +130,20 @@ var computeCmd = &cobra.Command{ } } - result := common.DeploymentUp(ctx, st) + deployment := common.DeploymentUp(ctx, st) - cueVal, err := result.Merge() - if err != nil { - lg.Fatal().Err(err).Msg("failed to merge result") + v := compiler.NewValue() + if err := v.FillPath(cue.MakePath(), deployment.Plan()); err != nil { + lg.Fatal().Err(err).Msg("failed to merge") + } + if err := v.FillPath(cue.MakePath(), deployment.Input()); err != nil { + lg.Fatal().Err(err).Msg("failed to merge") + } + if err := v.FillPath(cue.MakePath(), deployment.Computed()); err != nil { + lg.Fatal().Err(err).Msg("failed to merge") } - fmt.Println(cueVal.JSON()) + fmt.Println(v.JSON()) }, } diff --git a/cmd/dagger/cmd/query.go b/cmd/dagger/cmd/query.go index 5d22ee12..5680371f 100644 --- a/cmd/dagger/cmd/query.go +++ b/cmd/dagger/cmd/query.go @@ -51,7 +51,8 @@ var queryCmd = &cobra.Command{ if err != nil { lg.Fatal().Err(err).Msg("unable to create client") } - result, err := c.Do(ctx, state, nil) + + deployment, err := c.Do(ctx, state, nil) if err != nil { lg.Fatal().Err(err).Msg("failed to query deployment") } @@ -59,13 +60,13 @@ var queryCmd = &cobra.Command{ cueVal := compiler.NewValue() if !viper.GetBool("no-plan") { - if err := cueVal.FillPath(cue.MakePath(), result.Plan()); err != nil { + if err := cueVal.FillPath(cue.MakePath(), deployment.Plan()); err != nil { lg.Fatal().Err(err).Msg("failed to merge plan") } } if !viper.GetBool("no-input") { - if err := cueVal.FillPath(cue.MakePath(), result.Input()); err != nil { + if err := cueVal.FillPath(cue.MakePath(), deployment.Input()); err != nil { lg.Fatal().Err(err).Msg("failed to merge plan with output") } } diff --git a/dagger/client.go b/dagger/client.go index 84e2573b..0d5cc719 100644 --- a/dagger/client.go +++ b/dagger/client.go @@ -3,7 +3,6 @@ package dagger import ( "context" "fmt" - "io" "os" "path/filepath" "strings" @@ -18,6 +17,7 @@ import ( // buildkit bk "github.com/moby/buildkit/client" _ "github.com/moby/buildkit/client/connhelper/dockercontainer" // import the container connection driver + "github.com/moby/buildkit/client/llb" bkgw "github.com/moby/buildkit/frontend/gateway/client" // docker output @@ -60,7 +60,7 @@ func NewClient(ctx context.Context, host string) (*Client, error) { type ClientDoFunc func(context.Context, *Deployment, Solver) error // FIXME: return completed *Route, instead of *compiler.Value -func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc) (*DeploymentResult, error) { +func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc) (*Deployment, error) { lg := log.Ctx(ctx) eg, gctx := errgroup.WithContext(ctx) @@ -79,25 +79,14 @@ func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc }) // Spawn build function - outr, outw := io.Pipe() eg.Go(func() error { - defer outw.Close() - return c.buildfn(gctx, deployment, fn, events, outw) + return c.buildfn(gctx, deployment, fn, events) }) - // Spawn output retriever - var result *DeploymentResult - eg.Go(func() error { - defer outr.Close() - - result, err = ReadDeploymentResult(gctx, outr) - return err - }) - - return result, eg.Wait() + return deployment, eg.Wait() } -func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientDoFunc, ch chan *bk.SolveStatus, w io.WriteCloser) error { +func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientDoFunc, ch chan *bk.SolveStatus) error { lg := log.Ctx(ctx) // Scan local dirs to grant access @@ -113,15 +102,6 @@ func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientD // Setup solve options opts := bk.SolveOpt{ LocalDirs: localdirs, - // FIXME: catch output & return as cue value - Exports: []bk.ExportEntry{ - { - Type: bk.ExporterTar, - Output: func(m map[string]string) (io.WriteCloser, error) { - return w, nil - }, - }, - }, } // Call buildkit solver @@ -151,11 +131,13 @@ func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientD span, _ := opentracing.StartSpanFromContext(ctx, "Deployment.Export") defer span.Finish() - result := deployment.Result() - st, err := result.ToLLB() - if err != nil { - return nil, err - } + computed := deployment.Computed().JSON().PrettyString() + st := llb. + Scratch(). + File( + llb.Mkfile("computed.json", 0600, []byte(computed)), + llb.WithCustomName("[internal] serializing computed values"), + ) ref, err := s.Solve(ctx, st) if err != nil { diff --git a/dagger/deployment.go b/dagger/deployment.go index dbb6ba0a..603feb57 100644 --- a/dagger/deployment.go +++ b/dagger/deployment.go @@ -19,14 +19,25 @@ import ( ) type Deployment struct { - state *DeploymentState - result *DeploymentResult + state *DeploymentState + + // Layer 1: plan configuration + plan *compiler.Value + + // Layer 2: user inputs + input *compiler.Value + + // Layer 3: computed values + computed *compiler.Value } func NewDeployment(st *DeploymentState) (*Deployment, error) { d := &Deployment{ - state: st, - result: NewDeploymentResult(), + state: st, + + plan: compiler.NewValue(), + input: compiler.NewValue(), + computed: compiler.NewValue(), } // Prepare inputs @@ -36,9 +47,9 @@ func NewDeployment(st *DeploymentState) (*Deployment, error) { return nil, err } if input.Key == "" { - err = d.result.input.FillPath(cue.MakePath(), v) + err = d.input.FillPath(cue.MakePath(), v) } else { - err = d.result.input.FillPath(cue.ParsePath(input.Key), v) + err = d.input.FillPath(cue.ParsePath(input.Key), v) } if err != nil { return nil, err @@ -60,8 +71,16 @@ func (d *Deployment) PlanSource() Input { return d.state.PlanSource } -func (d *Deployment) Result() *DeploymentResult { - return d.result +func (d *Deployment) Plan() *compiler.Value { + return d.plan +} + +func (d *Deployment) Input() *compiler.Value { + return d.input +} + +func (d *Deployment) Computed() *compiler.Value { + return d.computed } // LoadPlan loads the plan @@ -89,7 +108,7 @@ func (d *Deployment) LoadPlan(ctx context.Context, s Solver) error { if err != nil { return fmt.Errorf("plan config: %w", err) } - d.result.plan = plan + d.plan = plan return nil } @@ -122,7 +141,7 @@ func (d *Deployment) LocalDirs() map[string]string { } // 1. Scan the deployment state // FIXME: use a common `flow` instance to avoid rescanning the tree. - src, err := d.result.Merge() + src, err := compiler.InstanceMerge(d.plan, d.input) if err != nil { panic(err) } @@ -151,10 +170,10 @@ func (d *Deployment) Up(ctx context.Context, s Solver) error { defer span.Finish() // Reset the computed values - d.result.computed = compiler.NewValue() + d.computed = compiler.NewValue() // Cueflow cue instance - src, err := d.result.Merge() + src, err := compiler.InstanceMerge(d.plan, d.input) if err != nil { return err } @@ -163,7 +182,7 @@ func (d *Deployment) Up(ctx context.Context, s Solver) error { flow := cueflow.New( &cueflow.Config{}, src.CueInst(), - newTaskFunc(src.CueInst(), newPipelineRunner(src.CueInst(), d.result, s)), + newTaskFunc(src.CueInst(), newPipelineRunner(src.CueInst(), d.computed, s)), ) if err := flow.Run(ctx); err != nil { return err @@ -195,7 +214,7 @@ func noOpRunner(t *cueflow.Task) error { return nil } -func newPipelineRunner(inst *cue.Instance, result *DeploymentResult, s Solver) cueflow.RunnerFunc { +func newPipelineRunner(inst *cue.Instance, computed *compiler.Value, s Solver) cueflow.RunnerFunc { return cueflow.RunnerFunc(func(t *cueflow.Task) error { ctx := t.Context() lg := log. @@ -243,12 +262,11 @@ func newPipelineRunner(inst *cue.Instance, result *DeploymentResult, s Solver) c } // Mirror the computed values in both `Task` and `Result` - computed := p.Computed() - if computed.IsEmptyStruct() { + if p.Computed().IsEmptyStruct() { return nil } - if err := t.Fill(computed.Cue()); err != nil { + if err := t.Fill(p.Computed().Cue()); err != nil { lg. Error(). Err(err). @@ -257,7 +275,7 @@ func newPipelineRunner(inst *cue.Instance, result *DeploymentResult, s Solver) c } // Merge task value into output - if err := result.computed.FillPath(t.Path(), computed); err != nil { + if err := computed.FillPath(t.Path(), p.Computed()); err != nil { lg. Error(). Err(err). diff --git a/dagger/result.go b/dagger/result.go deleted file mode 100644 index 25e6d87d..00000000 --- a/dagger/result.go +++ /dev/null @@ -1,150 +0,0 @@ -package dagger - -import ( - "archive/tar" - "context" - "errors" - "fmt" - "io" - "strings" - - "dagger.io/go/dagger/compiler" - "github.com/moby/buildkit/client/llb" - "github.com/rs/zerolog/log" -) - -const ( - planFile = "plan.cue" - inputFile = "input.cue" - computedFile = "computed.cue" -) - -// DeploymentResult represents the layers of a deployment run -type DeploymentResult struct { - // Layer 1: plan configuration - plan *compiler.Value - - // Layer 2: user inputs - input *compiler.Value - - // Layer 3: computed values - computed *compiler.Value -} - -func NewDeploymentResult() *DeploymentResult { - return &DeploymentResult{ - plan: compiler.NewValue(), - input: compiler.NewValue(), - computed: compiler.NewValue(), - } -} - -func (r *DeploymentResult) Plan() *compiler.Value { - return r.plan -} - -func (r *DeploymentResult) Input() *compiler.Value { - return r.input -} - -func (r *DeploymentResult) Computed() *compiler.Value { - return r.computed -} - -func (r *DeploymentResult) Merge() (*compiler.Value, error) { - return compiler.InstanceMerge( - r.plan, - r.input, - r.computed, - ) -} - -func (r *DeploymentResult) ToLLB() (llb.State, error) { - st := llb.Scratch() - - planSource, err := r.plan.Source() - if err != nil { - return st, compiler.Err(err) - } - - inputSource, err := r.input.Source() - if err != nil { - return st, compiler.Err(err) - } - - outputSource, err := r.computed.Source() - if err != nil { - return st, compiler.Err(err) - } - - st = st. - File( - llb.Mkfile(planFile, 0600, planSource), - llb.WithCustomName("[internal] serializing plan"), - ). - File( - llb.Mkfile(inputFile, 0600, inputSource), - llb.WithCustomName("[internal] serializing input"), - ). - File( - llb.Mkfile(computedFile, 0600, outputSource), - llb.WithCustomName("[internal] serializing output"), - ) - - return st, nil -} - -func ReadDeploymentResult(ctx context.Context, r io.Reader) (*DeploymentResult, error) { - lg := log.Ctx(ctx) - result := NewDeploymentResult() - tr := tar.NewReader(r) - - for { - h, err := tr.Next() - if errors.Is(err, io.EOF) { - break - } - if err != nil { - return nil, fmt.Errorf("read tar stream: %w", err) - } - - lg := lg. - With(). - Str("file", h.Name). - Logger() - - if !strings.HasSuffix(h.Name, ".cue") { - lg.Debug().Msg("skipping non-cue file from exporter tar stream") - continue - } - - lg.Debug().Msg("outputfn: compiling") - - src, err := io.ReadAll(tr) - if err != nil { - return nil, err - } - - v, err := compiler.Compile(h.Name, src) - if err != nil { - lg. - Debug(). - Err(compiler.Err(err)). - Bytes("src", src). - Msg("invalid result file") - return nil, fmt.Errorf("failed to compile result: %w", compiler.Err(err)) - } - - switch h.Name { - case planFile: - result.plan = v - case inputFile: - result.input = v - case computedFile: - result.computed = v - default: - lg.Warn().Msg("unexpected file") - } - } - return result, nil -} From 43a34f0b04ae78356e8677336240f33b9093e68c Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Thu, 8 Apr 2021 17:22:34 -0700 Subject: [PATCH 7/9] tests: re-enable CLI tests, use the brand new query Signed-off-by: Andrea Luzzardi --- tests/test-cli.sh | 47 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/tests/test-cli.sh b/tests/test-cli.sh index 5d33251a..637d450d 100644 --- a/tests/test-cli.sh +++ b/tests/test-cli.sh @@ -65,13 +65,13 @@ test::cli::newgit() { DAGGER_STORE="$(mktemp -d -t dagger-store-XXXXXX)" export DAGGER_STORE - disable test::one "CLI: new git: --plan-git" \ + test::one "CLI: new git: --plan-git" \ "$dagger" "${DAGGER_BINARY_ARGS[@]}" new --plan-git https://github.com/samalba/dagger-test.git simple - disable test::one "CLI: new git: verify plan can be upped" \ + test::one "CLI: new git: verify plan can be upped" \ "$dagger" "${DAGGER_BINARY_ARGS[@]}" up -d "simple" - disable test::one "CLI: new git: verify we have the right plan" --stdout='{ + test::one "CLI: new git: verify we have the right plan" --stdout='{ foo: "value" bar: "another value" }' \ @@ -122,10 +122,10 @@ test::cli::plan() { test::one "CLI: plan dir: query non-concrete" --exit=1 \ "$dagger" "${DAGGER_BINARY_ARGS[@]}" -d "simple" query -c - disable test::one "CLI: plan git (Disabled: external dependency)" \ + test::one "CLI: plan git" \ "$dagger" "${DAGGER_BINARY_ARGS[@]}" -d "simple" plan git https://github.com/samalba/dagger-test.git - disable test::one "CLI: plan git: verify we have the right plan" --stdout='{ + test::one "CLI: plan git: verify we have the right plan" --stdout='{ foo: "value" bar: "another value" }' \ @@ -140,21 +140,40 @@ test::cli::input() { DAGGER_STORE="$(mktemp -d -t dagger-store-XXXXXX)" export DAGGER_STORE - test::one "CLI: new input" \ + test::one "CLI: input: new" \ "$dagger" "${DAGGER_BINARY_ARGS[@]}" new --plan-dir "$d"/cli/input "input" - test::one "CLI: up: missing input" \ - "$dagger" "${DAGGER_BINARY_ARGS[@]}" up -d "input" --stdout='{"foo":"bar"}' + test::one "CLI: input: up missing input" \ + "$dagger" "${DAGGER_BINARY_ARGS[@]}" up -d "input" - test::one "CLI: input dir" \ + test::one "CLI: input: query missing input" \ + "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -d "input" --stdout='{ + "foo": "bar" +}' + + test::one "CLI: input: set dir" \ "$dagger" "${DAGGER_BINARY_ARGS[@]}" input -d "input" dir "source" ./tests/cli/input/testdata - test::one "CLI: up: input is set with input dir" \ - "$dagger" "${DAGGER_BINARY_ARGS[@]}" up -d "input" --stdout='{"bar":"thisisatest\n","foo":"bar","source":{}}' + test::one "CLI: input: up with input dir" \ + "$dagger" "${DAGGER_BINARY_ARGS[@]}" up -d "input" - disable test::one "CLI: input git" \ + test::one "CLI: input: query with input dir" \ + "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -d "input" --stdout='{ + "bar": "thisisatest\n", + "foo": "bar", + "source": {} +}' + + test::one "CLI: input: set git" \ "$dagger" "${DAGGER_BINARY_ARGS[@]}" input -d "input" git "source" https://github.com/samalba/dagger-test-simple.git - disable test::one "CLI: up: input is set with input git" \ - "$dagger" "${DAGGER_BINARY_ARGS[@]}" up -d "input" --stdout='{"bar":"testgit\n","foo":"bar","source":{}}' + test::one "CLI: input: up with input git" \ + "$dagger" "${DAGGER_BINARY_ARGS[@]}" up -d "input" + + test::one "CLI: query with input git" \ + "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -d "input" --stdout='{ + "bar": "testgit\n", + "foo": "bar", + "source": {} +}' } From 65ebbc8ecb08ebfa6edb548f149a8c7b9e4ec959 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Thu, 8 Apr 2021 18:31:10 -0700 Subject: [PATCH 8/9] tests: add query tests Signed-off-by: Andrea Luzzardi --- tests/cli/nonconcrete/main.cue | 3 -- tests/cli/simple/main.cue | 20 ++++++++--- tests/test-cli.sh | 65 ++++++++++++++++++++++++---------- 3 files changed, 62 insertions(+), 26 deletions(-) delete mode 100644 tests/cli/nonconcrete/main.cue diff --git a/tests/cli/nonconcrete/main.cue b/tests/cli/nonconcrete/main.cue deleted file mode 100644 index 46d2bfca..00000000 --- a/tests/cli/nonconcrete/main.cue +++ /dev/null @@ -1,3 +0,0 @@ -package testing - -nonConcrete: string diff --git a/tests/cli/simple/main.cue b/tests/cli/simple/main.cue index d2a09c27..42621fef 100644 --- a/tests/cli/simple/main.cue +++ b/tests/cli/simple/main.cue @@ -4,8 +4,18 @@ import "dagger.io/dagger/op" foo: "value" bar: "another value" - -#up: [ - op.#FetchContainer & {ref: "busybox"}, - op.#Exec & {args: ["true"]}, -] +computed: { + string + #up: [ + op.#FetchContainer & {ref: "busybox"}, + op.#Exec & { + args: ["sh", "-c", """ + printf test > /export + """] + }, + op.#Export & { + source: "/export" + format: "string" + }, + ] +} diff --git a/tests/test-cli.sh b/tests/test-cli.sh index 637d450d..e7f92f92 100644 --- a/tests/test-cli.sh +++ b/tests/test-cli.sh @@ -51,10 +51,11 @@ test::cli::newdir() { "$dagger" "${DAGGER_BINARY_ARGS[@]}" up -d "simple" test::one "CLI: new: verify we have the right plan" --stdout='{ - foo: "value" - bar: "another value" + "bar": "another value", + "computed": "test", + "foo": "value" }' \ - "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -f cue -d "simple" -c + "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -f cue -d "simple" -c -f json } test::cli::newgit() { @@ -89,20 +90,45 @@ test::cli::query() { test::one "CLI: query: initialize simple" \ "$dagger" "${DAGGER_BINARY_ARGS[@]}" new --plan-dir "$d"/cli/simple simple - test::one "CLI: query: concrete" --stdout='{ - foo: "value" - bar: "another value" + test::one "CLI: query: before up" --stdout='{ + "bar": "another value", + "foo": "value" }' \ - "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -f cue -d "simple" -c + "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -d "simple" + + test::one "CLI: query: concrete should fail" --exit=1 \ + "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -d "simple" -c test::one "CLI: query: target" --stdout='"value"' \ - "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -f cue -d "simple" foo + "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -d "simple" foo - test::one "CLI: query: initialize nonconcrete" \ - "$dagger" "${DAGGER_BINARY_ARGS[@]}" new --plan-dir "$d"/cli/nonconcrete nonconcrete + test::one "CLI: query: compute missing values" \ + "$dagger" "${DAGGER_BINARY_ARGS[@]}" up -d "simple" - test::one "CLI: query: non concrete" --exit=1 \ - "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -f cue -d "nonconcrete" -c + test::one "CLI: query: all values" --stdout='{ + "bar": "another value", + "computed": "test", + "foo": "value" +}' \ + "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -d "simple" + + test::one "CLI: query: concrete should work" --stdout='{ + "bar": "another value", + "computed": "test", + "foo": "value" +}' \ + "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -d "simple" -c + + test::one "CLI: query --no-computed" --stdout='{ + "bar": "another value", + "foo": "value" +}' \ + "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -d "simple" --no-computed + + test::one "CLI: query: --no-plan" --stdout='{ + "computed": "test" +}' \ + "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -d "simple" -c --no-plan } test::cli::plan() { @@ -117,19 +143,22 @@ test::cli::plan() { "$dagger" "${DAGGER_BINARY_ARGS[@]}" new --plan-dir "$d"/cli/simple simple test::one "CLI: plan dir" \ - "$dagger" "${DAGGER_BINARY_ARGS[@]}" -d "simple" plan dir "$d"/cli/nonconcrete + "$dagger" "${DAGGER_BINARY_ARGS[@]}" -d "simple" plan dir "$d"/cli/simple - test::one "CLI: plan dir: query non-concrete" --exit=1 \ - "$dagger" "${DAGGER_BINARY_ARGS[@]}" -d "simple" query -c + test::one "CLI: plan dir: verify we have the right plan" --stdout='{ + "bar": "another value", + "foo": "value" +}' \ + "$dagger" "${DAGGER_BINARY_ARGS[@]}" -d "simple" query test::one "CLI: plan git" \ "$dagger" "${DAGGER_BINARY_ARGS[@]}" -d "simple" plan git https://github.com/samalba/dagger-test.git test::one "CLI: plan git: verify we have the right plan" --stdout='{ - foo: "value" - bar: "another value" + "bar": "another value", + "foo": "value" }' \ - "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -f cue -d "simple" -c + "$dagger" "${DAGGER_BINARY_ARGS[@]}" query -d "simple" -c } test::cli::input() { From a92423f18635259228fdca68259186de4db17748 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Thu, 8 Apr 2021 18:48:23 -0700 Subject: [PATCH 9/9] tests: fix react example test Signed-off-by: Andrea Luzzardi --- examples/react/main.cue | 2 +- tests/examples/react/inputs.yaml | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/react/main.cue b/examples/react/main.cue index 01f0bce9..25c3b554 100644 --- a/examples/react/main.cue +++ b/examples/react/main.cue @@ -15,7 +15,7 @@ repo: git.#Repository & { // Host the application with Netlify www: netlify.#Site & { // Site name can be overridden - name: string | *"dagger-example-react" + name: string | *"dagger-examples-react" // Deploy the output of yarn build // (Netlify build feature is not used, to avoid extra cost). diff --git a/tests/examples/react/inputs.yaml b/tests/examples/react/inputs.yaml index f40e6e4d..08fbd772 100644 --- a/tests/examples/react/inputs.yaml +++ b/tests/examples/react/inputs.yaml @@ -1,14 +1,14 @@ -todoApp: +www: account: - name: ENC[AES256_GCM,data:/T5E/8rU3YVxpA==,iv:dscf+kcVg0jYd5ICyJJL/f80BHyUDtfHWW9cRSP7aCU=,tag:xHZpdqNoYa58NbAX691vhg==,type:str] - token: ENC[AES256_GCM,data:6RBg3KafhrJYS8hbACxNRtnkpSNeablIiqHhUZTjz04X3cPyP4igPn7cnw==,iv:kj0vn83GaAtocUBdacO5D+6SJu6sVxfs58LYvUdpivA=,tag:CzZFkr9sXFHOOQihqYBdTA==,type:str] + name: ENC[AES256_GCM,data:EsPTWeiDCrVeUQ==,iv:9/tZQOrrjQejsK6NFcgQO9HaAnjIUv1Qc+S0slds+4o=,tag:n3NYautI94ilmEWG9UeFzA==,type:str] + token: ENC[AES256_GCM,data:Jx7oVJXcMX3hBmC6Kld7jxOOH/3CGSAzC7rRhHgs25iLFZG+F3iN5fYYVA==,iv:9SVNNv5CTM0AZns0x7x5bSI6jW93jSh8Xt21hXN1g28=,tag:QKBf7OsKs1TxvMA4gdA53Q==,type:str] sops: kms: [] gcp_kms: [] azure_kv: [] hc_vault: [] - lastmodified: '2021-03-19T00:41:47Z' - mac: ENC[AES256_GCM,data:eaDHLQHEH8wtCiDhcl9wWIC+WwitEBRr4K/p7chOKJ8sBc3wlH+dT3HVX5hTYX5rwU0ai2wgfcOg6mw4mDksIJvU4JpZV/AIo16ETjZDhRSc5hU1hUAiOQCYuJPqyGa12HzxzA81mcFm8kJETnoTYiTo7ApG9ShmOCp6XPezjPk=,iv:5Pd9eGTrVnr0ITNj/TrgJz9UE2SjVqqmOxxZOw0Spus=,tag:VGxPwp7hXHtz9Zvk+6WsqA==,type:str] + lastmodified: '2021-04-09T01:47:48Z' + mac: ENC[AES256_GCM,data:UqOr8wGUwf6iVnurG/dvpiZVN0k9NrLTaB5CQVn+QTRQybgYuLZLTJuTFNlAqFGvNO07OoGkDx/Vmhw9F6nJS0qUcHC5iWg+Bxaa7anHwer7fkA/xTjKpqJnE6iveq+hzumDgeFbGL7+EJvbyxtJioF/LnWa5gnduBVacizjbKc=,iv:x4ulAfd2R0BdkGF4zkMn+wX+Y8wF3jEeFOu3+5t+wz4=,tag:e5uOip0iAt9xJa6RJGukCA==,type:str] pgp: - created_at: '2021-03-18T22:02:51Z' enc: |