Hello,
I tried to use the OpenTelemetry OpenTracing bridge (opentelemetry-go/bridge/opentracing at main · open-telemetry/opentelemetry-go · GitHub) to trace Temporal workflows and activities.
For some reason the workflow ends up in its own trace, and every activity ends up in its own trace as well.
For the Temporal propagator we reused the implementation from the SDK’s internal package; the only change is that the carrier is HTTP headers instead of TextMap.
I couldn’t understand why this is happening, so I cloned otel and added logging at every step to see what is going on. I would like to understand what is happening here: sdk-go/propagation.go at 6580cbe0aa41a8b515791f95c2c15bb37db1dab1 · temporalio/sdk-go · GitHub
The logs I added show zero references (see open-telemetry/opentelemetry-go/blob/main/bridge/opentracing/bridge.go, line 412), which is why everything ends up in its own trace.
Here is the propagator implementation:
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package tracer
import (
"context"
"fmt"
"go.temporal.io/sdk/workflow"
"github.com/opentracing/opentracing-go"
otbridge "cigar/bridge/opentracing"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/log"
)
const (
// tracerHeaderKey is the header field under which writeSpanContextToHeader
// stores the encoded tracer carrier and from which readSpanContextFromHeader
// reads it back.
tracerHeaderKey = "_tracer-data"
)
// tracingContextPropagator implements the ContextPropagator interface for
// tracing context propagation.
//
// Inject -> context.Context to Header - this extracts the Span from the
// context and places the SpanContext into the Header
// Extract -> Header to context.Context - this extracts the SpanContext from
// the header, returns a context.Context containing the SpanContext
// InjectFromWorkflow -> Context to Header - extracts a SpanContext from the
// workflow context and puts it in the header
// ExtractToWorkflow -> Header to Context - takes the SpanContext present in
// the header and puts it in the Context object. Does not start a new span
// as that is started outside when the workflow is actually executed
type tracingContextPropagator struct {
// logger is supplied through NewTracingContextPropagator; none of the
// methods visible in this file call it.
logger log.Logger
// tracer performs the Inject/Extract calls that move a SpanContext to and
// from the HTTP-headers carrier.
tracer opentracing.Tracer
}
// spanContextKey is a private type for context keys owned by this package.
// Using a dedicated type instead of a plain string prevents collisions with
// values stored by other packages under an equal string key.
type spanContextKey string

// activeSpanContextKey is the context key under which the propagated
// opentracing.SpanContext is stored in context.Context and workflow.Context
// values, and under which it is looked up again.
const activeSpanContextKey spanContextKey = "activeSpanContextKey"
// NewTracingContextPropagator builds a workflow.ContextPropagator that
// carries opentracing span contexts through Temporal headers using the given
// tracer. The logger is stored on the propagator alongside it.
func NewTracingContextPropagator(logger log.Logger, tracer opentracing.Tracer) workflow.ContextPropagator {
	propagator := &tracingContextPropagator{
		logger: logger,
		tracer: tracer,
	}
	return propagator
}
// Inject writes the tracing span context found in ctx into the outgoing
// header through hw.
//
// Lookup order:
//  1. the opentracing span attached to ctx, if there is one;
//  2. otherwise a SpanContext stored in ctx under activeSpanContextKey.
//
// When neither is present it returns nil and leaves hw untouched. In every
// other case it returns the result of writeSpanContextToHeader.
func (t *tracingContextPropagator) Inject(
	ctx context.Context,
	hw workflow.HeaderWriter,
) error {
	span := opentracing.SpanFromContext(ctx)
	if span == nil {
		println("%%%%%%%%%%%%%%% NO SPAN IN CTX")
		// No live span: fall back to a span context stored directly in ctx.
		if spCtx, ok := ctx.Value(activeSpanContextKey).(opentracing.SpanContext); ok {
			return t.writeSpanContextToHeader(spCtx, hw)
		}
		return nil
	}

	// ctx is not read past this point, so no bridge-hooked context is derived
	// here; only the span's own context is serialized.
	print(fmt.Sprintf("--=--=----=------=----= FOUND IT ..... traced by %T, %v", span.Tracer(), span.Tracer()))
	print(fmt.Sprintf("--=--=----=------=----= FOUND IT ..... traced by %T, %v", span, span))
	return t.writeSpanContextToHeader(span.Context(), hw)
}
// Extract reads the span context stored in the incoming header and returns a
// context carrying it under activeSpanContextKey. When the header yields no
// span context, ctx is returned unchanged. If the configured tracer is an
// otbridge.BridgeTracer, the context is first passed through its
// NewHookedContext. The returned error is always nil.
func (t *tracingContextPropagator) Extract(
	ctx context.Context,
	hr workflow.HeaderReader,
) (context.Context, error) {
	parent := t.readSpanContextFromHeader(hr)
	if parent == nil {
		println("Extract &&&&&&&&&&&&&&&&& NO SPAN CTX")
		return ctx, nil
	}
	println(fmt.Sprintf("Extract **************************** %v", parent))

	bridge, isBridge := t.tracer.(*otbridge.BridgeTracer)
	if isBridge {
		println("Extract **************************** bridge tracer")
		ctx = bridge.NewHookedContext(ctx)
	}

	withParent := context.WithValue(ctx, activeSpanContextKey, parent)
	return withParent, nil
}
// InjectFromWorkflow copies the span context held by the workflow context
// (as returned by spanFromContext) into the outgoing header. It returns nil
// without writing anything when the workflow context holds no span context.
func (t *tracingContextPropagator) InjectFromWorkflow(
	ctx workflow.Context,
	hw workflow.HeaderWriter,
) error {
	if sc := spanFromContext(ctx); sc != nil {
		println(fmt.Sprintf("InjectFromWorkflow **************************** %v", sc))
		return t.writeSpanContextToHeader(sc, hw)
	}

	println("InjectFromWorkflow &&&&&&&&&&&&&&&&& NO SPAN CTX")
	return nil
}
// ExtractToWorkflow reads the span context stored in the incoming header and
// returns a workflow context carrying it (via contextWithSpan). When the
// header yields no span context, ctx is returned unchanged. No span is
// started here, and the returned error is always nil.
func (t *tracingContextPropagator) ExtractToWorkflow(
	ctx workflow.Context,
	hr workflow.HeaderReader,
) (workflow.Context, error) {
	parent := t.readSpanContextFromHeader(hr)
	if parent != nil {
		println(fmt.Sprintf("ExtractToWorkflow **************************** %v", parent))
		return contextWithSpan(ctx, parent), nil
	}

	println("ExtractToWorkflow &&&&&&&&&&&&&&&&& NO SPAN CTX")
	return ctx, nil
}
// writeSpanContextToHeader serializes spanContext and stores it in the
// header under tracerHeaderKey.
//
// The tracer injects the span context into an HTTP-headers carrier; the
// carrier's entries are copied into a plain map[string][]string, which is
// encoded with the default data converter and written through hw.
//
// It returns a wrapped error (compatible with errors.Is / errors.As) when the
// tracer's Inject call or the payload encoding fails; in that case nothing is
// written to hw.
func (t *tracingContextPropagator) writeSpanContextToHeader(
	spanContext opentracing.SpanContext, hw workflow.HeaderWriter) error {
	print("--- writeSpanContextToHeader: started")

	carrier := opentracing.HTTPHeadersCarrier{}
	if err := t.tracer.Inject(spanContext, opentracing.HTTPHeaders, carrier); err != nil {
		print("--- writeSpanContextToHeader: failed Inject")
		return fmt.Errorf("injecting span context into HTTP headers carrier: %w", err)
	}

	// Copy into a plain map so the payload is encoded from map[string][]string
	// rather than from the carrier type.
	tracerData := make(map[string][]string, len(carrier))
	for k, v := range carrier {
		tracerData[k] = v
	}

	tracerPayload, err := converter.GetDefaultDataConverter().ToPayload(&tracerData)
	if err != nil {
		print("--- writeSpanContextToHeader: failed to ToPayload(&tracerData)")
		return fmt.Errorf("encoding tracer data as payload: %w", err)
	}

	hw.Set(tracerHeaderKey, tracerPayload)
	return nil
}
// readSpanContextFromHeader looks up tracerHeaderKey in the header, decodes
// the payload into a map[string][]string with the default data converter,
// and asks the tracer to extract a SpanContext from it using the
// HTTP-headers format.
//
// It is best-effort: it returns nil when the header entry is missing, when
// the payload cannot be decoded, or when the tracer's Extract call fails.
// Decode and extract failures are reported together with the underlying
// error so the reason for the missing span context is visible.
func (t *tracingContextPropagator) readSpanContextFromHeader(hr workflow.HeaderReader) opentracing.SpanContext {
	print("--- readSpanContextFromHeader: starteed")

	tracerPayload, tracerExist := hr.Get(tracerHeaderKey)
	if !tracerExist {
		print("--- readSpanContextFromHeader: !tracerExist")
		return nil
	}

	var tracerData map[string][]string
	if err := converter.GetDefaultDataConverter().FromPayload(tracerPayload, &tracerData); err != nil {
		print(fmt.Sprintf("--- readSpanContextFromHeader: failed FromPayload: %v", err))
		return nil
	}

	// Rebuild the carrier with the keys exactly as they were stored.
	carrier := make(opentracing.HTTPHeadersCarrier, len(tracerData))
	for k, v := range tracerData {
		carrier[k] = v
	}

	spanContext, err := t.tracer.Extract(opentracing.HTTPHeaders, carrier)
	if err != nil {
		print(fmt.Sprintf("--- readSpanContextFromHeader: failed to extract: %v", err))
		return nil
	}
	return spanContext
}
// spanFromContext returns the opentracing.SpanContext stored in the workflow
// context under activeSpanContextKey, or nil when no value of that type is
// stored there.
func spanFromContext(ctx workflow.Context) opentracing.SpanContext {
	// A failed assertion leaves stored as the nil interface value.
	stored, _ := ctx.Value(activeSpanContextKey).(opentracing.SpanContext)
	return stored
}
// contextWithSpan returns a workflow context derived from ctx that holds
// spanContext under activeSpanContextKey.
func contextWithSpan(ctx workflow.Context, spanContext opentracing.SpanContext) workflow.Context {
	derived := workflow.WithValue(ctx, activeSpanContextKey, spanContext)
	return derived
}