Hello World

This is a simple example showing how to call an LLM from Temporal using the OpenAI Python API library.

Being an external API call, the LLM invocation happens in a Temporal Activity.

This recipe highlights three key design decisions:

  • A generic Activity for invoking an LLM API. This Activity can be re-used with different arguments throughout your codebase.
  • The Temporal client is configured with a data converter so that Pydantic types can be serialized.
  • Retries are handled by Temporal, not by underlying libraries such as the OpenAI client. This matters because if you leave the client's retries on, they can interfere with correct and durable error handling and recovery.

Create the Activity

We create a wrapper for the create method of the AsyncOpenAI client object. This is a generic Activity that invokes the OpenAI LLM.

We set max_retries=0 when creating the AsyncOpenAI client. This moves the responsibility for retries from the OpenAI client to Temporal.

In this implementation, we include only the instructions and input arguments, but it could be extended to accept others.
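As a sketch of that extension, the request dataclass could grow optional fields that are only forwarded to the client when set. The `temperature` and `max_output_tokens` names mirror optional OpenAI Responses API parameters, and the `to_create_kwargs` helper is hypothetical, not part of the recipe:

```python
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass
class OpenAIResponsesRequest:
    model: str
    instructions: str
    input: str
    # Hypothetical extensions; names mirror OpenAI Responses API parameters.
    temperature: Optional[float] = None
    max_output_tokens: Optional[int] = None


def to_create_kwargs(request: OpenAIResponsesRequest) -> dict:
    """Drop unset optional fields so their defaults stay with the OpenAI client."""
    return {k: v for k, v in asdict(request).items() if v is not None}
```

Inside the Activity, the call would then become `await client.responses.create(**to_create_kwargs(request), timeout=15)`.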

File: activities/openai_responses.py


from temporalio import activity
from openai import AsyncOpenAI
from openai.types.responses import Response
from dataclasses import dataclass


# Temporal best practice: Create a data structure to hold the request parameters.
@dataclass
class OpenAIResponsesRequest:
    model: str
    instructions: str
    input: str


@activity.defn
async def create(request: OpenAIResponsesRequest) -> Response:
    # Temporal best practice: Disable retry logic in the OpenAI API client library.
    client = AsyncOpenAI(max_retries=0)

    resp = await client.responses.create(
        model=request.model,
        instructions=request.instructions,
        input=request.input,
        timeout=15,
    )

    return resp

Create the Workflow

In this example, we take the user input and generate a response in haiku format, using the OpenAI Responses activity. The Workflow returns result.output_text from the OpenAI Response.

As usual, Activity retry configuration is set here in the Workflow. In this case, no retry policy is specified, so the default retry policy is used: exponential backoff with a 1-second initial interval, a backoff coefficient of 2.0, a maximum interval of 100× the initial interval, unlimited attempts, and no non-retryable error types.
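To make that backoff schedule concrete, here is a small stdlib-only sketch (not Temporal code) that computes the waits the default policy implies, ignoring any server-side jitter:

```python
def default_retry_intervals(attempts: int,
                            initial: float = 1.0,
                            coefficient: float = 2.0,
                            max_ratio: float = 100.0) -> list[float]:
    """Backoff waits (in seconds) implied by Temporal's default retry policy."""
    cap = initial * max_ratio  # maximum interval is 100x the initial interval
    waits = []
    current = initial
    for _ in range(attempts):
        waits.append(min(current, cap))
        current *= coefficient
    return waits
```

The first eight waits come out as 1, 2, 4, 8, 16, 32, 64, and then 100 seconds once the cap kicks in.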

File: workflows/hello_world_workflow.py

from temporalio import workflow
from datetime import timedelta

from activities import openai_responses


@workflow.defn
class HelloWorld:
    @workflow.run
    async def run(self, input: str) -> str:
        system_instructions = "You only respond in haikus."
        result = await workflow.execute_activity(
            openai_responses.create,
            openai_responses.OpenAIResponsesRequest(
                model="gpt-4o-mini",
                instructions=system_instructions,
                input=input,
            ),
            start_to_close_timeout=timedelta(seconds=30),
        )
        return result.output_text

Create the Worker

Create the process for executing Activities and Workflows. We configure the Temporal client with pydantic_data_converter so Temporal can serialize and deserialize the Pydantic types returned by the OpenAI SDK.

File: worker.py

import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from workflows.hello_world_workflow import HelloWorld
from activities import openai_responses
from temporalio.contrib.pydantic import pydantic_data_converter


async def main():
    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )

    worker = Worker(
        client,
        task_queue="hello-world-python-task-queue",
        workflows=[
            HelloWorld,
        ],
        activities=[
            openai_responses.create,
        ],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

Create the Workflow Starter

The starter script submits the workflow to Temporal for execution, then waits for the result and prints it out. It uses the pydantic_data_converter to match the Worker configuration.

File: start_workflow.py

import asyncio

from temporalio.client import Client

from workflows.hello_world_workflow import HelloWorld
from temporalio.contrib.pydantic import pydantic_data_converter


async def main():
    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )

    # Submit the Hello World workflow for execution
    result = await client.execute_workflow(
        HelloWorld.run,
        "Tell me about recursion in programming.",
        id="my-workflow-id",
        task_queue="hello-world-python-task-queue",
    )
    print(f"Result: {result}")


if __name__ == "__main__":
    asyncio.run(main())

Running

Start the Temporal Dev Server:

temporal server start-dev

Run the worker:

uv run python -m worker

Start execution:

uv run python -m start_workflow