Getting started with the Dapr Workflow Python SDK
Let’s create a Dapr workflow and invoke it using the console. With the provided workflow example, you will:
- Run a Python console application that demonstrates workflow orchestration with activities, child workflows, and external events
- Learn how to handle retries, timeouts, and workflow state management
- Use the Python workflow SDK to start, pause, resume, and purge workflow instances
This example uses the default configuration from dapr init
in self-hosted mode.
In the Python example project, the simple.py
file contains the setup of the app, including:
- The workflow definition
- The workflow activity definitions
- The registration of the workflow and workflow activities
Prerequisites
- Dapr CLI installed
- Initialized Dapr environment
- Python 3.9+ installed
- Dapr Python package and the workflow extension installed
- Verify you’re using the latest proto bindings
Set up the environment
Start by cloning the [Python SDK repo].
git clone https://github.com/dapr/python-sdk.git
From the Python SDK root directory, navigate to the Dapr Workflow example.
cd examples/workflow
Run the following command to install the requirements for running this workflow sample with the Dapr Python SDK.
pip3 install -r workflow/requirements.txt
Run the application locally
To run the Dapr application, you need to start the Python program and a Dapr sidecar. In the terminal, run:
dapr run --app-id wf-simple-example --dapr-grpc-port 50001 --resources-path components -- python3 simple.py
Note: Since Python3.exe is not defined in Windows, you may need to use
python simple.py
instead ofpython3 simple.py
.
Expected output
- "== APP == Hi Counter!"
- "== APP == New counter value is: 1!"
- "== APP == New counter value is: 11!"
- "== APP == Retry count value is: 0!"
- "== APP == Retry count value is: 1! This print statement verifies retry"
- "== APP == Appending 1 to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending 2 to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending 3 to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Get response from hello_world_wf after pause call: Suspended"
- "== APP == Get response from hello_world_wf after resume call: Running"
- "== APP == New counter value is: 111!"
- "== APP == New counter value is: 1111!"
- "== APP == Workflow completed! Result: "Completed"
What happened?
When you run the application, several key workflow features are shown:
Workflow and Activity Registration: The application uses Python decorators to automatically register workflows and activities with the runtime. This decorator-based approach provides a clean, declarative way to define your workflow components:
@wfr.workflow(name='hello_world_wf') def hello_world_wf(ctx: DaprWorkflowContext, wf_input): # Workflow definition... @wfr.activity(name='hello_act') def hello_act(ctx: WorkflowActivityContext, wf_input): # Activity definition...
Runtime Setup: The application initializes the workflow runtime and client:
wfr = WorkflowRuntime() wfr.start() wf_client = DaprWorkflowClient()
Activity Execution: The workflow executes a series of activities that increment a counter:
@wfr.workflow(name='hello_world_wf') def hello_world_wf(ctx: DaprWorkflowContext, wf_input): yield ctx.call_activity(hello_act, input=1) yield ctx.call_activity(hello_act, input=10)
Retry Logic: The workflow demonstrates error handling with a retry policy:
retry_policy = RetryPolicy( first_retry_interval=timedelta(seconds=1), max_number_of_attempts=3, backoff_coefficient=2, max_retry_interval=timedelta(seconds=10), retry_timeout=timedelta(seconds=100), ) yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
Child Workflow: A child workflow is executed with its own retry policy:
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
External Event Handling: The workflow waits for an external event with a timeout:
event = ctx.wait_for_external_event(event_name) timeout = ctx.create_timer(timedelta(seconds=30)) winner = yield when_any([event, timeout])
Workflow Lifecycle Management: The example demonstrates how to pause and resume the workflow:
wf_client.pause_workflow(instance_id=instance_id) metadata = wf_client.get_workflow_state(instance_id=instance_id) # ... check status ... wf_client.resume_workflow(instance_id=instance_id)
Event Raising: After resuming, the workflow raises an event:
wf_client.raise_workflow_event( instance_id=instance_id, event_name=event_name, data=event_data )
Completion and Cleanup: Finally, the workflow waits for completion and cleans up:
state = wf_client.wait_for_workflow_completion( instance_id, timeout_in_seconds=30 ) wf_client.purge_workflow(instance_id=instance_id)
Next steps
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.