opentracing support
Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
// Cue
|
||||
@@ -45,7 +46,11 @@ func NewClient(ctx context.Context, host string) (*Client, error) {
|
||||
|
||||
host = h
|
||||
}
|
||||
c, err := bk.New(ctx, host)
|
||||
opts := []bk.ClientOpt{}
|
||||
if span := opentracing.SpanFromContext(ctx); span != nil {
|
||||
opts = append(opts, bk.WithTracer(span.Tracer()))
|
||||
}
|
||||
c, err := bk.New(ctx, host, opts...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("buildkit client: %w", err)
|
||||
}
|
||||
@@ -138,7 +143,7 @@ func (c *Client) buildfn(ctx context.Context, env *Env, ch chan *bk.SolveStatus,
|
||||
|
||||
// Export env to a cue directory
|
||||
lg.Debug().Msg("exporting env")
|
||||
outdir, err := env.Export(s.Scratch())
|
||||
outdir, err := env.Export(ctx, s.Scratch())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -8,9 +8,12 @@ import (
|
||||
|
||||
"cuelang.org/go/cue"
|
||||
cueflow "cuelang.org/go/tools/flow"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"dagger.io/go/dagger/compiler"
|
||||
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/opentracing/opentracing-go/ext"
|
||||
otlog "github.com/opentracing/opentracing-go/log"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type Env struct {
|
||||
@@ -86,6 +89,9 @@ func (env *Env) SetInput(i *compiler.Value) error {
|
||||
|
||||
// Update the base configuration
|
||||
func (env *Env) Update(ctx context.Context, s Solver) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Env.Update")
|
||||
defer span.Finish()
|
||||
|
||||
p := NewPipeline("[internal] source", s, nil)
|
||||
// execute updater script
|
||||
if err := p.Do(ctx, env.updater); err != nil {
|
||||
@@ -186,7 +192,10 @@ func (env *Env) mergeState() error {
|
||||
|
||||
// Export env to a directory of cue files
|
||||
// (Use with FS.Change)
|
||||
func (env *Env) Export(fs FS) (FS, error) {
|
||||
func (env *Env) Export(ctx context.Context, fs FS) (FS, error) {
|
||||
span, _ := opentracing.StartSpanFromContext(ctx, "Env.Export")
|
||||
defer span.Finish()
|
||||
|
||||
// FIXME: we serialize as JSON to guarantee a self-contained file.
|
||||
// compiler.Value.Save() leaks imports, so requires a shared cue.mod with
|
||||
// client which is undesirable.
|
||||
@@ -210,6 +219,9 @@ func (env *Env) Export(fs FS) (FS, error) {
|
||||
|
||||
// Compute missing values in env configuration, and write them to state.
|
||||
func (env *Env) Compute(ctx context.Context, s Solver) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Env.Compute")
|
||||
defer span.Finish()
|
||||
|
||||
lg := log.Ctx(ctx)
|
||||
|
||||
// Cueflow cue instance
|
||||
@@ -248,11 +260,17 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
|
||||
},
|
||||
}
|
||||
// Orchestrate execution with cueflow
|
||||
flow := cueflow.New(flowCfg, flowInst, newPipelineTaskFunc(ctx, flowInst, s))
|
||||
flow := cueflow.New(flowCfg, flowInst, newPipelineTaskFunc(flowInst, s))
|
||||
if err := flow.Run(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return env.mergeState()
|
||||
|
||||
{
|
||||
span, _ := opentracing.StartSpanFromContext(ctx, "Env.Compute: merge state")
|
||||
defer span.Finish()
|
||||
|
||||
return env.mergeState()
|
||||
}
|
||||
}
|
||||
|
||||
func newDummyTaskFunc(inst *cue.Instance) cueflow.TaskFunc {
|
||||
@@ -268,7 +286,7 @@ func newDummyTaskFunc(inst *cue.Instance) cueflow.TaskFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func newPipelineTaskFunc(ctx context.Context, inst *cue.Instance, s Solver) cueflow.TaskFunc {
|
||||
func newPipelineTaskFunc(inst *cue.Instance, s Solver) cueflow.TaskFunc {
|
||||
return func(flowVal cue.Value) (cueflow.Runner, error) {
|
||||
v := compiler.Wrap(flowVal, inst)
|
||||
if !isComponent(v) {
|
||||
@@ -277,12 +295,17 @@ func newPipelineTaskFunc(ctx context.Context, inst *cue.Instance, s Solver) cuef
|
||||
}
|
||||
// Cueflow run func:
|
||||
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
|
||||
ctx := t.Context()
|
||||
lg := log.
|
||||
Ctx(ctx).
|
||||
With().
|
||||
Str("component", t.Path().String()).
|
||||
Logger()
|
||||
ctx := lg.WithContext(ctx)
|
||||
ctx = lg.WithContext(ctx)
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx,
|
||||
fmt.Sprintf("compute: %s", t.Path().String()),
|
||||
)
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
lg.
|
||||
@@ -298,6 +321,9 @@ func newPipelineTaskFunc(ctx context.Context, inst *cue.Instance, s Solver) cuef
|
||||
p := NewPipeline(t.Path().String(), s, NewFillable(t))
|
||||
err := p.Do(ctx, v)
|
||||
if err != nil {
|
||||
span.LogFields(otlog.String("error", err.Error()))
|
||||
ext.Error.Set(span, true)
|
||||
|
||||
// FIXME: this should use errdefs.IsCanceled(err)
|
||||
if strings.Contains(err.Error(), "context canceled") {
|
||||
lg.
|
||||
|
Reference in New Issue
Block a user