diff --git a/temporalio/lib/temporalio/common_enums.rb b/temporalio/lib/temporalio/common_enums.rb index 8b7376b6..e6bdd6a2 100644 --- a/temporalio/lib/temporalio/common_enums.rb +++ b/temporalio/lib/temporalio/common_enums.rb @@ -50,6 +50,22 @@ module ContinueAsNewVersioningBehavior # workflow code. AUTO_UPGRADE = Api::Enums::V1::ContinueAsNewVersioningBehavior::CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE + # Use the Ramping Version of the workflow's task queue at start time, regardless of the workflow's + # Target Version (according to f(workflow_id, ramp_percentage)). After the first workflow task completes, + # the workflow will use whatever Versioning Behavior it is annotated with. If there is no Ramping + # Version by the time that the first workflow task is dispatched, it will be sent to the Current Version. + # + # It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because + # this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow + # is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which + # may be the Current Version instead of the Ramping Version. + # + # Note that if the workflow being continued has a Pinned override, that override will be inherited by the + # new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new + # command. Versioning Override always takes precedence until it's removed manually via + # UpdateWorkflowExecutionOptions. + USE_RAMPING_VERSION = + Api::Enums::V1::ContinueAsNewVersioningBehavior::CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_USE_RAMPING_VERSION end # Specifies why the server suggests continue-as-new. This is currently experimental. diff --git a/temporalio/lib/temporalio/workflow.rb b/temporalio/lib/temporalio/workflow.rb index 481f4c13..9681e0fd 100644 --- a/temporalio/lib/temporalio/workflow.rb +++ b/temporalio/lib/temporalio/workflow.rb @@ -672,7 +672,8 @@ class ContinueAsNewError < Error # current workflow. # @param initial_versioning_behavior [ContinueAsNewVersioningBehavior::enum, nil] Versioning behavior for the # first task of the new run. Set to {ContinueAsNewVersioningBehavior::AUTO_UPGRADE} to upgrade a pinned workflow - # to the latest version on continue-as-new. This is currently experimental. + # to the latest version on continue-as-new or {ContinueAsNewVersioningBehavior::USE_RAMPING_VERSION} to start on + # the task queue's Ramping Version. This is currently experimental. def initialize( *args, workflow: nil, diff --git a/temporalio/sig/temporalio/common_enums.rbs b/temporalio/sig/temporalio/common_enums.rbs index 1ae034e1..9c16555e 100644 --- a/temporalio/sig/temporalio/common_enums.rbs +++ b/temporalio/sig/temporalio/common_enums.rbs @@ -22,6 +22,7 @@ module Temporalio UNSPECIFIED: enum AUTO_UPGRADE: enum + USE_RAMPING_VERSION: enum end module SuggestContinueAsNewReason @@ -40,4 +41,4 @@ module Temporalio PINNED: enum AUTO_UPGRADE: enum end -end \ No newline at end of file +end diff --git a/temporalio/test/sig/worker_workflow_versioning_test.rbs b/temporalio/test/sig/worker_workflow_versioning_test.rbs index 5dde0fdd..2d3a50e9 100644 --- a/temporalio/test/sig/worker_workflow_versioning_test.rbs +++ b/temporalio/test/sig/worker_workflow_versioning_test.rbs @@ -4,5 +4,5 @@ class WorkerWorkflowVersioningTest < Test def set_current_deployment_version: (untyped client, String task_queue, Temporalio::WorkerDeploymentVersion version) -> untyped def set_ramping_version: (untyped client, String task_queue, Temporalio::WorkerDeploymentVersion version, Float rate) -> untyped def wait_for_workflow_running_on_version: (untyped handle, String expected_build_id) -> void - def wait_for_worker_deployment_routing_config_propagation: (untyped client, String deployment_name, String expected_current_build_id) -> void + def wait_for_worker_deployment_routing_config_propagation: (untyped client, String deployment_name, String expected_current_build_id, ?String expected_ramping_build_id) -> void end diff --git a/temporalio/test/test.rb b/temporalio/test/test.rb index 59a7ed8c..8ad2c8d5 100644 --- a/temporalio/test/test.rb +++ b/temporalio/test/test.rb @@ -147,7 +147,7 @@ def initialize if target_host.empty? @server = Temporalio::Testing::WorkflowEnvironment.start_local( logger: Logger.new($stdout), - dev_server_download_version: 'v1.6.2-server-1.31.0-151.6', + dev_server_download_version: 'v1.7.0', dev_server_extra_args: [ # Allow continue as new to be immediate '--dynamic-config-value', 'history.workflowIdReuseMinimalInterval="0s"', diff --git a/temporalio/test/worker_workflow_versioning_test.rb b/temporalio/test/worker_workflow_versioning_test.rb index 0215f844..32e07a1b 100644 --- a/temporalio/test/worker_workflow_versioning_test.rb +++ b/temporalio/test/worker_workflow_versioning_test.rb @@ -650,6 +650,39 @@ def execute(_attempt) end end + class CanRampingVersionWorkflowV1 < Temporalio::Workflow::Definition + workflow_name :ContinueAsNewWithRampingVersion + workflow_versioning_behavior Temporalio::VersioningBehavior::PINNED + + def initialize + @continue_as_new = false + end + + def execute(attempt) + return 'v1.0' if attempt.positive? + + Temporalio::Workflow.wait_condition { @continue_as_new } + raise Temporalio::Workflow::ContinueAsNewError.new( + attempt + 1, + initial_versioning_behavior: Temporalio::ContinueAsNewVersioningBehavior::USE_RAMPING_VERSION + ) + end + + workflow_signal + def do_continue_as_new + @continue_as_new = true + end + end + + class CanRampingVersionWorkflowV2 < Temporalio::Workflow::Definition + workflow_name :ContinueAsNewWithRampingVersion + workflow_versioning_behavior Temporalio::VersioningBehavior::PINNED + + def execute(_attempt) + 'v2.0' + end + end + def test_continue_as_new_with_version_upgrade deployment_name = "deployment-can-upgrade-#{SecureRandom.uuid}" worker_v1 = Temporalio::WorkerDeploymentVersion.new( @@ -715,6 +748,64 @@ def test_continue_as_new_with_version_upgrade end end + def test_continue_as_new_with_ramping_version + deployment_name = "deployment-can-ramping-#{SecureRandom.uuid}" + worker_v1 = Temporalio::WorkerDeploymentVersion.new( + deployment_name: deployment_name, build_id: '1.0' + ) + worker_v2 = Temporalio::WorkerDeploymentVersion.new( + deployment_name: deployment_name, build_id: '2.0' + ) + + task_queue = "tq-#{SecureRandom.uuid}" + + worker1 = Temporalio::Worker.new( + client: env.client, + task_queue: task_queue, + workflows: [CanRampingVersionWorkflowV1], + deployment_options: Temporalio::Worker::DeploymentOptions.new( + version: worker_v1, + use_worker_versioning: true + ) + ) + + worker2 = Temporalio::Worker.new( + client: env.client, + task_queue: task_queue, + workflows: [CanRampingVersionWorkflowV2], + deployment_options: Temporalio::Worker::DeploymentOptions.new( + version: worker_v2, + use_worker_versioning: true + ) + ) + + Temporalio::Worker.run_all(worker1, worker2) do + describe_resp = wait_until_worker_deployment_visible(env.client, worker_v1) + current_resp = set_current_deployment_version(env.client, describe_resp.conflict_token, worker_v1) + wait_for_worker_deployment_routing_config_propagation(env.client, deployment_name, worker_v1.build_id) + + handle = env.client.start_workflow( + 'ContinueAsNewWithRampingVersion', + 0, + id: "test-can-ramping-version-#{SecureRandom.uuid}", + task_queue: task_queue + ) + wait_for_workflow_running_on_version(handle, worker_v1.build_id) + + wait_until_worker_deployment_visible(env.client, worker_v2) + set_ramping_version(env.client, current_resp.conflict_token, worker_v2, 0.0) + wait_for_worker_deployment_routing_config_propagation( + env.client, + deployment_name, + worker_v1.build_id, + worker_v2.build_id + ) + + handle.signal(CanRampingVersionWorkflowV1.do_continue_as_new) + assert_equal 'v2.0', handle.result + end + end + def wait_for_workflow_running_on_version(handle, expected_build_id) assert_eventually do desc = handle.describe