Skip to content

LLMAgent

Agent Module.

LLMAgent

A simple LLM Agent Class.

Attributes:

Name Type Description
llm LLM

The backbone LLM.

tools_registry dict[str, Tool]

The tools the LLM agent can equip the LLM with, represented as a dict.

templates LLMAgentTemplates

Prompt templates for LLM Agent.

logger Logger

LLMAgent logger.

Source code in src/llm_agents_from_scratch/agent/llm_agent.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
class LLMAgent:
    """A simple LLM Agent Class.

    Attributes:
        llm (LLM): The backbone LLM.
        tools_registry (dict[str, Tool]): The tools the LLM agent can equip
            the LLM with, represented as a dict.
        templates (LLMAgentTemplates): Prompt templates for LLM Agent.
        logger (logging.Logger): LLMAgent logger.
    """

    def __init__(
        self,
        llm: LLM,
        tools: list[Tool] | None = None,
        templates: LLMAgentTemplates = default_templates,
    ):
        """Initialize an LLMAgent.

        Args:
            llm (LLM): The backbone LLM of the LLM agent.
            tools (list[Tool], optional): The set of tools with which the
                LLM can be equipped. Defaults to None.
            templates (LLMAgentTemplates): Prompt templates for LLM Agent.
        """
        self.llm = llm
        tools = tools or []
        # validate no duplications in tool names
        if len({t.name for t in tools}) < len(tools):
            raise LLMAgentError(
                "Provided tool list contains duplicate tool names.",
            )
        self.tools_registry = {t.name: t for t in tools}
        self.templates = templates
        self.logger = get_logger(self.__class__.__name__)

    @property
    def tools(self) -> list[Tool]:
        """Return tools as list."""
        return list(self.tools_registry.values())

    def add_tool(self, tool: Tool) -> Self:
        """Add a tool to the agents tool set.

        NOTE: Supports fluent style for convenience.

        Args:
            tool (Tool): The tool to equip the LLM agent.

        """
        if tool.name in self.tools_registry:
            raise LLMAgentError(f"Tool with name {tool.name} already exists.")
        self.tools_registry[tool.name] = tool
        return self

    class TaskHandler(asyncio.Future):
        """Handler for processing tasks.

        Attributes:
            llm_agent (LLMAgent): The LLM agent.
            task: The task to execute.
            rollout: The execution log of the task.
            step_counter: The number of TaskSteps executed.
            logger: TaskHandler logger.
        """

        def __init__(
            self,
            llm_agent: "LLMAgent",
            task: Task,
            *args: Any,
            **kwargs: Any,
        ) -> None:
            """Initialize a TaskHandler.

            Args:
                llm_agent (LLMAgent): The LLM agent.
                task (Task): The task to process.
                *args: Additional positional arguments.
                **kwargs: Additional keyword arguments.
            """
            super().__init__(*args, **kwargs)
            self.llm_agent = llm_agent
            self.task = task
            self.rollout = ""
            self.step_counter = 0
            self._background_task: asyncio.Task | None = None
            self.logger = get_logger(self.__class__.__name__)

        @property
        def background_task(self) -> asyncio.Task:
            """Get the background ~asyncio.Task for the handler."""
            if not self._background_task:
                raise TaskHandlerError(
                    "No background task is running for this handler.",
                )
            return self._background_task

        @background_task.setter
        def background_task(self, asyncio_task: asyncio.Task) -> None:
            """Setter for background_task."""
            if self._background_task is not None:
                raise TaskHandlerError(
                    "A background task has already been set.",
                )
            self._background_task = asyncio_task

        def _rollout_contribution_from_single_run_step(
            self,
            chat_history: list[ChatMessage],
        ) -> str:
            """Update rollout after a run_step execution."""
            rollout_contributions = ["=== Task Step Start ==="]
            for msg in chat_history:
                # don't include system messages in rollout
                content = msg.content
                role = msg.role

                if role == "system":
                    continue

                if role == "user":
                    # From the LLMAgent to the backbone LLM, but in a rollout
                    # we'll simplify to just LLM agent having a monologue
                    role = ChatRole.ASSISTANT
                    content = self.llm_agent.templates[
                        "rollout_contribution_content_instruction"
                    ].format(
                        instruction=content,
                    )

                if msg.tool_calls and msg.role == "assistant":
                    called_tools = "\n\n".join(
                        [
                            f"{t.model_dump_json(indent=4)}"
                            for t in msg.tool_calls
                        ],
                    )
                    content = self.llm_agent.templates[
                        "rollout_contribution_content_tool_call_request"
                    ].format(
                        called_tools=called_tools,
                    )

                rollout_contributions.append(
                    self.llm_agent.templates[
                        "rollout_contribution_from_chat_message"
                    ].format(
                        actor=("🔧 " if role == ChatRole.TOOL else "💬 ")
                        + role.value,
                        content=content,
                    ),
                )

            rollout_contributions.append(
                "=== Task Step End ===",
            )

            return "\n\n".join(rollout_contributions)

        async def get_next_step(
            self,
            previous_step_result: TaskStepResult | None,
        ) -> TaskStep | TaskResult:
            """Based on previous step result, get next step or conclude task.

            Returns:
                TaskStep | TaskResult: Either the next step or the result of the
                    task.
            """
            if not previous_step_result:
                return TaskStep(
                    task_id=self.task.id_,
                    instruction=self.task.instruction,
                )
            self.logger.debug(f"🧵 Rollout: {self.rollout}")

            prompt = self.llm_agent.templates["get_next_step"].format(
                instruction=self.task.instruction,
                current_rollout=self.rollout,
                current_response=previous_step_result.content,
            )
            self.logger.debug(f"---NEXT STEP PROMPT: {prompt}")
            try:
                next_step = await self.llm_agent.llm.structured_output(
                    prompt=prompt,
                    mdl=NextStepDecision,
                )
                self.logger.debug(
                    f"---NEXT STEP: {next_step.model_dump_json()}",
                )
            except Exception as e:
                raise TaskHandlerError(
                    f"Failed to get next step: {str(e)}",
                ) from e

            if next_step.kind == "final_result":
                self.logger.info("No new step required.")
                retval = TaskResult(
                    task_id=self.task.id_,
                    content=previous_step_result.content,
                )
            else:  # next_step.kind == "next_step":
                self.logger.info(f"🧠 New Step: {next_step.content}")
                retval = TaskStep(
                    task_id=self.task.id_,
                    instruction=next_step.content,
                )

            return retval

        async def run_step(self, step: TaskStep) -> TaskStepResult:
            """Run next step of a given task.

            A single step is executed through a single-turn conversation that
            the LLM agent has with itself. In other words, it is both the `user`
            providing the instruction (from `get_next_step`) as well as the
            `assistant` that provides the result.

            Args:
                step (TaskStep): The step to execute.

            Returns:
                TaskStepResult: The result of the step execution.
            """
            self.step_counter += 1
            self.logger.info(f"⚙️ Processing Step: {step.instruction}")
            self.logger.debug(f"🧵 Rollout: {self.rollout}")

            # include rollout as context in the system message
            system_message = ChatMessage(
                role=ChatRole.SYSTEM,
                content=self.llm_agent.templates[
                    "run_step_system_message"
                ].format(
                    llm_agent_system_message=self.llm_agent.templates[
                        "system_message"
                    ],
                    current_rollout=self.rollout,
                )
                if self.rollout
                else self.llm_agent.templates[
                    "run_step_system_message_without_rollout"
                ].format(
                    llm_agent_system_message=self.llm_agent.templates[
                        "system_message"
                    ],
                ),
            )
            self.logger.debug(f"💬 SYSTEM: {system_message.content}")

            # fictitious user's input
            user_input = self.llm_agent.templates[
                "run_step_user_message"
            ].format(
                instruction=step.instruction,
            )
            self.logger.debug(f"💬 USER INPUT: {user_input}")

            # start single-turn conversation
            user_message, response_message = await self.llm_agent.llm.chat(
                input=user_input,
                chat_history=[system_message],
                tools=self.llm_agent.tools,
            )
            self.logger.debug(f"💬 ASSISTANT: {response_message.content}")

            # check if there are tool calls
            if response_message.tool_calls:
                tool_call_results = []
                for tool_call in response_message.tool_calls:
                    self.logger.info(
                        f"🛠️ Executing Tool Call: {tool_call.tool_name}",
                    )
                    if tool := self.llm_agent.tools_registry.get(
                        tool_call.tool_name,
                    ):
                        if isinstance(tool, AsyncBaseTool):
                            tool_call_result = await tool(tool_call=tool_call)
                        else:
                            tool_call_result = tool(tool_call=tool_call)
                        msg = (
                            "✅ Successful Tool Call: "
                            f"{tool_call_result.content}"
                        )
                        self.logger.info(msg)
                    else:
                        error_msg = (
                            f"Tool with name {tool_call.tool_name} "
                            "doesn't exist."
                        )
                        tool_call_result = ToolCallResult(
                            tool_call_id=tool_call.id_,
                            error=True,
                            content=error_msg,
                        )
                        self.logger.info(
                            f"❌ Tool Call Failure: {tool_call_result.content}",
                        )
                    tool_call_results.append(tool_call_result)

                # send tool call results back to llm to get result
                (
                    tool_messages,
                    another_response_message,
                ) = await self.llm_agent.llm.continue_chat_with_tool_results(  # noqa: E501
                    tool_call_results=tool_call_results,
                    chat_history=[
                        system_message,
                        user_message,
                        response_message,
                    ],
                )

                # get final content and update chat history
                if another_response_message.tool_calls:
                    # if has tool calls, we'll make them in the next step
                    final_content = "I need to make the following tool-calls:\n"
                    final_content += "\n".join(
                        t.model_dump_json(indent=4)
                        for t in another_response_message.tool_calls
                    )
                else:
                    final_content = another_response_message.content
                chat_history = (
                    [
                        system_message,
                        user_message,
                        response_message,
                    ]
                    + tool_messages
                    + [another_response_message]
                )
            else:
                final_content = response_message.content
                chat_history = [
                    system_message,
                    user_message,
                    response_message,
                ]

            # augment rollout from this turn
            rollout_contribution = (
                self._rollout_contribution_from_single_run_step(
                    chat_history=chat_history,
                )
            )
            if self.rollout:
                self.rollout += "\n\n" + rollout_contribution

            else:
                self.rollout = rollout_contribution

            self.logger.info(
                f"✅ Step Result: {final_content}",
            )
            return TaskStepResult(
                task_step_id=step.id_,
                content=final_content,
            )

    def run(
        self,
        task: Task,
        max_steps: int | None = None,
    ) -> TaskHandler:
        """Agent's processing loop for executing tasks.

        Args:
            task (Task): the Task to perform.
            max_steps (int | None): Maximum number of steps to run for task.
                Defaults to None.

        Returns:
            TaskHandler: the TaskHandler object responsible for task execution.
        """
        task_handler = self.TaskHandler(
            llm_agent=self,
            task=task,
        )

        async def _process_loop() -> None:
            """The processing loop for the task handler execute its task.

            Cycle between get_next_step and run_step, until the task_handler
            is marked as done, either through a set result or an exception being
            set.
            """
            self.logger.info(f"🚀 Starting task: {task.instruction}")
            step_result = None
            while not task_handler.done():
                try:
                    if task_handler.step_counter == max_steps:
                        raise MaxStepsReachedError("Max steps reached.")

                    next_step = await task_handler.get_next_step(step_result)

                    match next_step:
                        case TaskStep():
                            step_result = await task_handler.run_step(
                                next_step,
                            )
                        case TaskResult():
                            task_handler.set_result(next_step)
                            self.logger.info(
                                f"🏁 Task completed: {next_step.content}",
                            )

                except Exception as e:
                    task_handler.set_exception(e)

        task_handler.background_task = asyncio.create_task(_process_loop())

        return task_handler

tools property

tools

Return tools as list.

TaskHandler

Bases: Future

Handler for processing tasks.

Attributes:

Name Type Description
llm_agent LLMAgent

The LLM agent.

task

The task to execute.

rollout

The execution log of the task.

step_counter

The number of TaskSteps executed.

logger

TaskHandler logger.

Source code in src/llm_agents_from_scratch/agent/llm_agent.py
class TaskHandler(asyncio.Future):
    """Handler for processing tasks.

    Attributes:
        llm_agent (LLMAgent): The LLM agent.
        task: The task to execute.
        rollout: The execution log of the task.
        step_counter: The number of TaskSteps executed.
        logger: TaskHandler logger.
    """

    def __init__(
        self,
        llm_agent: "LLMAgent",
        task: Task,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize a TaskHandler.

        Args:
            llm_agent (LLMAgent): The LLM agent.
            task (Task): The task to process.
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(*args, **kwargs)
        self.llm_agent = llm_agent
        self.task = task
        self.rollout = ""
        self.step_counter = 0
        self._background_task: asyncio.Task | None = None
        self.logger = get_logger(self.__class__.__name__)

    @property
    def background_task(self) -> asyncio.Task:
        """Get the background ~asyncio.Task for the handler."""
        if not self._background_task:
            raise TaskHandlerError(
                "No background task is running for this handler.",
            )
        return self._background_task

    @background_task.setter
    def background_task(self, asyncio_task: asyncio.Task) -> None:
        """Setter for background_task."""
        if self._background_task is not None:
            raise TaskHandlerError(
                "A background task has already been set.",
            )
        self._background_task = asyncio_task

    def _rollout_contribution_from_single_run_step(
        self,
        chat_history: list[ChatMessage],
    ) -> str:
        """Update rollout after a run_step execution."""
        rollout_contributions = ["=== Task Step Start ==="]
        for msg in chat_history:
            # don't include system messages in rollout
            content = msg.content
            role = msg.role

            if role == "system":
                continue

            if role == "user":
                # From the LLMAgent to the backbone LLM, but in a rollout
                # we'll simplify to just LLM agent having a monologue
                role = ChatRole.ASSISTANT
                content = self.llm_agent.templates[
                    "rollout_contribution_content_instruction"
                ].format(
                    instruction=content,
                )

            if msg.tool_calls and msg.role == "assistant":
                called_tools = "\n\n".join(
                    [
                        f"{t.model_dump_json(indent=4)}"
                        for t in msg.tool_calls
                    ],
                )
                content = self.llm_agent.templates[
                    "rollout_contribution_content_tool_call_request"
                ].format(
                    called_tools=called_tools,
                )

            rollout_contributions.append(
                self.llm_agent.templates[
                    "rollout_contribution_from_chat_message"
                ].format(
                    actor=("🔧 " if role == ChatRole.TOOL else "💬 ")
                    + role.value,
                    content=content,
                ),
            )

        rollout_contributions.append(
            "=== Task Step End ===",
        )

        return "\n\n".join(rollout_contributions)

    async def get_next_step(
        self,
        previous_step_result: TaskStepResult | None,
    ) -> TaskStep | TaskResult:
        """Based on previous step result, get next step or conclude task.

        Returns:
            TaskStep | TaskResult: Either the next step or the result of the
                task.
        """
        if not previous_step_result:
            return TaskStep(
                task_id=self.task.id_,
                instruction=self.task.instruction,
            )
        self.logger.debug(f"🧵 Rollout: {self.rollout}")

        prompt = self.llm_agent.templates["get_next_step"].format(
            instruction=self.task.instruction,
            current_rollout=self.rollout,
            current_response=previous_step_result.content,
        )
        self.logger.debug(f"---NEXT STEP PROMPT: {prompt}")
        try:
            next_step = await self.llm_agent.llm.structured_output(
                prompt=prompt,
                mdl=NextStepDecision,
            )
            self.logger.debug(
                f"---NEXT STEP: {next_step.model_dump_json()}",
            )
        except Exception as e:
            raise TaskHandlerError(
                f"Failed to get next step: {str(e)}",
            ) from e

        if next_step.kind == "final_result":
            self.logger.info("No new step required.")
            retval = TaskResult(
                task_id=self.task.id_,
                content=previous_step_result.content,
            )
        else:  # next_step.kind == "next_step":
            self.logger.info(f"🧠 New Step: {next_step.content}")
            retval = TaskStep(
                task_id=self.task.id_,
                instruction=next_step.content,
            )

        return retval

    async def run_step(self, step: TaskStep) -> TaskStepResult:
        """Run next step of a given task.

        A single step is executed through a single-turn conversation that
        the LLM agent has with itself. In other words, it is both the `user`
        providing the instruction (from `get_next_step`) as well as the
        `assistant` that provides the result.

        Args:
            step (TaskStep): The step to execute.

        Returns:
            TaskStepResult: The result of the step execution.
        """
        self.step_counter += 1
        self.logger.info(f"⚙️ Processing Step: {step.instruction}")
        self.logger.debug(f"🧵 Rollout: {self.rollout}")

        # include rollout as context in the system message
        system_message = ChatMessage(
            role=ChatRole.SYSTEM,
            content=self.llm_agent.templates[
                "run_step_system_message"
            ].format(
                llm_agent_system_message=self.llm_agent.templates[
                    "system_message"
                ],
                current_rollout=self.rollout,
            )
            if self.rollout
            else self.llm_agent.templates[
                "run_step_system_message_without_rollout"
            ].format(
                llm_agent_system_message=self.llm_agent.templates[
                    "system_message"
                ],
            ),
        )
        self.logger.debug(f"💬 SYSTEM: {system_message.content}")

        # fictitious user's input
        user_input = self.llm_agent.templates[
            "run_step_user_message"
        ].format(
            instruction=step.instruction,
        )
        self.logger.debug(f"💬 USER INPUT: {user_input}")

        # start single-turn conversation
        user_message, response_message = await self.llm_agent.llm.chat(
            input=user_input,
            chat_history=[system_message],
            tools=self.llm_agent.tools,
        )
        self.logger.debug(f"💬 ASSISTANT: {response_message.content}")

        # check if there are tool calls
        if response_message.tool_calls:
            tool_call_results = []
            for tool_call in response_message.tool_calls:
                self.logger.info(
                    f"🛠️ Executing Tool Call: {tool_call.tool_name}",
                )
                if tool := self.llm_agent.tools_registry.get(
                    tool_call.tool_name,
                ):
                    if isinstance(tool, AsyncBaseTool):
                        tool_call_result = await tool(tool_call=tool_call)
                    else:
                        tool_call_result = tool(tool_call=tool_call)
                    msg = (
                        "✅ Successful Tool Call: "
                        f"{tool_call_result.content}"
                    )
                    self.logger.info(msg)
                else:
                    error_msg = (
                        f"Tool with name {tool_call.tool_name} "
                        "doesn't exist."
                    )
                    tool_call_result = ToolCallResult(
                        tool_call_id=tool_call.id_,
                        error=True,
                        content=error_msg,
                    )
                    self.logger.info(
                        f"❌ Tool Call Failure: {tool_call_result.content}",
                    )
                tool_call_results.append(tool_call_result)

            # send tool call results back to llm to get result
            (
                tool_messages,
                another_response_message,
            ) = await self.llm_agent.llm.continue_chat_with_tool_results(  # noqa: E501
                tool_call_results=tool_call_results,
                chat_history=[
                    system_message,
                    user_message,
                    response_message,
                ],
            )

            # get final content and update chat history
            if another_response_message.tool_calls:
                # if has tool calls, we'll make them in the next step
                final_content = "I need to make the following tool-calls:\n"
                final_content += "\n".join(
                    t.model_dump_json(indent=4)
                    for t in another_response_message.tool_calls
                )
            else:
                final_content = another_response_message.content
            chat_history = (
                [
                    system_message,
                    user_message,
                    response_message,
                ]
                + tool_messages
                + [another_response_message]
            )
        else:
            final_content = response_message.content
            chat_history = [
                system_message,
                user_message,
                response_message,
            ]

        # augment rollout from this turn
        rollout_contribution = (
            self._rollout_contribution_from_single_run_step(
                chat_history=chat_history,
            )
        )
        if self.rollout:
            self.rollout += "\n\n" + rollout_contribution

        else:
            self.rollout = rollout_contribution

        self.logger.info(
            f"✅ Step Result: {final_content}",
        )
        return TaskStepResult(
            task_step_id=step.id_,
            content=final_content,
        )

background_task property writable

background_task

Get the background ~asyncio.Task for the handler.

__init__

__init__(llm_agent, task, *args, **kwargs)

Initialize a TaskHandler.

Parameters:

Name Type Description Default
llm_agent LLMAgent

The LLM agent.

required
task Task

The task to process.

required
*args Any

Additional positional arguments.

()
**kwargs Any

Additional keyword arguments.

{}
Source code in src/llm_agents_from_scratch/agent/llm_agent.py
def __init__(
    self,
    llm_agent: "LLMAgent",
    task: Task,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Initialize a TaskHandler.

    Args:
        llm_agent (LLMAgent): The LLM agent.
        task (Task): The task to process.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.
    """
    super().__init__(*args, **kwargs)
    self.llm_agent = llm_agent
    self.task = task
    self.rollout = ""
    self.step_counter = 0
    self._background_task: asyncio.Task | None = None
    self.logger = get_logger(self.__class__.__name__)

get_next_step async

get_next_step(previous_step_result)

Based on previous step result, get next step or conclude task.

Returns:

Type Description
TaskStep | TaskResult

TaskStep | TaskResult: Either the next step or the result of the task.

Source code in src/llm_agents_from_scratch/agent/llm_agent.py
async def get_next_step(
    self,
    previous_step_result: TaskStepResult | None,
) -> TaskStep | TaskResult:
    """Based on previous step result, get next step or conclude task.

    Returns:
        TaskStep | TaskResult: Either the next step or the result of the
            task.
    """
    if not previous_step_result:
        return TaskStep(
            task_id=self.task.id_,
            instruction=self.task.instruction,
        )
    self.logger.debug(f"🧵 Rollout: {self.rollout}")

    prompt = self.llm_agent.templates["get_next_step"].format(
        instruction=self.task.instruction,
        current_rollout=self.rollout,
        current_response=previous_step_result.content,
    )
    self.logger.debug(f"---NEXT STEP PROMPT: {prompt}")
    try:
        next_step = await self.llm_agent.llm.structured_output(
            prompt=prompt,
            mdl=NextStepDecision,
        )
        self.logger.debug(
            f"---NEXT STEP: {next_step.model_dump_json()}",
        )
    except Exception as e:
        raise TaskHandlerError(
            f"Failed to get next step: {str(e)}",
        ) from e

    if next_step.kind == "final_result":
        self.logger.info("No new step required.")
        retval = TaskResult(
            task_id=self.task.id_,
            content=previous_step_result.content,
        )
    else:  # next_step.kind == "next_step":
        self.logger.info(f"🧠 New Step: {next_step.content}")
        retval = TaskStep(
            task_id=self.task.id_,
            instruction=next_step.content,
        )

    return retval

run_step async

run_step(step)

Run next step of a given task.

A single step is executed through a single-turn conversation that the LLM agent has with itself. In other words, it is both the user providing the instruction (from get_next_step) as well as the assistant that provides the result.

Parameters:

Name Type Description Default
step TaskStep

The step to execute.

required

Returns:

Name Type Description
TaskStepResult TaskStepResult

The result of the step execution.

Source code in src/llm_agents_from_scratch/agent/llm_agent.py
async def run_step(self, step: TaskStep) -> TaskStepResult:
    """Run next step of a given task.

    A single step is executed through a single-turn conversation that
    the LLM agent has with itself. In other words, it is both the `user`
    providing the instruction (from `get_next_step`) as well as the
    `assistant` that provides the result.

    Args:
        step (TaskStep): The step to execute.

    Returns:
        TaskStepResult: The result of the step execution.
    """
    self.step_counter += 1
    self.logger.info(f"⚙️ Processing Step: {step.instruction}")
    self.logger.debug(f"🧵 Rollout: {self.rollout}")

    # include rollout as context in the system message
    system_message = ChatMessage(
        role=ChatRole.SYSTEM,
        content=self.llm_agent.templates[
            "run_step_system_message"
        ].format(
            llm_agent_system_message=self.llm_agent.templates[
                "system_message"
            ],
            current_rollout=self.rollout,
        )
        if self.rollout
        else self.llm_agent.templates[
            "run_step_system_message_without_rollout"
        ].format(
            llm_agent_system_message=self.llm_agent.templates[
                "system_message"
            ],
        ),
    )
    self.logger.debug(f"💬 SYSTEM: {system_message.content}")

    # fictitious user's input
    user_input = self.llm_agent.templates[
        "run_step_user_message"
    ].format(
        instruction=step.instruction,
    )
    self.logger.debug(f"💬 USER INPUT: {user_input}")

    # start single-turn conversation
    user_message, response_message = await self.llm_agent.llm.chat(
        input=user_input,
        chat_history=[system_message],
        tools=self.llm_agent.tools,
    )
    self.logger.debug(f"💬 ASSISTANT: {response_message.content}")

    # check if there are tool calls
    if response_message.tool_calls:
        tool_call_results = []
        for tool_call in response_message.tool_calls:
            self.logger.info(
                f"🛠️ Executing Tool Call: {tool_call.tool_name}",
            )
            if tool := self.llm_agent.tools_registry.get(
                tool_call.tool_name,
            ):
                if isinstance(tool, AsyncBaseTool):
                    tool_call_result = await tool(tool_call=tool_call)
                else:
                    tool_call_result = tool(tool_call=tool_call)
                msg = (
                    "✅ Successful Tool Call: "
                    f"{tool_call_result.content}"
                )
                self.logger.info(msg)
            else:
                error_msg = (
                    f"Tool with name {tool_call.tool_name} "
                    "doesn't exist."
                )
                tool_call_result = ToolCallResult(
                    tool_call_id=tool_call.id_,
                    error=True,
                    content=error_msg,
                )
                self.logger.info(
                    f"❌ Tool Call Failure: {tool_call_result.content}",
                )
            tool_call_results.append(tool_call_result)

        # send tool call results back to llm to get result
        (
            tool_messages,
            another_response_message,
        ) = await self.llm_agent.llm.continue_chat_with_tool_results(  # noqa: E501
            tool_call_results=tool_call_results,
            chat_history=[
                system_message,
                user_message,
                response_message,
            ],
        )

        # get final content and update chat history
        if another_response_message.tool_calls:
            # if has tool calls, we'll make them in the next step
            final_content = "I need to make the following tool-calls:\n"
            final_content += "\n".join(
                t.model_dump_json(indent=4)
                for t in another_response_message.tool_calls
            )
        else:
            final_content = another_response_message.content
        chat_history = (
            [
                system_message,
                user_message,
                response_message,
            ]
            + tool_messages
            + [another_response_message]
        )
    else:
        final_content = response_message.content
        chat_history = [
            system_message,
            user_message,
            response_message,
        ]

    # augment rollout from this turn
    rollout_contribution = (
        self._rollout_contribution_from_single_run_step(
            chat_history=chat_history,
        )
    )
    if self.rollout:
        self.rollout += "\n\n" + rollout_contribution

    else:
        self.rollout = rollout_contribution

    self.logger.info(
        f"✅ Step Result: {final_content}",
    )
    return TaskStepResult(
        task_step_id=step.id_,
        content=final_content,
    )

__init__

__init__(llm, tools=None, templates=default_templates)

Initialize an LLMAgent.

Parameters:

Name Type Description Default
llm LLM

The backbone LLM of the LLM agent.

required
tools list[Tool]

The set of tools with which the LLM can be equipped. Defaults to None.

None
templates LLMAgentTemplates

Prompt templates for LLM Agent.

default_templates
Source code in src/llm_agents_from_scratch/agent/llm_agent.py
def __init__(
    self,
    llm: LLM,
    tools: list[Tool] | None = None,
    templates: LLMAgentTemplates = default_templates,
):
    """Initialize an LLMAgent.

    Args:
        llm (LLM): The backbone LLM of the LLM agent.
        tools (list[Tool], optional): The set of tools with which the
            LLM can be equipped. Defaults to None.
        templates (LLMAgentTemplates): Prompt templates for LLM Agent.
    """
    self.llm = llm
    tools = tools or []
    # validate no duplications in tool names
    if len({t.name for t in tools}) < len(tools):
        raise LLMAgentError(
            "Provided tool list contains duplicate tool names.",
        )
    self.tools_registry = {t.name: t for t in tools}
    self.templates = templates
    self.logger = get_logger(self.__class__.__name__)

add_tool

add_tool(tool)

Add a tool to the agents tool set.

NOTE: Supports fluent style for convenience.

Parameters:

Name Type Description Default
tool Tool

The tool to equip the LLM agent.

required
Source code in src/llm_agents_from_scratch/agent/llm_agent.py
def add_tool(self, tool: Tool) -> Self:
    """Add a tool to the agents tool set.

    NOTE: Supports fluent style for convenience.

    Args:
        tool (Tool): The tool to equip the LLM agent.

    """
    if tool.name in self.tools_registry:
        raise LLMAgentError(f"Tool with name {tool.name} already exists.")
    self.tools_registry[tool.name] = tool
    return self

run

run(task, max_steps=None)

Agent's processing loop for executing tasks.

Parameters:

Name Type Description Default
task Task

the Task to perform.

required
max_steps int | None

Maximum number of steps to run for task. Defaults to None.

None

Returns:

Name Type Description
TaskHandler TaskHandler

the TaskHandler object responsible for task execution.

Source code in src/llm_agents_from_scratch/agent/llm_agent.py
def run(
    self,
    task: Task,
    max_steps: int | None = None,
) -> TaskHandler:
    """Agent's processing loop for executing tasks.

    Args:
        task (Task): the Task to perform.
        max_steps (int | None): Maximum number of steps to run for task.
            Defaults to None.

    Returns:
        TaskHandler: the TaskHandler object responsible for task execution.
    """
    task_handler = self.TaskHandler(
        llm_agent=self,
        task=task,
    )

    async def _process_loop() -> None:
        """The processing loop for the task handler execute its task.

        Cycle between get_next_step and run_step, until the task_handler
        is marked as done, either through a set result or an exception being
        set.
        """
        self.logger.info(f"🚀 Starting task: {task.instruction}")
        step_result = None
        while not task_handler.done():
            try:
                if task_handler.step_counter == max_steps:
                    raise MaxStepsReachedError("Max steps reached.")

                next_step = await task_handler.get_next_step(step_result)

                match next_step:
                    case TaskStep():
                        step_result = await task_handler.run_step(
                            next_step,
                        )
                    case TaskResult():
                        task_handler.set_result(next_step)
                        self.logger.info(
                            f"🏁 Task completed: {next_step.content}",
                        )

            except Exception as e:
                task_handler.set_exception(e)

    task_handler.background_task = asyncio.create_task(_process_loop())

    return task_handler