Hello World
This is a simple example showing how to call an LLM from Temporal using the OpenAI Python API library.
Being an external API call, the LLM invocation happens in a Temporal Activity.
This recipe highlights three key design decisions:
- A generic Activity for invoking an LLM API. This Activity can be reused with different arguments throughout your codebase.
- Configuring the Temporal client with a data converter to allow serialization of Pydantic types.
- Handling retries in Temporal rather than in underlying libraries such as the OpenAI client. This is important because leaving the client's retries enabled can interfere with correct, durable error handling and recovery.
Create the Activity
We create a wrapper for the `create` method of the `AsyncOpenAI` client object. This is a generic Activity that invokes the OpenAI LLM.
We set `max_retries=0` when creating the `AsyncOpenAI` client. This moves the responsibility for retries from the OpenAI client to Temporal.
In this implementation, we include only the `instructions` and `input` arguments, but it could be extended to accept others.
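As one way to extend it, the request dataclass could grow optional fields for other Responses API parameters. A hedged sketch (the field names `temperature` and `max_output_tokens` are illustrative; verify the exact parameter names against the OpenAI SDK before relying on them):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class OpenAIResponsesRequest:
    model: str
    instructions: str
    input: str
    # Optional parameters, illustrative names: only forwarded when set.
    temperature: Optional[float] = None
    max_output_tokens: Optional[int] = None


def to_kwargs(request: OpenAIResponsesRequest) -> dict:
    """Build kwargs for client.responses.create, omitting unset options."""
    kwargs = {
        "model": request.model,
        "instructions": request.instructions,
        "input": request.input,
    }
    if request.temperature is not None:
        kwargs["temperature"] = request.temperature
    if request.max_output_tokens is not None:
        kwargs["max_output_tokens"] = request.max_output_tokens
    return kwargs
```

Keeping the optional fields `None` by default means existing callers of the Activity are unaffected by the extension.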
File: activities/openai_responses.py
```python
from temporalio import activity
from openai import AsyncOpenAI
from openai.types.responses import Response
from dataclasses import dataclass


# Temporal best practice: Create a data structure to hold the request parameters.
@dataclass
class OpenAIResponsesRequest:
    model: str
    instructions: str
    input: str


@activity.defn
async def create(request: OpenAIResponsesRequest) -> Response:
    # Temporal best practice: Disable retry logic in the OpenAI API client library.
    client = AsyncOpenAI(max_retries=0)
    resp = await client.responses.create(
        model=request.model,
        instructions=request.instructions,
        input=request.input,
        timeout=15,
    )
    return resp
```
Create the Workflow
In this example, we take the user input and generate a response in haiku format, using the OpenAI Responses Activity. The Workflow returns `result.output_text` from the OpenAI `Response`.
As usual, the Activity retry configuration is set here in the Workflow. In this case, no retry policy is specified, so the default retry policy is used: exponential backoff with a 1-second initial interval, a 2.0 backoff coefficient, a maximum interval of 100x the initial interval, unlimited attempts, and no non-retryable errors.
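To make that default schedule concrete, each retry delay is `initial_interval * coefficient^(attempt - 1)`, capped at the maximum interval. A small sketch of the arithmetic (the values mirror Temporal's documented defaults; this is an illustration, not output from the SDK):

```python
def retry_intervals(attempts: int,
                    initial: float = 1.0,
                    coefficient: float = 2.0,
                    max_interval: float = 100.0) -> list[float]:
    """Back-off delays (in seconds) before each retry under the default policy."""
    return [min(initial * coefficient ** n, max_interval) for n in range(attempts)]

# First eight delays: 1, 2, 4, 8, 16, 32, 64, 100 (capped) seconds.
print(retry_intervals(8))
```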
File: workflows/hello_world_workflow.py
```python
from temporalio import workflow
from datetime import timedelta

from activities import openai_responses


@workflow.defn
class HelloWorld:
    @workflow.run
    async def run(self, input: str) -> str:
        system_instructions = "You only respond in haikus."
        result = await workflow.execute_activity(
            openai_responses.create,
            openai_responses.OpenAIResponsesRequest(
                model="gpt-4o-mini",
                instructions=system_instructions,
                input=input,
            ),
            start_to_close_timeout=timedelta(seconds=30),
        )
        return result.output_text
```
Create the Worker
Create the process for executing Activities and Workflows.
We configure the Temporal client with `pydantic_data_converter` so Temporal can serialize and deserialize the Pydantic types returned by the OpenAI SDK.
File: worker.py
```python
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.contrib.pydantic import pydantic_data_converter

from workflows.hello_world_workflow import HelloWorld
from activities import openai_responses


async def main():
    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )
    worker = Worker(
        client,
        task_queue="hello-world-python-task-queue",
        workflows=[
            HelloWorld,
        ],
        activities=[
            openai_responses.create,
        ],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```
Create the Workflow Starter
The starter script submits the workflow to Temporal for execution, then waits for the result and prints it out.
It uses the `pydantic_data_converter` to match the Worker configuration.
File: start_workflow.py
```python
import asyncio

from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter

from workflows.hello_world_workflow import HelloWorld


async def main():
    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )

    # Submit the Hello World workflow for execution
    result = await client.execute_workflow(
        HelloWorld.run,
        "Tell me about recursion in programming.",
        id="my-workflow-id",
        task_queue="hello-world-python-task-queue",
    )

    print(f"Result: {result}")


if __name__ == "__main__":
    asyncio.run(main())
```
Running
Start the Temporal Dev Server:

```
temporal server start-dev
```

Run the Worker:

```
uv run python -m worker
```

Start execution:

```
uv run python -m start_workflow
```