Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require 'temporalio/common_enums'
require 'temporalio/priority'

module Temporalio
class Client
Expand All @@ -24,6 +25,7 @@ class WithStartWorkflowOperation
:memo,
:search_attributes,
:start_delay,
:priority,
:arg_hints,
:result_hint,
:headers
Expand All @@ -40,6 +42,8 @@ class Options; end # rubocop:disable Lint/EmptyClass
#
# Note, for {Client.start_update_with_start_workflow} and {Client.execute_update_with_start_workflow},
# `id_conflict_policy` is required.
#
# @param priority [Priority] Priority of the workflow that may be started. This is currently experimental.
def initialize(
workflow,
*args,
Expand All @@ -57,6 +61,7 @@ def initialize(
memo: nil,
search_attributes: nil,
start_delay: nil,
priority: Priority.default,
arg_hints: nil,
result_hint: nil,
headers: {}
Expand All @@ -80,6 +85,7 @@ def initialize(
memo:,
search_attributes:,
start_delay:,
priority:,
arg_hints: arg_hints || defn_arg_hints,
result_hint: result_hint || defn_result_hint,
headers:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ def _start_workflow_request_from_with_start_options(klass, start_options)
user_metadata: ProtoUtils.to_user_metadata(
start_options.static_summary, start_options.static_details, @client.data_converter
),
header: ProtoUtils.headers_to_proto(start_options.headers, @client.data_converter)
header: ProtoUtils.headers_to_proto(start_options.headers, @client.data_converter),
priority: start_options.priority._to_proto
)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module Temporalio
attr_reader memo: Hash[String | Symbol, Object?]?
attr_reader search_attributes: SearchAttributes?
attr_reader start_delay: duration?
attr_reader priority: Priority
attr_reader arg_hints: Array[Object]?
attr_reader result_hint: Object?
attr_reader headers: Hash[String, Object?]
Expand All @@ -39,6 +40,7 @@ module Temporalio
memo: Hash[String | Symbol, Object?]?,
search_attributes: SearchAttributes?,
start_delay: duration?,
priority: Priority,
arg_hints: Array[Object]?,
result_hint: Object?,
headers: Hash[String, Object?]
Expand All @@ -64,6 +66,7 @@ module Temporalio
?memo: Hash[String | Symbol, Object?]?,
?search_attributes: SearchAttributes?,
?start_delay: duration?,
?priority: Priority,
?arg_hints: Array[Object]?,
?result_hint: Object?,
?headers: Hash[String, Object?]
Expand All @@ -76,4 +79,4 @@ module Temporalio
def _mark_used: -> void
end
end
end
end
16 changes: 14 additions & 2 deletions temporalio/test/worker_workflow_handler_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,8 @@ def test_update_with_start_simple
id = "wf-#{SecureRandom.uuid}"
start_workflow_operation = Temporalio::Client::WithStartWorkflowOperation.new(
UpdateWithStartWorkflow, 123,
id:, task_queue: worker.task_queue, id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::FAIL
id:, task_queue: worker.task_queue, id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::FAIL,
priority: Temporalio::Priority.new(priority_key: 2, fairness_key: 'update-start', fairness_weight: 0.7)
)
# Run and confirm result of update is pre-workflow-execute
assert_equal 456, env.client.execute_update_with_start_workflow(
Expand All @@ -723,6 +724,11 @@ def test_update_with_start_simple
# Confirm query is total
handle = start_workflow_operation.workflow_handle
assert_equal 579, handle.query(UpdateWithStartWorkflow.counter)
started_event = handle.fetch_history_events.find(&:workflow_execution_started_event_attributes)
priority = started_event.workflow_execution_started_event_attributes.priority
assert_equal 2, priority.priority_key
assert_equal 'update-start', priority.fairness_key
assert_in_delta 0.7, priority.fairness_weight, 0.001

# Update with start 5 more times
5.times do
Expand Down Expand Up @@ -902,7 +908,8 @@ def test_signal_with_start
id = "wf-#{SecureRandom.uuid}"
start_workflow_operation = Temporalio::Client::WithStartWorkflowOperation.new(
SignalWithStartWorkflow, 'workflow-start',
id:, task_queue: worker.task_queue
id:, task_queue: worker.task_queue,
priority: Temporalio::Priority.new(priority_key: 4, fairness_key: 'signal-start', fairness_weight: 1.3)
)
handle = env.client.signal_with_start_workflow(
SignalWithStartWorkflow.add_event, 'signal', start_workflow_operation:
Expand All @@ -911,6 +918,11 @@ def test_signal_with_start
assert_same handle, start_workflow_operation.workflow_handle
# Confirm signal event came first
assert_equal %w[signal workflow-start], handle.query(SignalWithStartWorkflow.events)
started_event = handle.fetch_history_events.find(&:workflow_execution_started_event_attributes)
priority = started_event.workflow_execution_started_event_attributes.priority
assert_equal 4, priority.priority_key
assert_equal 'signal-start', priority.fairness_key
assert_in_delta 1.3, priority.fairness_weight, 0.001

# Signal with start 3 more times
3.times do |i|
Expand Down
Loading