diff --git a/cliext/client.go b/cliext/client.go index db6506078..8512f34ba 100644 --- a/cliext/client.go +++ b/cliext/client.go @@ -29,6 +29,11 @@ type ClientOptionsBuilder struct { // Logger is the slog logger to use for the client. If set, it will be // wrapped with the SDK's structured logger adapter. Logger *slog.Logger + + // PayloadCodec is populated by Build when a remote payload codec is + // configured. Callers can use it to decode payloads outside the gRPC + // interceptor chain (e.g. payloads nested inside opaque proto bytes). + PayloadCodec converter.PayloadCodec } type oauthCredentials struct { @@ -248,13 +253,18 @@ func (b *ClientOptionsBuilder) Build(ctx context.Context) (client.Options, error if err != nil { return client.Options{}, fmt.Errorf("invalid codec headers: %w", err) } - interceptor, err := newPayloadCodecInterceptor( + payloadCodec := newRemotePayloadCodec( profile.Namespace, profile.Codec.Endpoint, profile.Codec.Auth, codecHeaders) + interceptor, err := converter.NewPayloadCodecGRPCClientInterceptor( + converter.PayloadCodecGRPCClientInterceptorOptions{ + Codecs: []converter.PayloadCodec{payloadCodec}, + }) if err != nil { return client.Options{}, fmt.Errorf("failed creating payload codec interceptor: %w", err) } clientOpts.ConnectionOptions.DialOptions = append( clientOpts.ConnectionOptions.DialOptions, grpc.WithChainUnaryInterceptor(interceptor)) + b.PayloadCodec = payloadCodec } // Set connect timeout for GetSystemInfo if provided. @@ -278,16 +288,17 @@ func parseKeyValuePairs(pairs []string) (map[string]string, error) { return result, nil } -// newPayloadCodecInterceptor creates a gRPC interceptor for remote payload codec. -func newPayloadCodecInterceptor( +// newRemotePayloadCodec constructs a remote payload codec from the configured endpoint, +// auth, and headers. The returned codec can be used both inside a gRPC interceptor and +// to decode payloads nested inside opaque proto bytes (e.g. system Nexus operation inputs). +func newRemotePayloadCodec( namespace string, codecEndpoint string, codecAuth string, codecHeaders map[string]string, -) (grpc.UnaryClientInterceptor, error) { +) converter.PayloadCodec { codecEndpoint = strings.ReplaceAll(codecEndpoint, "{namespace}", namespace) - - payloadCodec := converter.NewRemotePayloadCodec( + return converter.NewRemotePayloadCodec( converter.RemotePayloadCodecOptions{ Endpoint: codecEndpoint, ModifyRequest: func(req *http.Request) error { @@ -302,11 +313,6 @@ func newPayloadCodecInterceptor( }, }, ) - return converter.NewPayloadCodecGRPCClientInterceptor( - converter.PayloadCodecGRPCClientInterceptorOptions{ - Codecs: []converter.PayloadCodec{payloadCodec}, - }, - ) } func (c *oauthCredentials) getToken(ctx context.Context) (string, error) { diff --git a/internal/temporalcli/client.go b/internal/temporalcli/client.go index 751c61c06..230ff5a35 100644 --- a/internal/temporalcli/client.go +++ b/internal/temporalcli/client.go @@ -21,8 +21,17 @@ import ( // so often used by callers after this call to know the currently configured // namespace. func dialClient(cctx *CommandContext, c *cliext.ClientOptions) (client.Client, error) { + cl, _, err := dialClientWithCodec(cctx, c) + return cl, err +} + +// dialClientWithCodec is like [dialClient] but also returns the configured remote +// payload codec, or nil if no codec is configured. The codec is the same instance +// used by the gRPC interceptor; callers can use it to decode payloads nested inside +// opaque proto bytes (e.g. the request/response of a system Nexus operation). +func dialClientWithCodec(cctx *CommandContext, c *cliext.ClientOptions) (client.Client, converter.PayloadCodec, error) { if cctx.RootCommand == nil { - return nil, fmt.Errorf("root command unexpectedly missing when dialing client") + return nil, nil, fmt.Errorf("root command unexpectedly missing when dialing client") } // Set default identity if not provided @@ -47,7 +56,7 @@ func dialClient(cctx *CommandContext, c *cliext.ClientOptions) (client.Client, e } clientOpts, err := builder.Build(cctx) if err != nil { - return nil, err + return nil, nil, err } // We do not put codec on data converter here, it is applied via @@ -78,14 +87,14 @@ func dialClient(cctx *CommandContext, c *cliext.ClientOptions) (client.Client, e cl, err := client.DialContext(dialCtx, clientOpts) if err != nil { - return nil, err + return nil, nil, err } // Since this namespace value is used by many commands after this call, // we are mutating it to be the derived one c.Namespace = clientOpts.Namespace - return cl, nil + return cl, builder.PayloadCodec, nil } func fixedHeaderOverrideInterceptor( diff --git a/internal/temporalcli/commands.system_nexus.go b/internal/temporalcli/commands.system_nexus.go new file mode 100644 index 000000000..346290935 --- /dev/null +++ b/internal/temporalcli/commands.system_nexus.go @@ -0,0 +1,51 @@ +package temporalcli + +import ( + "context" + + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/proxy" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/converter" + "google.golang.org/protobuf/proto" +) + +// systemNexusOpKey identifies a system Nexus operation by its (endpoint, operation) pair. +type systemNexusOpKey struct { + Endpoint string + Operation string +} + +// systemNexusOpTypes maps a system Nexus operation to the proto request and response types +// whose bytes are serialized in NexusOperationScheduled.Input and NexusOperationCompleted.Result. +type systemNexusOpTypes struct { + // NewRequest returns a fresh, zero-valued instance of the request proto. + NewRequest func() proto.Message + // NewResponse returns a fresh, zero-valued instance of the response proto. + NewResponse func() proto.Message +} + +// systemNexusOps is the global registry of known system Nexus operations on the +// __temporal_system endpoint. Add new entries here as the server adds support for more +// system operations. The keys' Operation values must match what the server records in +// NexusOperationScheduledEventAttributes.Operation. +var systemNexusOps = map[systemNexusOpKey]systemNexusOpTypes{ + { + Endpoint: temporalSystemNexusEndpoint, + Operation: "SignalWithStartWorkflowExecution", + }: { + NewRequest: func() proto.Message { return &workflowservice.SignalWithStartWorkflowExecutionRequest{} }, + NewResponse: func() proto.Message { return &workflowservice.SignalWithStartWorkflowExecutionResponse{} }, + }, +} + +// decodePayloadsInProto walks a proto message and applies codec.Decode to every Payload +// found inside it (including nested messages). The message is mutated in place. +func decodePayloadsInProto(ctx context.Context, msg proto.Message, codec converter.PayloadCodec) error { + return proxy.VisitPayloads(ctx, msg, proxy.VisitPayloadsOptions{ + SkipSearchAttributes: true, + Visitor: func(_ *proxy.VisitPayloadsContext, payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { + return codec.Decode(payloads) + }, + }) +} diff --git a/internal/temporalcli/commands.system_nexus_test.go b/internal/temporalcli/commands.system_nexus_test.go new file mode 100644 index 000000000..ad160df2e --- /dev/null +++ b/internal/temporalcli/commands.system_nexus_test.go @@ -0,0 +1,332 @@ +package temporalcli + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + historypb "go.temporal.io/api/history/v1" + "go.temporal.io/api/temporalproto" + "go.temporal.io/api/workflowservice/v1" + "google.golang.org/protobuf/proto" +) + +// markingCodec is a test [converter.PayloadCodec] that prefixes every payload's +// data with "decoded:" on Decode and tracks how many payloads it saw. It is used +// to verify that the codec is actually invoked on payloads nested inside opaque +// system Nexus operation bytes. +type markingCodec struct { + decodeCalls int +} + +func (c *markingCodec) Encode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { + return payloads, nil +} + +func (c *markingCodec) Decode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { + out := make([]*commonpb.Payload, len(payloads)) + for i, p := range payloads { + c.decodeCalls++ + out[i] = &commonpb.Payload{ + Metadata: p.Metadata, + Data: append([]byte("decoded:"), p.Data...), + } + } + return out, nil +} + +// failingCodec always returns an error from Decode; used to verify error propagation. +type failingCodec struct{} + +func (failingCodec) Encode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { + return payloads, nil +} + +func (failingCodec) Decode(_ []*commonpb.Payload) ([]*commonpb.Payload, error) { + return nil, fmt.Errorf("codec decode failure for testing") +} + +func signalWithStartRequestPayload(t *testing.T, req *workflowservice.SignalWithStartWorkflowExecutionRequest) *commonpb.Payload { + t.Helper() + data, err := proto.Marshal(req) + require.NoError(t, err) + return &commonpb.Payload{ + Metadata: map[string][]byte{"encoding": []byte("binary/protobuf")}, + Data: data, + } +} + +func signalWithStartResponsePayload(t *testing.T, resp *workflowservice.SignalWithStartWorkflowExecutionResponse) *commonpb.Payload { + t.Helper() + data, err := proto.Marshal(resp) + require.NoError(t, err) + return &commonpb.Payload{ + Metadata: map[string][]byte{"encoding": []byte("binary/protobuf")}, + Data: data, + } +} + +func TestUnwrapAndInjectRequest_NilPayloadIsNoOp(t *testing.T) { + iter := &structuredHistoryIter{ctx: context.Background()} + fields := map[string]any{} + require.NoError(t, iter.unwrapAndInjectRequest( + temporalSystemNexusEndpoint, "SignalWithStartWorkflowExecution", + nil, fields, temporalproto.CustomJSONMarshalOptions{})) + require.Empty(t, fields, "nil payload should not inject anything") +} + +func TestUnwrapAndInjectRequest_UnknownOperationIsNoOp(t *testing.T) { + p := &commonpb.Payload{Data: []byte("ignored")} + iter := &structuredHistoryIter{ctx: context.Background()} + fields := map[string]any{} + require.NoError(t, iter.unwrapAndInjectRequest( + temporalSystemNexusEndpoint, "NotARealOperation", + p, fields, temporalproto.CustomJSONMarshalOptions{})) + require.Empty(t, fields, "unknown operation should be a no-op") +} + +func TestUnwrapAndInjectRequest_UnknownEndpointIsNoOp(t *testing.T) { + p := &commonpb.Payload{Data: []byte("ignored")} + iter := &structuredHistoryIter{ctx: context.Background()} + fields := map[string]any{} + require.NoError(t, iter.unwrapAndInjectRequest( + "some-user-endpoint", "SignalWithStartWorkflowExecution", + p, fields, temporalproto.CustomJSONMarshalOptions{})) + require.Empty(t, fields, "non-system endpoint should be a no-op even if operation name matches") +} + +func TestUnwrapAndInjectRequest_BadProtoBytesReturnsError(t *testing.T) { + p := &commonpb.Payload{Data: []byte{0xff, 0xff, 0xff}} + iter := &structuredHistoryIter{ctx: context.Background()} + fields := map[string]any{} + err := iter.unwrapAndInjectRequest( + temporalSystemNexusEndpoint, "SignalWithStartWorkflowExecution", + p, fields, temporalproto.CustomJSONMarshalOptions{}) + require.Error(t, err) + require.ErrorContains(t, err, "failed unmarshaling system nexus payload") +} + +func TestUnwrapAndInjectResponse_NilPayloadIsNoOp(t *testing.T) { + iter := &structuredHistoryIter{ctx: context.Background()} + fields := map[string]any{} + require.NoError(t, iter.unwrapAndInjectResponse( + temporalSystemNexusEndpoint, "SignalWithStartWorkflowExecution", + nil, fields, temporalproto.CustomJSONMarshalOptions{})) + require.Empty(t, fields) +} + +func TestUnwrapAndInjectResponse_UnknownOperationIsNoOp(t *testing.T) { + p := &commonpb.Payload{Data: []byte("ignored")} + iter := &structuredHistoryIter{ctx: context.Background()} + fields := map[string]any{} + require.NoError(t, iter.unwrapAndInjectResponse( + temporalSystemNexusEndpoint, "NotARealOperation", + p, fields, temporalproto.CustomJSONMarshalOptions{})) + require.Empty(t, fields) +} + +func TestUnwrapAndInjectResponse_BadProtoBytesReturnsError(t *testing.T) { + p := &commonpb.Payload{Data: []byte{0xff, 0xff, 0xff}} + iter := &structuredHistoryIter{ctx: context.Background()} + fields := map[string]any{} + err := iter.unwrapAndInjectResponse( + temporalSystemNexusEndpoint, "SignalWithStartWorkflowExecution", + p, fields, temporalproto.CustomJSONMarshalOptions{}) + require.Error(t, err) + require.ErrorContains(t, err, "failed unmarshaling system nexus payload") +} + +func TestUnwrapAndInjectRequest_DecodesAllNestedPayloads(t *testing.T) { + // The Input/SignalInput fields hold the user-supplied payloads, which the codec + // should be applied to. The outer payload bytes are raw proto and are not codec-encoded. + inner1 := &commonpb.Payload{Metadata: map[string][]byte{"encoding": []byte("binary/plain")}, Data: []byte("hello")} + inner2 := &commonpb.Payload{Metadata: map[string][]byte{"encoding": []byte("binary/plain")}, Data: []byte("world")} + signalInner := &commonpb.Payload{Metadata: map[string][]byte{"encoding": []byte("binary/plain")}, Data: []byte("signal-arg")} + req := &workflowservice.SignalWithStartWorkflowExecutionRequest{ + Namespace: "ns", + WorkflowId: "wf", + Input: &commonpb.Payloads{Payloads: []*commonpb.Payload{inner1, inner2}}, + SignalInput: &commonpb.Payloads{Payloads: []*commonpb.Payload{signalInner}}, + } + p := signalWithStartRequestPayload(t, req) + + codec := &markingCodec{} + iter := &structuredHistoryIter{ctx: context.Background(), codec: codec} + fields := map[string]any{} + require.NoError(t, iter.unwrapAndInjectRequest( + temporalSystemNexusEndpoint, "SignalWithStartWorkflowExecution", + p, fields, temporalproto.CustomJSONMarshalOptions{})) + require.Equal(t, 3, codec.decodeCalls, "codec should have been invoked once per nested payload") + + unwrapped, ok := fields["unwrappedInput"].(map[string]any) + require.True(t, ok) + require.Equal(t, "ns", unwrapped["namespace"]) + require.Equal(t, "wf", unwrapped["workflowId"]) +} + +func TestUnwrapAndInjectRequest_CodecErrorPropagates(t *testing.T) { + req := &workflowservice.SignalWithStartWorkflowExecutionRequest{ + Input: &commonpb.Payloads{Payloads: []*commonpb.Payload{{Data: []byte("x")}}}, + } + p := signalWithStartRequestPayload(t, req) + iter := &structuredHistoryIter{ctx: context.Background(), codec: failingCodec{}} + fields := map[string]any{} + err := iter.unwrapAndInjectRequest( + temporalSystemNexusEndpoint, "SignalWithStartWorkflowExecution", + p, fields, temporalproto.CustomJSONMarshalOptions{}) + require.Error(t, err) + require.Equal(t, "failed decoding payloads in system nexus payload: codec decode failure for testing", err.Error()) +} + +func TestDecodePayloadsInProto_VisitsAllPayloads(t *testing.T) { + req := &workflowservice.SignalWithStartWorkflowExecutionRequest{ + Input: &commonpb.Payloads{Payloads: []*commonpb.Payload{ + {Data: []byte("a")}, + {Data: []byte("b")}, + }}, + SignalInput: &commonpb.Payloads{Payloads: []*commonpb.Payload{ + {Data: []byte("c")}, + }}, + } + codec := &markingCodec{} + require.NoError(t, decodePayloadsInProto(context.Background(), req, codec)) + require.Equal(t, 3, codec.decodeCalls) + require.Equal(t, []byte("decoded:a"), req.Input.Payloads[0].Data) + require.Equal(t, []byte("decoded:b"), req.Input.Payloads[1].Data) + require.Equal(t, []byte("decoded:c"), req.SignalInput.Payloads[0].Data) +} + +func TestInjectSystemNexusUnwrapped_ScheduledKnownOp(t *testing.T) { + req := &workflowservice.SignalWithStartWorkflowExecutionRequest{ + Namespace: "ns", + WorkflowId: "wf-xyz", + SignalName: "ping", + } + event := &historypb.HistoryEvent{ + EventId: 5, + EventType: enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + Attributes: &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ + NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{ + Endpoint: temporalSystemNexusEndpoint, + Operation: "SignalWithStartWorkflowExecution", + Input: signalWithStartRequestPayload(t, req), + }, + }, + } + iter := &structuredHistoryIter{ctx: context.Background()} + fields := map[string]any{} + require.NoError(t, iter.injectSystemNexusUnwrapped(event, fields, temporalproto.CustomJSONMarshalOptions{})) + + unwrapped, ok := fields["unwrappedInput"].(map[string]any) + require.True(t, ok, "expected unwrappedInput map to be set") + require.Equal(t, "ns", unwrapped["namespace"]) + require.Equal(t, "wf-xyz", unwrapped["workflowId"]) + require.Equal(t, "ping", unwrapped["signalName"]) +} + +func TestInjectSystemNexusUnwrapped_ScheduledUnknownEndpointSkipped(t *testing.T) { + req := &workflowservice.SignalWithStartWorkflowExecutionRequest{Namespace: "ns"} + event := &historypb.HistoryEvent{ + EventType: enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + Attributes: &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ + NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{ + Endpoint: "user-endpoint", + Operation: "SignalWithStartWorkflowExecution", + Input: signalWithStartRequestPayload(t, req), + }, + }, + } + iter := &structuredHistoryIter{ctx: context.Background()} + fields := map[string]any{} + require.NoError(t, iter.injectSystemNexusUnwrapped(event, fields, temporalproto.CustomJSONMarshalOptions{})) + _, ok := fields["unwrappedInput"] + require.False(t, ok, "non-system endpoint must not produce unwrappedInput") +} + +func TestInjectSystemNexusUnwrapped_CompletedUsesPriorScheduled(t *testing.T) { + resp := &workflowservice.SignalWithStartWorkflowExecutionResponse{ + RunId: "run-abc", + Started: true, + } + completed := &historypb.HistoryEvent{ + EventId: 6, + EventType: enumspb.EVENT_TYPE_NEXUS_OPERATION_COMPLETED, + Attributes: &historypb.HistoryEvent_NexusOperationCompletedEventAttributes{ + NexusOperationCompletedEventAttributes: &historypb.NexusOperationCompletedEventAttributes{ + ScheduledEventId: 5, + Result: signalWithStartResponsePayload(t, resp), + }, + }, + } + iter := &structuredHistoryIter{ + ctx: context.Background(), + systemNexusOps: map[int64]string{5: "SignalWithStartWorkflowExecution"}, + } + fields := map[string]any{} + require.NoError(t, iter.injectSystemNexusUnwrapped(completed, fields, temporalproto.CustomJSONMarshalOptions{})) + + unwrapped, ok := fields["unwrappedResult"].(map[string]any) + require.True(t, ok, "expected unwrappedResult map to be set") + require.Equal(t, "run-abc", unwrapped["runId"]) + require.Equal(t, true, unwrapped["started"]) +} + +func TestInjectSystemNexusUnwrapped_CompletedWithoutPriorScheduledSkipped(t *testing.T) { + completed := &historypb.HistoryEvent{ + EventType: enumspb.EVENT_TYPE_NEXUS_OPERATION_COMPLETED, + Attributes: &historypb.HistoryEvent_NexusOperationCompletedEventAttributes{ + NexusOperationCompletedEventAttributes: &historypb.NexusOperationCompletedEventAttributes{ + ScheduledEventId: 5, + Result: &commonpb.Payload{Data: []byte("garbage")}, + }, + }, + } + iter := &structuredHistoryIter{ctx: context.Background()} + fields := map[string]any{} + require.NoError(t, iter.injectSystemNexusUnwrapped(completed, fields, temporalproto.CustomJSONMarshalOptions{})) + _, ok := fields["unwrappedResult"] + require.False(t, ok, "no prior scheduled means we don't know the op, so no unwrap") +} + +func TestInjectSystemNexusUnwrapped_NonNexusEventNoOp(t *testing.T) { + event := &historypb.HistoryEvent{ + EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + Attributes: &historypb.HistoryEvent_WorkflowExecutionStartedEventAttributes{ + WorkflowExecutionStartedEventAttributes: &historypb.WorkflowExecutionStartedEventAttributes{}, + }, + } + iter := &structuredHistoryIter{ctx: context.Background()} + fields := map[string]any{} + require.NoError(t, iter.injectSystemNexusUnwrapped(event, fields, temporalproto.CustomJSONMarshalOptions{})) + require.Empty(t, fields) +} + +func TestInjectSystemNexusUnwrapped_AppliesCodecToScheduledInput(t *testing.T) { + req := &workflowservice.SignalWithStartWorkflowExecutionRequest{ + WorkflowId: "wf", + Input: &commonpb.Payloads{Payloads: []*commonpb.Payload{ + {Data: []byte("inner-input")}, + }}, + SignalInput: &commonpb.Payloads{Payloads: []*commonpb.Payload{ + {Data: []byte("inner-signal")}, + }}, + } + event := &historypb.HistoryEvent{ + EventType: enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + Attributes: &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ + NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{ + Endpoint: temporalSystemNexusEndpoint, + Operation: "SignalWithStartWorkflowExecution", + Input: signalWithStartRequestPayload(t, req), + }, + }, + } + codec := &markingCodec{} + iter := &structuredHistoryIter{ctx: context.Background(), codec: codec} + fields := map[string]any{} + require.NoError(t, iter.injectSystemNexusUnwrapped(event, fields, temporalproto.CustomJSONMarshalOptions{})) + require.Equal(t, 2, codec.decodeCalls, "codec should run on both nested payloads") +} diff --git a/internal/temporalcli/commands.workflow_exec.go b/internal/temporalcli/commands.workflow_exec.go index e115a5303..cef46fdf2 100644 --- a/internal/temporalcli/commands.workflow_exec.go +++ b/internal/temporalcli/commands.workflow_exec.go @@ -25,7 +25,9 @@ import ( "go.temporal.io/api/temporalproto" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/converter" "go.temporal.io/sdk/temporal" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" ) @@ -40,7 +42,7 @@ func (c *TemporalWorkflowStartCommand) run(cctx *CommandContext, args []string) } func (c *TemporalWorkflowExecuteCommand) run(cctx *CommandContext, args []string) error { - cl, err := dialClient(cctx, &c.Parent.ClientOptions) + cl, codec, err := dialClientWithCodec(cctx, &c.Parent.ClientOptions) if err != nil { return err } @@ -63,6 +65,7 @@ func (c *TemporalWorkflowExecuteCommand) run(cctx *CommandContext, args []string runID: run.GetRunID(), includeDetails: c.Detailed, follow: true, + codec: codec, } if err := iter.print(cctx); err != nil && cctx.Err() == nil { return fmt.Errorf("displaying history failed: %w", err) @@ -704,6 +707,68 @@ func coloredEventType(e enums.EventType) string { return fn(e.String()) } +const temporalSystemNexusEndpoint = "__temporal_system" + +// systemNexusOpDisplayName returns a display name for Nexus operation events on the +// "__temporal_system" endpoint, replacing the generic NexusOperation* name with the +// actual operation name + event-type suffix. Returns "" for all other events. +// Populates s.systemNexusOps when it encounters a qualifying Scheduled event. +func (s *structuredHistoryIter) systemNexusOpDisplayName(event *history.HistoryEvent) string { + if s.systemNexusOps == nil { + s.systemNexusOps = make(map[int64]string) + } + switch event.EventType { + case enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED: + attr := event.GetNexusOperationScheduledEventAttributes() + if attr == nil || attr.Endpoint != temporalSystemNexusEndpoint { + return "" + } + s.systemNexusOps[event.EventId] = attr.Operation + return attr.Operation + "Scheduled" + case enums.EVENT_TYPE_NEXUS_OPERATION_STARTED: + attr := event.GetNexusOperationStartedEventAttributes() + if attr == nil { + return "" + } + if op, ok := s.systemNexusOps[attr.ScheduledEventId]; ok { + return op + "Started" + } + case enums.EVENT_TYPE_NEXUS_OPERATION_COMPLETED: + attr := event.GetNexusOperationCompletedEventAttributes() + if attr == nil { + return "" + } + if op, ok := s.systemNexusOps[attr.ScheduledEventId]; ok { + return op + "Completed" + } + case enums.EVENT_TYPE_NEXUS_OPERATION_FAILED: + attr := event.GetNexusOperationFailedEventAttributes() + if attr == nil { + return "" + } + if op, ok := s.systemNexusOps[attr.ScheduledEventId]; ok { + return op + "Failed" + } + case enums.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT: + attr := event.GetNexusOperationTimedOutEventAttributes() + if attr == nil { + return "" + } + if op, ok := s.systemNexusOps[attr.ScheduledEventId]; ok { + return op + "TimedOut" + } + case enums.EVENT_TYPE_NEXUS_OPERATION_CANCELED: + attr := event.GetNexusOperationCanceledEventAttributes() + if attr == nil { + return "" + } + if op, ok := s.systemNexusOps[attr.ScheduledEventId]; ok { + return op + "Canceled" + } + } + return "" +} + type structuredHistoryIter struct { ctx context.Context client client.Client @@ -725,6 +790,14 @@ type structuredHistoryIter struct { reverseBuf []*history.HistoryEvent reverseNextToken []byte reverseStarted bool + + // maps NexusOperationScheduled eventId → operation name for __temporal_system endpoint events + systemNexusOps map[int64]string + + // codec is the remote payload codec configured for this client, or nil if none. Used to + // decode payloads nested inside system Nexus operation request/response bytes so they + // can be rendered alongside the rest of the event fields. + codec converter.PayloadCodec } func (s *structuredHistoryIter) print(cctx *CommandContext) error { @@ -751,7 +824,11 @@ func (s *structuredHistoryIter) print(cctx *CommandContext) error { first = false // Print section heading - cctx.Printer.Printlnf("--------------- [%v] %v ---------------", event.EventId, event.EventType) + eventTypeName := event.EventType.String() + if name := s.systemNexusOpDisplayName(event); name != "" { + eventTypeName = name + } + cctx.Printer.Printlnf("--------------- [%v] %v ---------------", event.EventId, eventTypeName) // Convert the event to dot-delimited-field/value and print one per line fields, err := s.flattenFields(cctx, event) if err != nil { @@ -786,10 +863,14 @@ func (s *structuredHistoryIter) Next() (any, error) { return nil, nil } // Build data + typeName := s.systemNexusOpDisplayName(event) + if typeName == "" { + typeName = coloredEventType(event.EventType) + } data := structuredHistoryEvent{ ID: event.EventId, Time: event.EventTime.AsTime().Format(time.RFC3339), - Type: coloredEventType(event.EventType), + Type: typeName, } // Follow continue as new (forward only; reverse traversal stays within the requested run) @@ -900,6 +981,12 @@ func (s *structuredHistoryIter) flattenFields( delete(fieldsMap, k) } } + // For system Nexus operation events, deserialize the request/response payload bytes + // into the typed proto, decode any payloads nested inside via the codec, and merge + // the decoded view into the output under "unwrappedInput" / "unwrappedResult". + if err := s.injectSystemNexusUnwrapped(event, fieldsMap, opts); err != nil { + return nil, err + } // Flatten JSON map and sort fields, err := s.flattenJSONValue(nil, "", fieldsMap) if err != nil { @@ -909,6 +996,101 @@ func (s *structuredHistoryIter) flattenFields( return fields, nil } +// injectSystemNexusUnwrapped, if the given event is a known system Nexus operation, +// deserializes the underlying request (on Scheduled) or response (on Completed) proto, +// decodes any payloads nested inside via the codec, and inserts the resulting JSON +// representation into fieldsMap under "unwrappedInput" / "unwrappedResult". +func (s *structuredHistoryIter) injectSystemNexusUnwrapped( + event *history.HistoryEvent, + fieldsMap map[string]any, + opts temporalproto.CustomJSONMarshalOptions, +) error { + switch event.EventType { + case enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED: + attr := event.GetNexusOperationScheduledEventAttributes() + if attr == nil { + return nil + } + return s.unwrapAndInjectRequest(attr.GetEndpoint(), attr.GetOperation(), attr.GetInput(), fieldsMap, opts) + case enums.EVENT_TYPE_NEXUS_OPERATION_COMPLETED: + attr := event.GetNexusOperationCompletedEventAttributes() + if attr == nil { + return nil + } + op, ok := s.systemNexusOps[attr.GetScheduledEventId()] + if !ok { + return nil + } + return s.unwrapAndInjectResponse(temporalSystemNexusEndpoint, op, attr.GetResult(), fieldsMap, opts) + } + return nil +} + +// unwrapAndInjectRequest looks up the registered request proto for (endpoint, operation), +// then injects the decoded view under "unwrappedInput". No-op for unregistered ops. +func (s *structuredHistoryIter) unwrapAndInjectRequest( + endpoint, operation string, + payload *commonpb.Payload, + fieldsMap map[string]any, + opts temporalproto.CustomJSONMarshalOptions, +) error { + types, ok := systemNexusOps[systemNexusOpKey{Endpoint: endpoint, Operation: operation}] + if !ok { + return nil + } + return s.unwrapAndInject(types.NewRequest(), payload, fieldsMap, "unwrappedInput", opts) +} + +// unwrapAndInjectResponse looks up the registered response proto for (endpoint, operation), +// then injects the decoded view under "unwrappedResult". No-op for unregistered ops. +func (s *structuredHistoryIter) unwrapAndInjectResponse( + endpoint, operation string, + payload *commonpb.Payload, + fieldsMap map[string]any, + opts temporalproto.CustomJSONMarshalOptions, +) error { + types, ok := systemNexusOps[systemNexusOpKey{Endpoint: endpoint, Operation: operation}] + if !ok { + return nil + } + return s.unwrapAndInject(types.NewResponse(), payload, fieldsMap, "unwrappedResult", opts) +} + +// unwrapAndInject is the shared body: unmarshal payload bytes into the supplied proto, +// decode any payloads nested inside via the codec, marshal back to JSON, and inject the +// resulting map into fieldsMap[key]. A nil payload is a no-op. +func (s *structuredHistoryIter) unwrapAndInject( + msg proto.Message, + payload *commonpb.Payload, + fieldsMap map[string]any, + key string, + opts temporalproto.CustomJSONMarshalOptions, +) error { + if payload == nil { + return nil + } + if err := proto.Unmarshal(payload.Data, msg); err != nil { + return fmt.Errorf("failed unmarshaling system nexus payload: %w", err) + } + if s.codec != nil { + if err := decodePayloadsInProto(s.ctx, msg, s.codec); err != nil { + return fmt.Errorf("failed decoding payloads in system nexus payload: %w", err) + } + } + unwrappedJSON, err := opts.Marshal(msg) + if err != nil { + return fmt.Errorf("failed marshaling unwrapped system nexus payload: %w", err) + } + dec := json.NewDecoder(bytes.NewReader(unwrappedJSON)) + dec.UseNumber() + var unwrappedMap map[string]any + if err := dec.Decode(&unwrappedMap); err != nil { + return fmt.Errorf("failed unmarshaling unwrapped JSON for system nexus payload: %w", err) + } + fieldsMap[key] = unwrappedMap + return nil +} + func (s *structuredHistoryIter) flattenJSONValue( to []eventFieldValue, field string, diff --git a/internal/temporalcli/commands.workflow_show_test.go b/internal/temporalcli/commands.workflow_show_test.go new file mode 100644 index 000000000..0f70449a5 --- /dev/null +++ b/internal/temporalcli/commands.workflow_show_test.go @@ -0,0 +1,173 @@ +package temporalcli + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.temporal.io/api/enums/v1" + historypb "go.temporal.io/api/history/v1" +) + +func TestSystemNexusOpDisplayName(t *testing.T) { + const op = "SignalWithStartWorkflowExecution" // present in the global systemNexusOps registry + const futureOp = "FutureUnregisteredOperation" // intentionally NOT in the global registry + + newScheduled := func(eventID int64, endpoint, operation string) *historypb.HistoryEvent { + return &historypb.HistoryEvent{ + EventId: eventID, + EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + Attributes: &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ + NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{ + Endpoint: endpoint, Operation: operation, + }, + }, + } + } + // terminalEvent returns a NexusOperation* event of the given type referencing scheduledEventID. + // Returns nil for unsupported event types. + terminalEvent := func(eventType enums.EventType, scheduledEventID int64) *historypb.HistoryEvent { + e := &historypb.HistoryEvent{EventId: scheduledEventID + 1, EventType: eventType} + switch eventType { + case enums.EVENT_TYPE_NEXUS_OPERATION_STARTED: + e.Attributes = &historypb.HistoryEvent_NexusOperationStartedEventAttributes{ + NexusOperationStartedEventAttributes: &historypb.NexusOperationStartedEventAttributes{ScheduledEventId: scheduledEventID}, + } + case enums.EVENT_TYPE_NEXUS_OPERATION_COMPLETED: + e.Attributes = &historypb.HistoryEvent_NexusOperationCompletedEventAttributes{ + NexusOperationCompletedEventAttributes: &historypb.NexusOperationCompletedEventAttributes{ScheduledEventId: scheduledEventID}, + } + case enums.EVENT_TYPE_NEXUS_OPERATION_FAILED: + e.Attributes = &historypb.HistoryEvent_NexusOperationFailedEventAttributes{ + NexusOperationFailedEventAttributes: &historypb.NexusOperationFailedEventAttributes{ScheduledEventId: scheduledEventID}, + } + case enums.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT: + e.Attributes = &historypb.HistoryEvent_NexusOperationTimedOutEventAttributes{ + NexusOperationTimedOutEventAttributes: &historypb.NexusOperationTimedOutEventAttributes{ScheduledEventId: scheduledEventID}, + } + case enums.EVENT_TYPE_NEXUS_OPERATION_CANCELED: + e.Attributes = &historypb.HistoryEvent_NexusOperationCanceledEventAttributes{ + NexusOperationCanceledEventAttributes: &historypb.NexusOperationCanceledEventAttributes{ScheduledEventId: scheduledEventID}, + } + } + return e + } + + // priorScheduled returns a setup that creates an iter and pre-processes the given Scheduled + // event. In forward traversal this matches the natural event order; in reverse traversal it + // simulates the iter having pre-scanned the buffered history. + priorScheduled := func(scheduled *historypb.HistoryEvent) func(reverse bool) *structuredHistoryIter { + return func(reverse bool) *structuredHistoryIter { + s := &structuredHistoryIter{reverse: reverse} + s.systemNexusOpDisplayName(scheduled) + return s + } + } + empty := func(reverse bool) *structuredHistoryIter { + return &structuredHistoryIter{reverse: reverse} + } + + schedSystem := newScheduled(5, temporalSystemNexusEndpoint, op) + schedSystemFutureOp := newScheduled(5, temporalSystemNexusEndpoint, futureOp) + schedNonSystem := newScheduled(7, "some-other-endpoint", op) + + tests := []struct { + name string + setup func(reverse bool) *structuredHistoryIter + event *historypb.HistoryEvent + wantName string + wantEmpty bool + }{ + // Scheduled cases + { + name: "scheduled system endpoint records op and returns name", + setup: empty, + event: schedSystem, + wantName: op + "Scheduled", + }, + { + name: "scheduled non-system endpoint returns empty", + setup: empty, + event: schedNonSystem, + wantEmpty: true, + }, + { + // Display name does not consult the global systemNexusOps registry; any op on the + // __temporal_system endpoint produces a display name. (Payload unwrap-and-inject is + // the layer that requires registry membership; see TestUnwrapAndInjectRequest_*.) + name: "scheduled system endpoint with op not in registry still returns name", + setup: empty, + event: schedSystemFutureOp, + wantName: futureOp + "Scheduled", + }, + // Terminal cases with a prior Scheduled in the instance map + { + name: "started after system scheduled", + setup: priorScheduled(schedSystem), + event: terminalEvent(enums.EVENT_TYPE_NEXUS_OPERATION_STARTED, 5), + wantName: op + "Started", + }, + { + // Confirms terminal events use the iter's instance map (which captured the unregistered + // op from its Scheduled event), regardless of global-registry membership. + name: "started after system scheduled with op not in registry", + setup: priorScheduled(schedSystemFutureOp), + event: terminalEvent(enums.EVENT_TYPE_NEXUS_OPERATION_STARTED, 5), + wantName: futureOp + "Started", + }, + { + name: "completed after system scheduled", + setup: priorScheduled(schedSystem), + event: terminalEvent(enums.EVENT_TYPE_NEXUS_OPERATION_COMPLETED, 5), + wantName: op + "Completed", + }, + { + name: "completed with no prior scheduled returns empty", + setup: empty, + event: terminalEvent(enums.EVENT_TYPE_NEXUS_OPERATION_COMPLETED, 5), + wantEmpty: true, + }, + { + name: "failed after system scheduled", + setup: priorScheduled(schedSystem), + event: terminalEvent(enums.EVENT_TYPE_NEXUS_OPERATION_FAILED, 5), + wantName: op + "Failed", + }, + { + name: "timed out after system scheduled", + setup: priorScheduled(schedSystem), + event: terminalEvent(enums.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT, 5), + wantName: op + "TimedOut", + }, + { + name: "canceled after system scheduled", + setup: priorScheduled(schedSystem), + event: terminalEvent(enums.EVENT_TYPE_NEXUS_OPERATION_CANCELED, 5), + wantName: op + "Canceled", + }, + } + + // The function is direction-agnostic: given the same prior state, it returns the same + // display name whether iter.reverse is true or false. Running each case in both modes + // pins that contract so a future change can't silently introduce direction-sensitive + // branching without breaking these tests. + for _, reverse := range []bool{false, true} { + modeName := "forward" + if reverse { + modeName = "reverse" + } + t.Run(modeName, func(t *testing.T) { + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + iter := tc.setup(reverse) + require.Equal(t, reverse, iter.reverse, "test setup must mark iter reverse=%v", reverse) + name := iter.systemNexusOpDisplayName(tc.event) + if tc.wantEmpty { + require.Empty(t, name) + } else { + require.Equal(t, tc.wantName, name) + } + }) + } + }) + } +} diff --git a/internal/temporalcli/commands.workflow_view.go b/internal/temporalcli/commands.workflow_view.go index 9d142c3c8..a1487d906 100644 --- a/internal/temporalcli/commands.workflow_view.go +++ b/internal/temporalcli/commands.workflow_view.go @@ -324,8 +324,14 @@ func (c *TemporalWorkflowDescribeCommand) run(cctx *CommandContext, args []strin _ = cctx.Printer.PrintStructured(resp.PendingChildren, printer.StructuredOptions{}) } - cctx.Printer.Println(color.MagentaString("Pending Nexus Operations: %v", len(resp.PendingNexusOperations))) - if len(resp.PendingNexusOperations) > 0 { + var pendingNexusOps []*workflow.PendingNexusOperationInfo + for _, op := range resp.PendingNexusOperations { + if op.GetEndpoint() != temporalSystemNexusEndpoint { + pendingNexusOps = append(pendingNexusOps, op) + } + } + cctx.Printer.Println(color.MagentaString("Pending Nexus Operations: %v", len(pendingNexusOps))) + if len(pendingNexusOps) > 0 { cctx.Printer.Println() ops := make([]struct { Endpoint string @@ -348,8 +354,8 @@ func (c *TemporalWorkflowDescribeCommand) run(cctx *CommandContext, args []strin CancelationLastAttemptCompleteTime time.Time `cli:",cardOmitEmpty"` CancelationLastAttemptFailure *failure.Failure `cli:",cardOmitEmpty"` CancelationBlockedReason string `cli:",cardOmitEmpty"` - }, len(resp.PendingNexusOperations)) - for i, op := range resp.PendingNexusOperations { + }, len(pendingNexusOps)) + for i, op := range pendingNexusOps { ops[i].Endpoint = op.GetEndpoint() ops[i].Service = op.GetService() ops[i].Operation = op.GetOperation() @@ -558,7 +564,7 @@ func (c *TemporalWorkflowShowCommand) run(cctx *CommandContext, _ []string) erro } // Call describe - cl, err := dialClient(cctx, &c.Parent.ClientOptions) + cl, codec, err := dialClientWithCodec(cctx, &c.Parent.ClientOptions) if err != nil { return err } @@ -574,6 +580,7 @@ func (c *TemporalWorkflowShowCommand) run(cctx *CommandContext, _ []string) erro includeDetails: c.Detailed, follow: c.Follow, reverse: c.Reverse, + codec: codec, } if !cctx.JSONOutput { cctx.Printer.Println(color.MagentaString("Progress:"))