##
## Copyright (c) 2023 Chakib Ben Ziane <contact@blob42.xyz>. All rights reserved.
##
## SPDX-License-Identifier: AGPL-3.0-or-later
##
## This file is part of Instrukt.
##
## This program is free software: you can redistribute it and/or modify it under
## the terms of the GNU Affero General Public License as published by the Free
## Software Foundation, either version 3 of the License, or (at your option) any
## later version.
##
## This program is distributed in the hope that it will be useful, but WITHOUT
## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
## FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
## details.
##
## You should have received a copy of the GNU Affero General Public License along
## with this program. If not, see <http://www.gnu.org/licenses/>.
##
"""langchain callback handler """
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, Sequence
from uuid import UUID
import logging
from langchain.callbacks.base import AsyncCallbackHandler, RetrieverManagerMixin
from pydantic import BaseModel
from ..context import Context
from ..messages.agents import AgentMessage
from ..utils.debug import notify
from .events import AgentEvents
log = logging.getLogger(__name__)
if TYPE_CHECKING:
from langchain.schema import AgentAction, AgentFinish, LLMResult, Document
#REFACT:
#TODO!: use contextvar
class InstruktCallbackHandler(
    AsyncCallbackHandler, RetrieverManagerMixin, BaseModel
):
    """Forward LangChain callback events to the Instrukt application.

    Most handlers record the event name as the active agent's state and emit
    a debug notification through ``notify``. Chain, agent and text events are
    additionally wrapped in an ``AgentMessage`` and posted to ``ctx.app``.
    """

    # Context giving access to the running app (``ctx.app``).
    ctx: Context

    class Config:
        # Lets pydantic accept field types it cannot validate itself;
        # presumably required for ``ctx`` -- confirm against ``Context``.
        arbitrary_types_allowed = True

    def _set_agent_state(self, state: str) -> None:
        """Record ``state`` on the app's active agent.

        Raises:
            AssertionError: if the context has no app.
            ValueError: if the app has no active agent.
        """
        assert self.ctx.app is not None, "app context error"
        if self.ctx.app.active_agent is None:
            raise ValueError("Agent is not loaded")
        self.ctx.app.active_agent.state.update_state(state)

    def _post_event(self, event: Any, data: Any) -> None:
        """Wrap ``data`` in an ``AgentMessage`` for ``event`` and post it.

        Every handler that posts goes through here so the ``ctx.app`` guard
        is applied uniformly (previously only ``on_chain_start`` had it).

        Raises:
            AssertionError: if the context has no app.
        """
        msg = AgentMessage(event=event, data=data)
        # Same guard (and message) as ``_set_agent_state``; it also narrows
        # the optional ``app`` for type checkers.
        assert self.ctx.app is not None, "app context error"
        self.ctx.app.post_message(msg)

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        """Run when LLM starts running."""
        notify("llm_start")
        self._set_agent_state("llm_start")

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
        """Run on new LLM token. Only available when streaming is enabled."""
        notify("llm new token")
        self._set_agent_state("llm_new_token")

    async def on_llm_end(self, response: 'LLMResult', **kwargs: Any) -> Any:
        """Run when LLM ends running."""
        notify("llm end instrukt")
        self._set_agent_state("llm_end")

    async def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when LLM errors."""
        self._set_agent_state("llm_error")
        notify("llm error")

    async def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> Any:
        """Run when chain starts running.

        Posts a ``ChainStart`` message carrying ``serialized``.
        """
        self._set_agent_state("chain_start")
        notify("chain start")
        self._post_event(AgentEvents.ChainStart, serialized)

    async def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> Any:
        """Run when chain ends running.

        Posts a ``ChainEnd`` message carrying ``outputs``.
        """
        self._set_agent_state("chain_end")
        notify("chain end")
        self._post_event(AgentEvents.ChainEnd, outputs)

    async def on_chain_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when chain errors.

        Posts a ``ChainError`` message carrying the ``error`` object.
        """
        self._set_agent_state("chain_error")
        notify("chain error")
        self._post_event(AgentEvents.ChainError, error)

    # NOTE: logs the output of the tool without the actual tool name
    async def on_text(self, text: str, **kwargs: Any) -> Any:
        """Run on arbitrary text.

        Posts a ``Text`` message; the agent state is left unchanged.
        """
        notify("lc_ontext")
        self._post_event(AgentEvents.Text, text)

    async def on_agent_action(self, action: 'AgentAction', **kwargs: Any) -> Any:
        """Run on agent action.

        Posts an ``AgentAction`` message carrying ``action``.
        """
        self._set_agent_state("agent_action")
        notify(f"agent action: {action.tool}")
        self._post_event(AgentEvents.AgentAction, action)

    async def on_agent_finish(self, finish: 'AgentFinish', **kwargs: Any) -> Any:
        """Run on agent end.

        Posts an ``AgentFinish`` message carrying ``finish``.
        """
        self._set_agent_state("agent_finish")
        notify("agent finish")
        self._post_event(AgentEvents.AgentFinish, finish)

    # TODO: not implemented yet
    async def on_retriever_end(
        self,
        documents: Sequence["Document"],
        *,
        run_id: UUID,
        # ``Optional[...]`` rather than ``UUID | None``: annotations are
        # evaluated eagerly here, and ``|`` on types fails before Python 3.10.
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any
    ) -> Any:
        """Run when a retriever finishes. Currently a no-op."""