diff --git a/cmd/dagger/cmd/common/common.go b/cmd/dagger/cmd/common/common.go index fde69b17..83e17d70 100644 --- a/cmd/dagger/cmd/common/common.go +++ b/cmd/dagger/cmd/common/common.go @@ -60,7 +60,7 @@ func GetCurrentDeploymentState(ctx context.Context, store *dagger.Store) *dagger } // Re-compute a deployment (equivalent to `dagger up`). -func DeploymentUp(ctx context.Context, state *dagger.DeploymentState) *dagger.DeploymentResult { +func DeploymentUp(ctx context.Context, state *dagger.DeploymentState) *dagger.Deployment { lg := log.Ctx(ctx) c, err := dagger.NewClient(ctx, "") diff --git a/cmd/dagger/cmd/compute.go b/cmd/dagger/cmd/compute.go index 8a087c7e..f6624b4f 100644 --- a/cmd/dagger/cmd/compute.go +++ b/cmd/dagger/cmd/compute.go @@ -7,9 +7,11 @@ import ( "os" "strings" + "cuelang.org/go/cue" "dagger.io/go/cmd/dagger/cmd/common" "dagger.io/go/cmd/dagger/logger" "dagger.io/go/dagger" + "dagger.io/go/dagger/compiler" "go.mozilla.org/sops/v3" "go.mozilla.org/sops/v3/decrypt" @@ -128,14 +130,20 @@ var computeCmd = &cobra.Command{ } } - result := common.DeploymentUp(ctx, st) + deployment := common.DeploymentUp(ctx, st) - cueVal, err := result.Merge() - if err != nil { - lg.Fatal().Err(err).Msg("failed to merge result") + v := compiler.NewValue() + if err := v.FillPath(cue.MakePath(), deployment.Plan()); err != nil { + lg.Fatal().Err(err).Msg("failed to merge") + } + if err := v.FillPath(cue.MakePath(), deployment.Input()); err != nil { + lg.Fatal().Err(err).Msg("failed to merge") + } + if err := v.FillPath(cue.MakePath(), deployment.Computed()); err != nil { + lg.Fatal().Err(err).Msg("failed to merge") } - fmt.Println(cueVal.JSON()) + fmt.Println(v.JSON()) }, } diff --git a/cmd/dagger/cmd/query.go b/cmd/dagger/cmd/query.go index 5d22ee12..5680371f 100644 --- a/cmd/dagger/cmd/query.go +++ b/cmd/dagger/cmd/query.go @@ -51,7 +51,8 @@ var queryCmd = &cobra.Command{ if err != nil { lg.Fatal().Err(err).Msg("unable to create client") } - result, err := c.Do(ctx, state, nil) + + deployment, err := c.Do(ctx, state, nil) if err != nil { lg.Fatal().Err(err).Msg("failed to query deployment") } @@ -59,13 +60,13 @@ var queryCmd = &cobra.Command{ cueVal := compiler.NewValue() if !viper.GetBool("no-plan") { - if err := cueVal.FillPath(cue.MakePath(), result.Plan()); err != nil { + if err := cueVal.FillPath(cue.MakePath(), deployment.Plan()); err != nil { lg.Fatal().Err(err).Msg("failed to merge plan") } } if !viper.GetBool("no-input") { - if err := cueVal.FillPath(cue.MakePath(), result.Input()); err != nil { + if err := cueVal.FillPath(cue.MakePath(), deployment.Input()); err != nil { lg.Fatal().Err(err).Msg("failed to merge plan with output") } } diff --git a/dagger/client.go b/dagger/client.go index 84e2573b..0d5cc719 100644 --- a/dagger/client.go +++ b/dagger/client.go @@ -3,7 +3,6 @@ package dagger import ( "context" "fmt" - "io" "os" "path/filepath" "strings" @@ -18,6 +17,7 @@ import ( // buildkit bk "github.com/moby/buildkit/client" _ "github.com/moby/buildkit/client/connhelper/dockercontainer" // import the container connection driver + "github.com/moby/buildkit/client/llb" bkgw "github.com/moby/buildkit/frontend/gateway/client" // docker output @@ -60,7 +60,7 @@ func NewClient(ctx context.Context, host string) (*Client, error) { type ClientDoFunc func(context.Context, *Deployment, Solver) error // FIXME: return completed *Route, instead of *compiler.Value -func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc) (*DeploymentResult, error) { +func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc) (*Deployment, error) { lg := log.Ctx(ctx) eg, gctx := errgroup.WithContext(ctx) @@ -79,25 +79,14 @@ func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc }) // Spawn build function - outr, outw := io.Pipe() eg.Go(func() error { - defer outw.Close() - return c.buildfn(gctx, deployment, fn, events, outw) + return c.buildfn(gctx, deployment, fn, events) }) - // Spawn output retriever - var result *DeploymentResult - eg.Go(func() error { - defer outr.Close() - - result, err = ReadDeploymentResult(gctx, outr) - return err - }) - - return result, eg.Wait() + return deployment, eg.Wait() } -func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientDoFunc, ch chan *bk.SolveStatus, w io.WriteCloser) error { +func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientDoFunc, ch chan *bk.SolveStatus) error { lg := log.Ctx(ctx) // Scan local dirs to grant access @@ -113,15 +102,6 @@ func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientD // Setup solve options opts := bk.SolveOpt{ LocalDirs: localdirs, - // FIXME: catch output & return as cue value - Exports: []bk.ExportEntry{ - { - Type: bk.ExporterTar, - Output: func(m map[string]string) (io.WriteCloser, error) { - return w, nil - }, - }, - }, } // Call buildkit solver @@ -151,11 +131,13 @@ func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientD span, _ := opentracing.StartSpanFromContext(ctx, "Deployment.Export") defer span.Finish() - result := deployment.Result() - st, err := result.ToLLB() - if err != nil { - return nil, err - } + computed := deployment.Computed().JSON().PrettyString() + st := llb. + Scratch(). + File( + llb.Mkfile("computed.json", 0600, []byte(computed)), + llb.WithCustomName("[internal] serializing computed values"), + ) ref, err := s.Solve(ctx, st) if err != nil { diff --git a/dagger/deployment.go b/dagger/deployment.go index dbb6ba0a..603feb57 100644 --- a/dagger/deployment.go +++ b/dagger/deployment.go @@ -19,14 +19,25 @@ import ( ) type Deployment struct { - state *DeploymentState - result *DeploymentResult + state *DeploymentState + + // Layer 1: plan configuration + plan *compiler.Value + + // Layer 2: user inputs + input *compiler.Value + + // Layer 3: computed values + computed *compiler.Value } func NewDeployment(st *DeploymentState) (*Deployment, error) { d := &Deployment{ - state: st, - result: NewDeploymentResult(), + state: st, + + plan: compiler.NewValue(), + input: compiler.NewValue(), + computed: compiler.NewValue(), } // Prepare inputs @@ -36,9 +47,9 @@ func NewDeployment(st *DeploymentState) (*Deployment, error) { return nil, err } if input.Key == "" { - err = d.result.input.FillPath(cue.MakePath(), v) + err = d.input.FillPath(cue.MakePath(), v) } else { - err = d.result.input.FillPath(cue.ParsePath(input.Key), v) + err = d.input.FillPath(cue.ParsePath(input.Key), v) } if err != nil { return nil, err @@ -60,8 +71,16 @@ func (d *Deployment) PlanSource() Input { return d.state.PlanSource } -func (d *Deployment) Result() *DeploymentResult { - return d.result +func (d *Deployment) Plan() *compiler.Value { + return d.plan +} + +func (d *Deployment) Input() *compiler.Value { + return d.input +} + +func (d *Deployment) Computed() *compiler.Value { + return d.computed } // LoadPlan loads the plan @@ -89,7 +108,7 @@ func (d *Deployment) LoadPlan(ctx context.Context, s Solver) error { if err != nil { return fmt.Errorf("plan config: %w", err) } - d.result.plan = plan + d.plan = plan return nil } @@ -122,7 +141,7 @@ func (d *Deployment) LocalDirs() map[string]string { } // 1. Scan the deployment state // FIXME: use a common `flow` instance to avoid rescanning the tree. - src, err := d.result.Merge() + src, err := compiler.InstanceMerge(d.plan, d.input) if err != nil { panic(err) } @@ -151,10 +170,10 @@ func (d *Deployment) Up(ctx context.Context, s Solver) error { defer span.Finish() // Reset the computed values - d.result.computed = compiler.NewValue() + d.computed = compiler.NewValue() // Cueflow cue instance - src, err := d.result.Merge() + src, err := compiler.InstanceMerge(d.plan, d.input) if err != nil { return err } @@ -163,7 +182,7 @@ func (d *Deployment) Up(ctx context.Context, s Solver) error { flow := cueflow.New( &cueflow.Config{}, src.CueInst(), - newTaskFunc(src.CueInst(), newPipelineRunner(src.CueInst(), d.result, s)), + newTaskFunc(src.CueInst(), newPipelineRunner(src.CueInst(), d.computed, s)), ) if err := flow.Run(ctx); err != nil { return err @@ -195,7 +214,7 @@ func noOpRunner(t *cueflow.Task) error { return nil } -func newPipelineRunner(inst *cue.Instance, result *DeploymentResult, s Solver) cueflow.RunnerFunc { +func newPipelineRunner(inst *cue.Instance, computed *compiler.Value, s Solver) cueflow.RunnerFunc { return cueflow.RunnerFunc(func(t *cueflow.Task) error { ctx := t.Context() lg := log. @@ -243,12 +262,11 @@ func newPipelineRunner(inst *cue.Instance, result *DeploymentResult, s Solver) c } // Mirror the computed values in both `Task` and `Result` - computed := p.Computed() - if computed.IsEmptyStruct() { + if p.Computed().IsEmptyStruct() { return nil } - if err := t.Fill(computed.Cue()); err != nil { + if err := t.Fill(p.Computed().Cue()); err != nil { lg. Error(). Err(err). @@ -257,7 +275,7 @@ func newPipelineRunner(inst *cue.Instance, result *DeploymentResult, s Solver) c } // Merge task value into output - if err := result.computed.FillPath(t.Path(), computed); err != nil { + if err := computed.FillPath(t.Path(), p.Computed()); err != nil { lg. Error(). Err(err). diff --git a/dagger/result.go b/dagger/result.go deleted file mode 100644 index 25e6d87d..00000000 --- a/dagger/result.go +++ /dev/null @@ -1,150 +0,0 @@ -package dagger - -import ( - "archive/tar" - "context" - "errors" - "fmt" - "io" - "strings" - - "dagger.io/go/dagger/compiler" - "github.com/moby/buildkit/client/llb" - "github.com/rs/zerolog/log" -) - -const ( - planFile = "plan.cue" - inputFile = "input.cue" - computedFile = "computed.cue" -) - -// DeploymentResult represents the layers of a deployment run -type DeploymentResult struct { - // Layer 1: plan configuration - plan *compiler.Value - - // Layer 2: user inputs - input *compiler.Value - - // Layer 3: computed values - computed *compiler.Value -} - -func NewDeploymentResult() *DeploymentResult { - return &DeploymentResult{ - plan: compiler.NewValue(), - input: compiler.NewValue(), - computed: compiler.NewValue(), - } -} - -func (r *DeploymentResult) Plan() *compiler.Value { - return r.plan -} - -func (r *DeploymentResult) Input() *compiler.Value { - return r.input -} - -func (r *DeploymentResult) Computed() *compiler.Value { - return r.computed -} - -func (r *DeploymentResult) Merge() (*compiler.Value, error) { - return compiler.InstanceMerge( - r.plan, - r.input, - r.computed, - ) -} - -func (r *DeploymentResult) ToLLB() (llb.State, error) { - st := llb.Scratch() - - planSource, err := r.plan.Source() - if err != nil { - return st, compiler.Err(err) - } - - inputSource, err := r.input.Source() - if err != nil { - return st, compiler.Err(err) - } - - outputSource, err := r.computed.Source() - if err != nil { - return st, compiler.Err(err) - } - - st = st. - File( - llb.Mkfile(planFile, 0600, planSource), - llb.WithCustomName("[internal] serializing plan"), - ). - File( - llb.Mkfile(inputFile, 0600, inputSource), - llb.WithCustomName("[internal] serializing input"), - ). - File( - llb.Mkfile(computedFile, 0600, outputSource), - llb.WithCustomName("[internal] serializing output"), - ) - - return st, nil -} - -func ReadDeploymentResult(ctx context.Context, r io.Reader) (*DeploymentResult, error) { - lg := log.Ctx(ctx) - result := NewDeploymentResult() - tr := tar.NewReader(r) - - for { - h, err := tr.Next() - if errors.Is(err, io.EOF) { - break - } - if err != nil { - return nil, fmt.Errorf("read tar stream: %w", err) - } - - lg := lg. - With(). - Str("file", h.Name). - Logger() - - if !strings.HasSuffix(h.Name, ".cue") { - lg.Debug().Msg("skipping non-cue file from exporter tar stream") - continue - } - - lg.Debug().Msg("outputfn: compiling") - - src, err := io.ReadAll(tr) - if err != nil { - return nil, err - } - - v, err := compiler.Compile(h.Name, src) - if err != nil { - lg. - Debug(). - Err(compiler.Err(err)). - Bytes("src", src). - Msg("invalid result file") - return nil, fmt.Errorf("failed to compile result: %w", compiler.Err(err)) - } - - switch h.Name { - case planFile: - result.plan = v - case inputFile: - result.input = v - case computedFile: - result.computed = v - default: - lg.Warn().Msg("unexpected file") - } - } - return result, nil -}