monoai.agents

MonoAI Agents Package

This package provides a comprehensive set of agentic loop implementations that support real-time streaming responses during the entire generation phase. The agents are designed to work with various AI paradigms and can process user queries through iterative reasoning, tool usage, and structured output generation.

Key Features:

  • Real-time streaming of AI responses
  • Multiple agent paradigms (ReAct, Function Calling, Plan-and-Execute, etc.)
  • Step-based reasoning format for transparent decision making
  • Tool integration and execution
  • Configurable iteration limits and debugging
  • Asynchronous streaming with proper resource management

Agent Paradigms: The agents use a structured step-based format for reasoning:

  • Thought N: <reasoning process and analysis>
  • Action N: {"name": "tool_name", "arguments": {...}}
  • Observation N: <tool execution result>
  • Final answer: <conclusive response to user query>
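
As an illustration of how a single line in this format can be broken down, here is a minimal stdlib-only sketch (`parse_step_line` and `STEP_RE` are hypothetical names for this example, not part of the package's API):

```python
import json
import re

# Matches "<STEP_TYPE> <optional N>: <payload>" as described above.
STEP_RE = re.compile(
    r'^(Thought|Action|Observation|Final answer)(?:\s+(\d+))?\s*:\s*(.*)$',
    re.IGNORECASE,
)

def parse_step_line(line):
    """Parse one step line; return None if it does not match the format."""
    match = STEP_RE.match(line.strip())
    if match is None:
        return None
    step_type, number, payload = match.groups()
    step = {"step_type": step_type.lower(), "step_number": number, "content": payload}
    if step["step_type"] == "action":
        # Action payloads are JSON tool calls: {"name": ..., "arguments": {...}}
        step["action"] = json.loads(payload)
    return step

step = parse_step_line('Action 2: {"name": "search_web", "arguments": {"query": "AI news"}}')
```

The package's own parser (see `_parse_step_format` below) additionally handles multi-line steps and priority rules when several step types appear in one response.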

Streaming Architecture: Streaming allows real-time processing of AI responses as they are generated, enabling:

  • Immediate feedback to users
  • Progress monitoring during long operations
  • Better user experience with responsive interfaces
  • Debugging and monitoring of agent reasoning

Usage Examples:

  1. Basic ReAct Agent with Streaming:

    from monoai.agents import Agent
    
    agent = Agent(model, paradigm="react")
    agent.enable_streaming()  # Uses default console output
    result = agent.run("What is the capital of France?")
    
  2. Custom Streaming Handler:

    from monoai.agents import Agent
    
    def custom_stream_handler(content):
        # Process streaming content in real-time
        print(f"Streaming: {content}", end='', flush=True)
    
    agent = Agent(model, paradigm="react")
    agent.enable_streaming(custom_stream_handler)
    result = agent.run("Your query here")
    
  3. Function Calling Agent:

    from monoai.agents import Agent
    from monoai.tools import search_web
    
    agent = Agent(model, paradigm="function_calling")
    agent.register_tools([search_web])
    agent.enable_streaming()
    result = agent.run("Search for recent AI news")
    
  4. Plan and Execute Agent:

    agent = Agent(model, paradigm="plan-and-execute")
    agent.enable_streaming()
    result = agent.run("Create a detailed project plan")
    

Available Agent Types:

  • FunctionCallingAgenticLoop: Native OpenAI function calling
  • ReactAgenticLoop: ReAct reasoning pattern
  • ReactWithFCAgenticLoop: Hybrid ReAct + Function Calling
  • ProgrammaticAgenticLoop: Code generation and execution
  • PlanAndExecuteAgenticLoop: Planning then execution pattern
  • ReflexionAgenticLoop: Self-reflection and improvement
  • SelfAskAgenticLoop: Self-questioning approach
  • SelfAskWithSearchLoop: Self-ask with web search capabilities

Streaming Callback Format: The streaming callback receives content as plain text strings. For advanced use cases, you can access the raw streaming data through the model's streaming methods.
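
For example, a handler that both echoes and accumulates the streamed text could look like this (a minimal sketch relying only on the plain-text callback contract described above; `StreamCollector` is a hypothetical helper, not part of the package):

```python
class StreamCollector:
    """Accumulate streamed text chunks while echoing them to the console."""

    def __init__(self):
        self.chunks = []

    def __call__(self, content):
        # The agent calls this once per content chunk.
        self.chunks.append(content)
        print(content, end='', flush=True)

    @property
    def text(self):
        return ''.join(self.chunks)

collector = StreamCollector()
# agent.enable_streaming(collector)  # any callable accepting a string works
collector("Hello, ")
collector("world")
```

Because `enable_streaming` accepts any callable, stateful objects like this can be used wherever a plain function would be.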

Error Handling:

  • Automatic fallback to non-streaming mode on errors
  • Proper cleanup of async resources
  • Configurable debug output
  • Iteration limits to prevent infinite loops
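
The streaming fallback can be pictured as a simple wrapper (an illustrative sketch of the pattern, not the package's internal implementation; all names here are hypothetical):

```python
def execute_with_fallback(stream_fn, standard_fn, messages):
    """Try streaming execution first; fall back to standard execution on error."""
    try:
        return stream_fn(messages)
    except Exception as exc:
        print(f"Streaming error: {exc}, falling back to standard execution")
        return standard_fn(messages)

# Demonstration with stand-in functions:
def broken_stream(messages):
    raise RuntimeError("connection dropped")

def standard(messages):
    return {"role": "assistant", "content": "ok"}

result = execute_with_fallback(broken_stream, standard, [])
```

This mirrors the behavior of `_execute_model_step_stream` below, which falls back to `_execute_model_step` when streaming raises.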

Thread Safety: This implementation is designed for single-threaded use. For concurrent access, create separate agent instances for each thread or process.


from .agentic_loops._agentic_loop import _AgenticLoop, _FunctionCallingMixin, _ReactMixin
from .agentic_loops._function_calling_agentic_loop import FunctionCallingAgenticLoop
from .agentic_loops._react_agentic_loop import ReactAgenticLoop, ReactWithFCAgenticLoop, _BaseReactLoop
from .agentic_loops._other_agentic_loops import (
    ProgrammaticAgenticLoop,
    PlanAndExecuteAgenticLoop,
    ReflexionAgenticLoop,
    SelfAskAgenticLoop,
    SelfAskWithSearchLoop
)

# Main Agent class that provides a unified interface
from .agent import Agent

__all__ = [
    # Base classes
    '_AgenticLoop',
    '_FunctionCallingMixin',
    '_ReactMixin',
    '_BaseReactLoop',

    # Agent implementations
    'FunctionCallingAgenticLoop',
    'ReactAgenticLoop',
    'ReactWithFCAgenticLoop',
    'ProgrammaticAgenticLoop',
    'PlanAndExecuteAgenticLoop',
    'ReflexionAgenticLoop',
    'SelfAskAgenticLoop',
    'SelfAskWithSearchLoop',

    # Main interface
    'Agent'
]
class _AgenticLoop:
    """Base class for all agentic loop implementations.

    This class provides the core functionality for all AI agents, including
    tool management, message creation, model execution, and streaming support.
    It's designed to be extended by specific classes that implement different
    agentic approaches and reasoning patterns.

    The base class handles common operations like:
    - Tool registration and execution
    - Message formatting and conversation management
    - Model interaction (both streaming and non-streaming)
    - Step-based reasoning format parsing
    - Debug output and iteration limits

    Attributes
    ----------
    _model : Model
        AI model instance used for execution
    _agentic_prompt : str or Prompt, optional
        Custom prompt for the agent (None for default). Can be a string,
        a Prompt object, or a path to a .prompt file
    _debug : bool
        Flag to enable debug output
    _max_iter : Optional[int]
        Maximum number of iterations allowed (None for unlimited)
    _stream_callback : Optional[Callable[[str], None]]
        Callback function for handling streaming content
    _tools : Dict[str, Any]
        Dictionary of available tools, mapped by name
    """

    _DEFAULT_PROMPT_PATH = "monoai/agents/prompts/"

    def __init__(self, model: Model, agentic_prompt: str=None, debug: bool=False, max_iter: Optional[int]=None,
                 stream_callback: Optional[Callable[[str], None]]=None, human_feedback: Optional[str]=None) -> None:
        """Initialize the agent with model and configuration.

        Parameters
        ----------
        model : Model
            AI model instance to use for execution
        agentic_prompt : str, optional
            Custom prompt for the agent (None to use default)
        debug : bool, default False
            Enable debug output and logging
        max_iter : Optional[int], default None
            Maximum number of iterations allowed (None for unlimited)
        stream_callback : Optional[Callable[[str], None]], default None
            Callback function for handling streaming content chunks
        human_feedback : Optional[str], default None
            Human feedback mode for controlling agent execution. Can be:
            - None: No human feedback required
            - "actions": Pause and request confirmation before executing tool actions
            - "all": Pause after every step for human review
        """
        self._model = model
        self._agentic_prompt = agentic_prompt
        self._debug = debug
        self._max_iter = max_iter
        self._stream_callback = stream_callback
        self._human_feedback = human_feedback
        self._tools = {}
        self._mcp_servers = {}

    def register_tools(self, tools: List[Any]) -> None:
        """Register tools with the agent.

        Parameters
        ----------
        tools : List[Any]
            List of tool functions to register. Each tool must have a
            __name__ attribute for identification.
        """
        for tool in tools:
            self._tools[tool.__name__] = tool

    def register_mcp_servers(self, mcp_servers: List[Any]) -> None:
        """Register MCP servers with the agent.

        Parameters
        ----------
        mcp_servers : List[Any]
            List of MCP server instances to register.
        """
        tools = {}
        self._mcp_servers = {}
        for mcp_server in mcp_servers:
            self._debug_print(f"Connecting to MCP server: {mcp_server.name}")
            self._mcp_servers[mcp_server.name] = mcp_server
            server_tools = mcp_server.get_tools()
            self._debug_print(f"Retrieved {len(server_tools)} tools from {mcp_server.name}")
            for tool in server_tools:
                # Create a new tool name with the MCP server prefix
                original_name = tool.name
                tool.name = f"mcp_{mcp_server.name}_{original_name}"
                tools[tool.name] = tool
                self._debug_print(f" - {tool.name}")

        self._tools.update(tools)

    def enable_streaming(self, stream_callback: Optional[Callable[[str], None]] = None) -> None:
        """Enable streaming responses for this agent.

        When streaming is enabled, the agent will call the provided callback
        function with each content chunk as it's generated, allowing for
        real-time processing and display of the AI's response.

        Parameters
        ----------
        stream_callback : Optional[Callable[[str], None]], default None
            Callback function to handle streaming content chunks.
            If None, uses a default callback that prints content to console.
            The callback receives plain text content strings.
        """
        if stream_callback is None:
            def default_callback(content):
                print(content, end='', flush=True)

            self._stream_callback = default_callback
        else:
            self._stream_callback = stream_callback

    def disable_streaming(self) -> None:
        """Disable streaming responses for this agent.

        After calling this method, the agent will use standard (non-streaming)
        model execution for all subsequent requests.
        """
        self._stream_callback = None

    @classmethod
    def create_streaming(cls, model: Any, stream_callback: Optional[Callable[[str], None]] = None,
                        **kwargs) -> '_AgenticLoop':
        """Create an agent instance with streaming enabled.

        This is a convenience class method that creates an agent instance
        with streaming already enabled, avoiding the need to call
        enable_streaming() separately.

        Parameters
        ----------
        model : Any
            AI model instance to use
        stream_callback : Optional[Callable[[str], None]], default None
            Callback function for handling streaming content chunks
        **kwargs
            Additional parameters to pass to the constructor

        Returns
        -------
        _AgenticLoop
            Agent instance with streaming enabled
        """
        return cls(model, stream_callback=stream_callback, **kwargs)

    def _get_tools(self) -> str:
        """Generate a descriptive string of available tools.

        This method creates a formatted string listing all registered tools
        with their signatures and documentation. This string is typically
        included in prompts to help the AI model understand what tools
        are available for use.

        Returns
        -------
        str
            Formatted string with descriptions of all available tools,
            one per line with " - " prefix. Returns empty string if no tools.

            Example:
             - colab_downloader(colab_url): Downloads a Jupyter notebook from Google Drive and returns its Python code as a string.
                Args: colab_url (str): The Google Drive URL of the notebook (should contain 'drive/').
        """
        if not self._tools:
            return ""

        tools = []
        for tool_name, tool_func in self._tools.items():
            if tool_name.startswith("mcp_"):
                tools.append(f" - {self._encode_mcp_tool(tool_func)}")
            else:
                tools.append(f" - {self._encode_tool(tool_func)}")
        return "\n".join(tools)

    def _get_base_messages(self, agent_type: str, query: str) -> List[Dict[str, Any]]:
        """Generate base messages for the specific agent type.

        This method creates the initial message structure for the agent,
        including the appropriate prompt template and user query. The
        prompt template is selected based on the agent type and includes
        information about available tools.

        Parameters
        ----------
        agent_type : str
            Type of agent to determine which prompt template to use
        query : str
            User query to include in the prompt

        Returns
        -------
        List[Dict[str, Any]]
            List of base messages for the agent, including the prompt and query
        """
        tools = self._get_tools()
        prompt_data = {"available_tools": tools}

        # Handle case where _agentic_prompt is already a Prompt object
        if self._agentic_prompt is not None and hasattr(self._agentic_prompt, 'as_dict'):
            # It's already a Prompt object, just update the data and return
            agentic_prompt = self._agentic_prompt
            # Update prompt data if the prompt supports it
            if hasattr(agentic_prompt, 'prompt_data'):
                agentic_prompt.prompt_data = prompt_data
        else:
            # Otherwise, determine prompt configuration and create a new Prompt
            prompt_config = self._determine_prompt_config(agent_type)

            agentic_prompt = Prompt(
                **prompt_config,
                is_system=True,
                prompt_data=prompt_data
            )

        messages = [agentic_prompt.as_dict()]

        if isinstance(query, Prompt):
            messages.append(query.as_dict())
        else:
            messages.append({"role": "user", "content": query})

        return messages

    def _determine_prompt_config(self, agent_type: str) -> Dict[str, Any]:
        """Determine the prompt configuration based on the agentic_prompt setting.

        Parameters
        ----------
        agent_type : str
            Type of agent for fallback prompt selection

        Returns
        -------
        Dict[str, Any]
            Configuration dictionary for Prompt initialization
        """
        if self._agentic_prompt is None:
            return {
                "prompt_id": f"{self._DEFAULT_PROMPT_PATH}{agent_type}.prompt"
            }

        if isinstance(self._agentic_prompt, str):
            if self._agentic_prompt.endswith(".prompt"):
                return {"prompt_id": self._agentic_prompt}
            else:
                return {"prompt": self._agentic_prompt}
        else:
            # For non-string types, treat as prompt content
            return {"prompt": self._agentic_prompt}

    def _debug_print(self, content: str) -> None:
        """Print debug information if debug mode is enabled.

        Parameters
        ----------
        content : str
            Content to print in debug mode
        """
        if self._debug:
            print(content)
            print("-------")

    def _request_human_feedback(self, step_type: str, content: str, action_data: Optional[Dict] = None) -> bool:
        """Request human feedback for the current step.

        This method pauses execution and asks the user for confirmation
        before proceeding with the current step.

        Parameters
        ----------
        step_type : str
            Type of step being executed (e.g., "thought", "action", "observation")
        content : str
            Content of the current step
        action_data : Optional[Dict], default None
            Action data if this is an action step

        Returns
        -------
        bool
            True if the user approves the step, False if they want to stop
        """
        print(f"\n{'='*60}")
        print(f"HUMAN FEEDBACK REQUIRED - {step_type.upper()} STEP")
        print(f"{'='*60}")

        if action_data:
            print(f"Action to execute: {action_data.get('name', 'Unknown')}")
            print(f"Arguments: {action_data.get('arguments', {})}")
        else:
            print(f"Content: {content}")

        print("\nOptions:")
        print("  [y] Yes - Continue with this step")
        print("  [n] No - Stop execution")
        print("  [s] Skip - Skip this step and continue")

        while True:
            try:
                response = input("\nYour choice (y/n/s): ").lower().strip()
                if response in ['y', 'yes']:
                    return True
                elif response in ['n', 'no']:
                    return False
                elif response in ['s', 'skip']:
                    print("Step skipped.")
                    return True  # Continue but skip the current step
                else:
                    print("Please enter 'y' for yes, 'n' for no, or 's' for skip.")
            except KeyboardInterrupt:
                print("\nExecution interrupted by user.")
                return False

    def _parse_step_format(self, content: str) -> Optional[Dict[str, Any]]:
        """Parse the step-based format <STEP_TYPE>: <RESULT>.

        This method parses agent responses that follow the structured step format
        used by ReAct and similar reasoning patterns. It extracts the step type,
        step number, and content from formatted responses.

        Supported step types:
        - Thought: Reasoning and analysis steps
        - Action: Tool calls and actions (must be valid JSON)
        - Observation: Results from tool executions
        - Final answer: Conclusive responses to user queries

        Special handling:
        - If content contains both "Thought" and "Action", the Action is returned with priority.
        - If content contains both "Thought" and "Final answer", the Final answer is returned with priority.

        Parameters
        ----------
        content : str
            Content to parse in the format <STEP_TYPE>: <RESULT>

        Returns
        -------
        Optional[Dict[str, Any]]
            Dictionary containing:
            - step_type: Type of step (thought, action, observation, final_answer)
            - step_number: Optional step number if present
            - content: Step content
            - action: Parsed action data (for action steps)
            - final_answer: Final answer content (for final answer steps)
            Returns None if parsing fails
        """
        if not content or not isinstance(content, str):
            return None

        content = content.strip()

        # Check for multiple step types and prioritize accordingly
        lines = content.split('\n')
        action_start = None
        thought_start = None
        final_answer_start = None

        # Find the start lines for Action, Thought, and Final answer
        for i, line in enumerate(lines):
            if re.match(r'^Action(?:\s+\d+)?\s*:', line, re.IGNORECASE):
                action_start = i
            elif re.match(r'^Thought(?:\s+\d+)?\s*:', line, re.IGNORECASE):
                thought_start = i
            elif re.match(r'^Final answer(?:\s+\d+)?\s*:', line, re.IGNORECASE):
                final_answer_start = i

        # Priority: Final answer > Action > Thought
        if final_answer_start is not None:
            # Final answer has highest priority
            step_type = "final_answer"

            # Extract step number from the Final answer line
            final_answer_line = lines[final_answer_start]
            step_number_match = re.search(r'Final answer\s+(\d+)\s*:', final_answer_line, re.IGNORECASE)
            step_number = step_number_match.group(1) if step_number_match else None

            # Extract content from Final answer (everything after the colon on the first line)
            final_answer_content = final_answer_line.split(':', 1)[1].strip()

            # Add all subsequent lines until we hit another step type or end
            i = final_answer_start + 1
            while i < len(lines):
                line = lines[i].strip()
                if re.match(r'^(Thought|Action|Observation|Final answer)(?:\s+\d+)?\s*:', line, re.IGNORECASE):
                    break
                if line:  # Only add non-empty lines
                    final_answer_content += '\n' + line
                i += 1

            step_content = final_answer_content.strip().replace("```json", "").replace("```", "")

            result = {
                "step_type": step_type,
                "step_number": step_number,
                "content": step_content,
                "final_answer": step_content
            }

            return result

        elif action_start is not None and thought_start is not None:
            # Both Thought and Action present, prioritize Action
            step_type = "action"

            # Extract step number from the Action line
            action_line = lines[action_start]
            step_number_match = re.search(r'Action\s+(\d+)\s*:', action_line, re.IGNORECASE)
            step_number = step_number_match.group(1) if step_number_match else None

            # Extract content from Action (everything after the colon on the first line)
            action_content = action_line.split(':', 1)[1].strip()

            # Add all subsequent lines until we hit another step type or end
            i = action_start + 1
            while i < len(lines):
                line = lines[i].strip()
                if re.match(r'^(Thought|Action|Observation|Final answer)(?:\s+\d+)?\s*:', line, re.IGNORECASE):
                    break
                if line:  # Only add non-empty lines
                    action_content += '\n' + line
                i += 1

            step_content = action_content.strip().replace("```json", "").replace("```", "")

            result = {
                "step_type": step_type,
                "step_number": step_number,
                "content": step_content
            }

            # Parse action data (must be JSON)
            try:
                action_data = json.loads(step_content)
                result["action"] = action_data
            except json.JSONDecodeError:
                # If not valid JSON, keep content as raw string
                result["action"] = {"raw": step_content}

            return result

        # Standard single step parsing
        step_pattern = r'^(Thought|Action|Observation|Final answer)(?:\s+(\d+))?\s*:\s*(.*)$'
        match = re.match(step_pattern, content, re.IGNORECASE | re.DOTALL)

        if match:
            # Normalize "Final answer" -> "final_answer" so the branch below matches
            step_type = match.group(1).lower().replace(" ", "_")
            step_number = match.group(2) if match.group(2) else None
            step_content = match.group(3).strip().replace("```json", "").replace("```", "")

            result = {
                "step_type": step_type,
                "step_number": step_number,
                "content": step_content
            }

            # Special handling for Action steps (must be JSON)
            if step_type == "action":
                try:
                    action_data = json.loads(step_content)
                    result["action"] = action_data
                except json.JSONDecodeError:
                    # If not valid JSON, keep content as raw string
                    result["action"] = {"raw": step_content}

            # Special handling for Final answer steps
            elif step_type == "final_answer":
                result["final_answer"] = step_content

            return result

        return None

    def _execute_model_step(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute a model step and return the response.

        This method performs standard (non-streaming) model execution.
        Streaming requests are handled by _execute_model_step_stream, which
        also falls back to this method on streaming errors.

        Parameters
        ----------
        messages : List[Dict[str, Any]]
            List of messages to send to the model

        Returns
        -------
        Dict[str, Any]
            Model response in standard OpenAI format with usage information
        """
        resp = self._model._execute(messages)

        # Extract usage information from the response
        usage_info = None
        if hasattr(resp, 'usage') and resp.usage:
            usage_info = {
                "total_tokens": resp.usage.total_tokens,
                "prompt_tokens": resp.usage.prompt_tokens,
                "completion_tokens": resp.usage.completion_tokens
            }

        message = resp["choices"][0]["message"]

        # Add usage information to the message
        if usage_info:
            message["usage"] = usage_info
        return message

 744    def _execute_model_step_stream(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
 745        """Execute a model step with streaming and return the complete response.
 746        
 747        This method handles asynchronous streaming of model responses, collecting
 748        all chunks and building the final response. It properly manages async
 749        resources to avoid memory leaks and task warnings.
 750        
 751        Parameters
 752        ----------
 753        messages : List[Dict[str, Any]]
 754            List of messages to send to the model
 755        
 756        Returns
 757        -------
 758        Dict[str, Any]
 759            Complete model response in standard OpenAI format
 760        """
 761
 762        import asyncio
 763        from litellm import stream_chunk_builder
 764        
 765        # Use asyncio.run to manage the event loop correctly
 766        async def run_streaming():
 767            chunks = []
 768            stream = None
 769            try:
 770                stream = self._model._execute_stream(messages)
 771                async for chunk in stream:
 772                    chunks.append(chunk)
 773                    content = chunk["choices"][0]["delta"]["content"]
 774
 775                    if content is not None:
 776                        self._stream_callback({"type": "text", "delta": content})
 777
 778            finally:
 779                if stream is not None:
 780                    try:
 781                        await stream.aclose()
 782                    except Exception:
 783                        pass
 784            
 785            return chunks
 786
 787        try:
 788            chunks = asyncio.run(run_streaming())
 789            resp = stream_chunk_builder(chunks)
 790            
 791            # Extract usage information from the response
 792            usage_info = None
 793            if hasattr(resp, 'usage') and resp.usage:
 794                usage_info = {
 795                    "total_tokens": resp.usage.total_tokens,
 796                    "prompt_tokens": resp.usage.prompt_tokens,
 797                    "completion_tokens": resp.usage.completion_tokens
 798                }
 799            
 800            message = resp["choices"][0]["message"]
 801            
 802            # Add usage information to the message
 803            if usage_info:
 804                message["usage"] = usage_info
 805                
 806            return message
 807        except Exception as e:
 808            print(f"Streaming error: {e}, falling back to standard execution")
 809            return self._execute_model_step(messages)
 810            
 811    def _create_base_response(self, query: str) -> Dict[str, Any]:
 812        """Create the base response structure.
 813        
 814        Parameters
 815        ----------
 816        query : str
 817            Original user query
 818        
 819        Returns
 820        -------
 821        Dict[str, Any]
 822            Dictionary with base response structure:
 823            - prompt: Original user query
 824            - iterations: Empty list for iterations
 825            - usage: Dictionary for tracking token usage
 826        """
 827        return {
 828            "prompt": query, 
 829            "iterations": [],
 830            "usage": {
 831                "total_tokens": 0,
 832                "prompt_tokens": 0,
 833                "completion_tokens": 0
 834            }
 835        }
 836
 837    def _update_usage(self, response: Dict[str, Any], iteration_usage: Dict[str, int]) -> None:
 838        """Update the total usage in the response.
 839        
 840        Parameters
 841        ----------
 842        response : Dict[str, Any]
 843            Response dictionary to update
 844        iteration_usage : Dict[str, Any]
 845            Model message for the current iteration; usage is read from its "usage" key
 846        """
 847        if "usage" in iteration_usage:
 848            response["usage"]["total_tokens"] += iteration_usage["usage"]["total_tokens"]
 849            response["usage"]["prompt_tokens"] += iteration_usage["usage"]["prompt_tokens"]
 850            response["usage"]["completion_tokens"] += iteration_usage["usage"]["completion_tokens"]
 851
 852    def _handle_final_answer(self, iteration: Dict[str, Any], response: Dict[str, Any]) -> bool:
 853        """Handle a final answer, returns True if this is the end.
 854        
 855        This method processes iterations that contain final answers and
 856        updates the response structure accordingly. It supports both
 857        the old format (final_answer key) and new step format.
 858        
 859        Parameters
 860        ----------
 861        iteration : Dict[str, Any]
 862            Current iteration potentially containing a final answer
 863        response : Dict[str, Any]
 864            Response dictionary to update
 865        
 866        Returns
 867        -------
 868        bool
 869            True if a final answer was found and processed, False otherwise
 870        
 871        Notes
 872        -----
 873        This method modifies the response object directly.
 874        """
 875        if "final_answer" in iteration:
 876            response["iterations"].append(iteration)
 877            response["response"] = iteration["final_answer"]
 878            return True
 879        elif iteration.get("step_type") == "final answer":
 880            response["iterations"].append(iteration)
 881            response["response"] = iteration["content"]
 882            return True
 883        return False
 884
 885    def _pngimagefile_to_base64_data_uri(self, img) -> str:
 886        """
 887        Convert a PIL.PngImageFile object into a base64 data URI string.
 888        
 889        Args:
 890            img (Image.Image): PIL image (PngImageFile or converted to PNG)
 891        
 892        Returns:
 893            str: string "data:image/png;base64,<...>"
 894        """
 895        import io
 896        import base64
 897        buffer = io.BytesIO()
 898        img.save(buffer, format="PNG")
 899        buffer.seek(0)
 900
 901        # encode to base64
 902        encoded = base64.b64encode(buffer.read()).decode("utf-8")
 903
 904        return f"data:image/png;base64,{encoded}"
 905
 906    def _handle_tool_action(self, iteration: Dict[str, Any], response: Dict[str, Any], messages: List[Dict[str, Any]]) -> None:
 907        """Handle a tool action execution.
 908        
 909        This method processes iterations that contain tool actions, executes
 910        the corresponding tools, and updates the conversation with the results.
 911        It supports both old and new step-based formats.
 912        
 913        Parameters
 914        ----------
 915        iteration : Dict[str, Any]
 916            Current iteration containing the tool action
 917        response : Dict[str, Any]
 918            Response dictionary to update
 919        messages : List[Dict[str, Any]]
 920            Message list to update with observation
 921        
 922        Notes
 923        -----
 924        This method modifies the response and messages objects directly.
 925        """
 926        # Check if human feedback is required for actions
 927        if self._human_feedback == "actions" or self._human_feedback == "all":
 928            action_data = iteration.get("action", {})
 929            if not self._request_human_feedback("action", iteration.get("content", ""), action_data):
 930                print("Execution stopped by user.")
 931                return
 932
 933        if "action" in iteration and iteration["action"].get("name"):
 934            tool_call = iteration["action"]
 935            
 936            if self._stream_callback is not None:
 937                self._stream_callback({"type": "tool_call", "tool_call": tool_call})
 938
 939            if tool_call.get("name").startswith("mcp_"):
 940                tool_result = self._call_mcp_tool(tool_call)
 941            else:
 942                tool_result = self._call_tool(tool_call)
 943
 944            iteration["observation"] = tool_result
 945            response["iterations"].append(iteration)
 946
 947            if isinstance(tool_result, dict):
 948                if "image" in tool_result:
 949                    img = self._pngimagefile_to_base64_data_uri(tool_result["image"])
 950                    msg = [{"type": "image_url", "image_url": {"url": img}}, 
 951                            {"type": "text", "text": "Here is the image"}]
 952                    messages.append({"role": "user", "content": msg})
 953                else:
 954                    msg = json.dumps({"observation": tool_result})
 955                    messages.append({"role": "user", "content": msg})
 956        elif iteration.get("step_type") == "action" and "action" in iteration:
 957            tool_call = iteration["action"]
 958            tool_result = self._call_tool(tool_call)
 959            iteration["observation"] = tool_result
 960            response["iterations"].append(iteration)
 961            
 962            # Add observation in the new system format
 963            observation_msg = f"Observation {iteration.get('step_number', '')}: {tool_result}".strip()
 964            messages.append({"role": "user", "content": observation_msg})
 965
 966    def _handle_default(self, iteration: Dict[str, Any], response: Dict[str, Any], messages: List[Dict[str, Any]]) -> None:
 967        """Handle default case for unhandled iterations.
 968        
 969        This method processes iterations that don't match specific handlers,
 970        adding them to the response and updating the conversation flow.
 971        It supports both step-based and JSON formats.
 972        
 973        Parameters
 974        ----------
 975        iteration : Dict[str, Any]
 976            Current iteration to handle
 977        response : Dict[str, Any]
 978            Response dictionary to update
 979        messages : List[Dict[str, Any]]
 980            Message list to update
 981        
 982        Notes
 983        -----
 984        This method modifies the response and messages objects directly.
 985        """
 986        response["iterations"].append(iteration)
 987        
 988        # For new format, add content as user message
 989        if iteration.get("step_type") in ["thought", "observation"]:
 990            step_type = iteration["step_type"].capitalize()
 991            step_number = iteration.get("step_number", "")
 992            content = iteration["content"]
 993            message_content = f"{step_type} {step_number}: {content}".strip()
 994            messages.append({"role": "user", "content": message_content})
 995        else:
 996            # Fallback to JSON format for compatibility
 997            messages.append({"role": "user", "content": json.dumps(iteration)})
 998
 999    def start(self, query: str) -> Dict[str, Any]:
1000        """Abstract method to start the agentic loop.
1001        
1002        This method must be implemented by subclasses to define the specific
1003        agentic behavior and reasoning pattern.
1004        
1005        Parameters
1006        ----------
1007        query : str
1008            User query to process
1009        
1010        Returns
1011        -------
1012        Dict[str, Any]
1013            Agent response containing iterations and final result
1014        
1015        Raises
1016        ------
1017        NotImplementedError
1018            This method must be implemented by subclasses
1019        """
1020        raise NotImplementedError

Base class for all agentic loop implementations.

This class provides the core functionality for all AI agents, including tool management, message creation, model execution, and streaming support. It's designed to be extended by specific classes that implement different agentic approaches and reasoning patterns.

The base class handles common operations like:

  • Tool registration and execution
  • Message formatting and conversation management
  • Model interaction (both streaming and non-streaming)
  • Step-based reasoning format parsing
  • Debug output and iteration limits
Attributes
  • _model (Any): AI model instance used for execution
  • _agentic_prompt (str, Prompt, optional): Custom prompt for the agent (None for default). Can be a string, a Prompt object, or a path to a .prompt file
  • _debug (bool): Flag to enable debug output
  • _max_iter (Optional[int]): Maximum number of iterations allowed (None for unlimited)
  • _stream_callback (Optional[Callable[[Dict[str, Any]], None]]): Callback function for handling streaming events
  • _tools (Dict[str, Any]): Dictionary of available tools, mapped by name
_AgenticLoop( model: monoai.models.Model, agentic_prompt: str = None, debug: bool = False, max_iter: Optional[int] = None, stream_callback: Optional[Callable[[str], NoneType]] = None, human_feedback: Optional[str] = None)
256    def __init__(self, model: Model, agentic_prompt: str=None, debug: bool=False, max_iter: Optional[int]=None, 
257                 stream_callback: Optional[Callable[[str], None]]=None, human_feedback: Optional[str]=None) -> None:
258        """Initialize the agent with model and configuration.
259        
260        Parameters
261        ----------
 262        model : Model
263            AI model instance to use for execution
264        agentic_prompt : str, optional
265            Custom prompt for the agent (None to use default)
266        debug : bool, default False
267            Enable debug output and logging
268        max_iter : Optional[int], default None
269            Maximum number of iterations allowed (None for unlimited)
270        stream_callback : Optional[Callable[[str], None]], default None
271            Callback function for handling streaming content chunks
272        human_feedback : Optional[str], default None
273            Human feedback mode for controlling agent execution. Can be:
274            - None: No human feedback required
275            - "actions": Pause and request confirmation before executing tool actions
276            - "all": Pause after every step for human review
277        """
278        self._model = model
279        self._agentic_prompt = agentic_prompt
280        self._debug = debug
281        self._max_iter = max_iter
282        self._stream_callback = stream_callback
283        self._human_feedback = human_feedback
284        self._tools = {}
285        self._mcp_servers = {}

Initialize the agent with model and configuration.

Parameters
  • model (Any): AI model instance to use for execution
  • agentic_prompt (str, optional): Custom prompt for the agent (None to use default)
  • debug (bool, default False): Enable debug output and logging
  • max_iter (Optional[int], default None): Maximum number of iterations allowed (None for unlimited)
  • stream_callback (Optional[Callable[[str], None]], default None): Callback function for handling streaming content chunks
  • human_feedback (Optional[str], default None): Human feedback mode for controlling agent execution. Can be:
    • None: No human feedback required
    • "actions": Pause and request confirmation before executing tool actions
    • "all": Pause after every step for human review
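The human_feedback modes gate execution as sketched below — a hypothetical, stand-alone distillation of the checks the loop performs (the function name is illustrative, not part of the API):

```python
from typing import Optional

# Hypothetical distillation of the human_feedback gating logic:
# "all" pauses on every step, "actions" only before tool actions.
def needs_confirmation(step_kind: str, human_feedback: Optional[str]) -> bool:
    if human_feedback == "all":
        return True
    if human_feedback == "actions":
        return step_kind == "action"
    return False

print(needs_confirmation("action", "actions"))   # True
print(needs_confirmation("thought", "actions"))  # False
print(needs_confirmation("thought", None))       # False
```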
def register_tools(self, tools: List[Any]) -> None:
287    def register_tools(self, tools: List[Any]) -> None:
288        """Register tools with the agent.
289        
290        Parameters
291        ----------
292        tools : List[Any]
293            List of tool functions to register. Each tool must have a
294            __name__ attribute for identification.
295        """
296        for tool in tools:
297            self._tools[tool.__name__] = tool

Register tools with the agent.

Parameters
  • tools (List[Any]): List of tool functions to register. Each tool must have a __name__ attribute for identification.
def register_mcp_servers(self, mcp_servers: List[Any]) -> None:
300    def register_mcp_servers(self, mcp_servers: List[Any]) -> None:
301        """Register MCP servers with the agent.
302        
303        Parameters
304        ----------
305        mcp_servers : List[Any]
306            List of MCP server instances to register.
307        """
308        tools = {}
309        self._mcp_servers = {}
310        for mcp_server in mcp_servers:
 311            self._debug_print(f"Connecting to MCP server: {mcp_server.name}")
 312            # Store the server so tool calls can be routed back to it
 313            self._mcp_servers[mcp_server.name] = mcp_server
 314            server_tools = mcp_server.get_tools()
 315            self._debug_print(f"Retrieved {len(server_tools)} tools from {mcp_server.name}")
 316            for tool in server_tools:
 317                # Prefix the tool name so MCP calls can be recognized and routed
 318                original_name = tool.name
 319                tool.name = f"mcp_{mcp_server.name}_{original_name}"
 320                tools[tool.name] = tool
 321                self._debug_print(f" - {tool.name}")
 322        
 323        self._tools.update(tools)

Register MCP servers with the agent.

Parameters
  • mcp_servers (List[Any]): List of MCP server instances to register.
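Registered MCP tools are renamed to `mcp_<server>_<tool>`, and the same scheme is split apart later to route calls back to the right server. A stand-alone sketch (server and tool names are invented; note the `split("_", 2)` parsing implies server names should not contain underscores):

```python
# Build and parse the "mcp_<server>_<tool>" naming scheme used for MCP tools.
server_name, tool_name = "files", "read_text"
prefixed = f"mcp_{server_name}_{tool_name}"

parts = prefixed.split("_", 2)  # at most 3 parts: ["mcp", "files", "read_text"]
print(parts[1])  # files
print(parts[2])  # read_text
```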
def enable_streaming( self, stream_callback: Optional[Callable[[str], NoneType]] = None) -> None:
326    def enable_streaming(self, stream_callback: Optional[Callable[[str], None]] = None) -> None:
327        """Enable streaming responses for this agent.
328        
329        When streaming is enabled, the agent will call the provided callback
330        function with each content chunk as it's generated, allowing for
331        real-time processing and display of the AI's response.
332        
333        Parameters
334        ----------
 335        stream_callback : Optional[Callable[[Dict[str, Any]], None]], default None
 336            Callback to handle streaming events. If None, a default callback
 337            prints text deltas to the console. The callback receives event
 338            dicts such as {"type": "text", "delta": "..."}.
 339        """
 340        if stream_callback is None:
 341            def default_callback(event):
 342                delta = event.get("delta", "") if isinstance(event, dict) else event
 343                print(delta, end='', flush=True)
 344            self._stream_callback = default_callback
 345        else:
 346            self._stream_callback = stream_callback

Enable streaming responses for this agent.

When streaming is enabled, the agent will call the provided callback function with each content chunk as it's generated, allowing for real-time processing and display of the AI's response.

Parameters
  • stream_callback (Optional[Callable[[Dict[str, Any]], None]], default None): Callback to handle streaming events. If None, a default callback prints text deltas to the console. The callback receives event dictionaries such as {"type": "text", "delta": "..."} and {"type": "tool_call", ...}.
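A custom callback can branch on the event type. The sketch below is hypothetical but matches the event shapes the loop emits internally (`{"type": "text", "delta": ...}` for content and `{"type": "tool_call", ...}` for tool calls); the `collected` buffer is only there to make the behavior observable:

```python
collected = []

def stream_handler(event):
    # Text deltas are printed as they arrive; tool calls are logged separately.
    if event.get("type") == "text":
        collected.append(event["delta"])
        print(event["delta"], end="", flush=True)
    elif event.get("type") == "tool_call":
        print(f"\n[tool call] {event['tool_call']}")

stream_handler({"type": "text", "delta": "Hello, "})
stream_handler({"type": "text", "delta": "world"})
print()
print("".join(collected))  # Hello, world
```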
def disable_streaming(self) -> None:
348    def disable_streaming(self) -> None:
349        """Disable streaming responses for this agent.
350        
351        After calling this method, the agent will use standard (non-streaming)
352        model execution for all subsequent requests.
353        """
354        self._stream_callback = None

Disable streaming responses for this agent.

After calling this method, the agent will use standard (non-streaming) model execution for all subsequent requests.

@classmethod
def create_streaming( cls, model: Any, stream_callback: Optional[Callable[[str], NoneType]] = None, **kwargs) -> _AgenticLoop:
356    @classmethod
357    def create_streaming(cls, model: Any, stream_callback: Optional[Callable[[str], None]] = None, 
358                        **kwargs) -> '_AgenticLoop':
359        """Create an agent instance with streaming enabled.
360        
361        This is a convenience class method that creates an agent instance
362        with streaming already enabled, avoiding the need to call
363        enable_streaming() separately.
364        
365        Parameters
366        ----------
367        model : Any
368            AI model instance to use
369        stream_callback : Optional[Callable[[str], None]], default None
370            Callback function for handling streaming content chunks
371        **kwargs
372            Additional parameters to pass to the constructor
373        
374        Returns
375        -------
376        _AgenticLoop
377            Agent instance with streaming enabled
378        """
379        return cls(model, stream_callback=stream_callback, **kwargs)

Create an agent instance with streaming enabled.

This is a convenience class method that creates an agent instance with streaming already enabled, avoiding the need to call enable_streaming() separately.

Parameters
  • model (Any): AI model instance to use
  • stream_callback (Optional[Callable[[str], None]], default None): Callback function for handling streaming content chunks
  • **kwargs: Additional parameters to pass to the constructor
Returns
  • _AgenticLoop: Agent instance with streaming enabled
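The factory simply forwards the callback into the constructor. A stubbed illustration (the `Loop` class is a placeholder, not the real base class):

```python
class Loop:
    """Stand-in for the agent base class, showing the create_streaming pattern."""
    def __init__(self, model, stream_callback=None, **kwargs):
        self._model = model
        self._stream_callback = stream_callback

    @classmethod
    def create_streaming(cls, model, stream_callback=None, **kwargs):
        # Equivalent to constructing the agent and enabling streaming in one call.
        return cls(model, stream_callback=stream_callback, **kwargs)

agent = Loop.create_streaming("my-model", stream_callback=print)
print(agent._stream_callback is print)  # True
```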
def start(self, query: str) -> Dict[str, Any]:
 999    def start(self, query: str) -> Dict[str, Any]:
1000        """Abstract method to start the agentic loop.
1001        
1002        This method must be implemented by subclasses to define the specific
1003        agentic behavior and reasoning pattern.
1004        
1005        Parameters
1006        ----------
1007        query : str
1008            User query to process
1009        
1010        Returns
1011        -------
1012        Dict[str, Any]
1013            Agent response containing iterations and final result
1014        
1015        Raises
1016        ------
1017        NotImplementedError
1018            This method must be implemented by subclasses
1019        """
1020        raise NotImplementedError

Abstract method to start the agentic loop.

This method must be implemented by subclasses to define the specific agentic behavior and reasoning pattern.

Parameters
  • query (str): User query to process
Returns
  • Dict[str, Any]: Agent response containing iterations and final result
Raises
  • NotImplementedError: This method must be implemented by subclasses
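A subclass supplies the loop body. A toy subclass against a stubbed base, purely illustrative of the response shape (`prompt`, `iterations`, `response`), not a real paradigm:

```python
from typing import Any, Dict

class _AgenticLoop:  # stubbed base, mirroring the abstract start()
    def start(self, query: str) -> Dict[str, Any]:
        raise NotImplementedError

class EchoLoop(_AgenticLoop):
    """Trivial loop: one iteration that echoes the query back uppercased."""
    def start(self, query: str) -> Dict[str, Any]:
        iteration = {"step_type": "final answer", "content": query.upper()}
        return {"prompt": query, "iterations": [iteration], "response": query.upper()}

print(EchoLoop().start("hello")["response"])  # HELLO
```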
class _FunctionCallingMixin:
21class _FunctionCallingMixin:
22    """Mixin for handling OpenAI function calling tool execution.
23    
24    This mixin provides methods for managing tool calls in OpenAI's native
25    function calling format, converting tool responses into standardized messages
26    that can be used in the conversation flow.
27    
28    The mixin handles the execution of tools registered with the agent and
29    formats their responses according to OpenAI's message format specification.
30    """
31    
32    def _call_tool(self, tool_call: Any) -> Dict[str, str]:
33        """Execute a tool call and return the formatted response.
34        
35        This method takes a tool call object from the AI model's response,
36        executes the corresponding tool function, and returns a properly
37        formatted message that can be added to the conversation history.
38        
39        Parameters
40        ----------
41        tool_call : Any
42            Tool call object containing function execution details.
43            Must have attributes:
44            - function.name: Name of the function to call
45            - function.arguments: JSON string of function arguments
46            - id: Unique identifier for this tool call
47        
48        Returns
49        -------
50        Dict[str, str]
51            Formatted tool response message containing:
52            - tool_call_id: ID of the original tool call
53            - role: Message role (always "tool")
54            - name: Name of the executed function
55            - content: Tool execution result as string
56        
57        Raises
58        ------
59        KeyError
60            If the tool function is not registered with the agent
61        json.JSONDecodeError
62            If the function arguments are not valid JSON
63        Exception
64            If the tool function execution fails
65        """
66
67        function_name = tool_call.function.name
68        function_to_call = self._tools[function_name]
69        function_args = json.loads(tool_call.function.arguments)                
70        function_response = str(function_to_call(**function_args))
71        return {
72            "tool_call_id": tool_call.id,
73            "role": "tool",
74            "name": function_name,
75            "content": function_response,
76        }
77
78    def _call_mcp_tool(self, tool_call: Dict[str, Any]) -> Any:
79        """Execute a MCP tool call."""
80        tool_name = tool_call.function.name
81        tool_arguments = json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
82        tool_call_id = tool_call.id
83        
84        # Get server name from tool name (format: mcp_servername_toolname)
85        server_name = tool_name.split("_", 2)[1]
86        if server_name not in self._mcp_servers:
87            raise ValueError(f"MCP server '{server_name}' not found")
 88        # Route the call to the registered MCP server and wrap the result
89        return {
90            "tool_call_id": tool_call_id,
91            "role": "tool",
92            "name": tool_name,
93            "content": str(self._mcp_servers[server_name].call_tool(tool_name, tool_arguments))
94        }

Mixin for handling OpenAI function calling tool execution.

This mixin provides methods for managing tool calls in OpenAI's native function calling format, converting tool responses into standardized messages that can be used in the conversation flow.

The mixin handles the execution of tools registered with the agent and formats their responses according to OpenAI's message format specification.
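The tool message format produced by `_call_tool` can be reproduced stand-alone. The stub below mimics the attribute shape of an OpenAI tool call object (`id`, `function.name`, `function.arguments`); the `add` tool is a made-up example:

```python
import json
from types import SimpleNamespace

tools = {"add": lambda a, b: a + b}

# Stub with the same attribute shape as an OpenAI tool call object.
tool_call = SimpleNamespace(
    id="call_1",
    function=SimpleNamespace(name="add", arguments=json.dumps({"a": 2, "b": 3})),
)

# Same steps as _call_tool: decode arguments, run the tool, wrap the result.
args = json.loads(tool_call.function.arguments)
message = {
    "tool_call_id": tool_call.id,
    "role": "tool",
    "name": tool_call.function.name,
    "content": str(tools[tool_call.function.name](**args)),
}
print(message)  # {'tool_call_id': 'call_1', 'role': 'tool', 'name': 'add', 'content': '5'}
```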

class _ReactMixin:
 98class _ReactMixin:
 99    """Mixin for handling ReAct-style tool calls with JSON format.
100    
101    This mixin provides methods for managing tool calls in structured JSON format,
102    typical of ReAct-style approaches for AI agents. It handles the encoding
103    of tool functions and execution of tool calls based on parsed JSON responses.
104    
105    The ReAct (Reasoning and Acting) pattern allows agents to reason about
106    problems step-by-step and take actions using tools when needed.
107    """
108    
109    def _encode_tool(self, func: Any) -> str:
110        """Encode a function into a descriptive string format.
111        
112        This method creates a human-readable description of a tool function
113        that can be included in prompts to help the AI model understand
114        what tools are available and how to use them.
115        
116        Parameters
117        ----------
118        func : Any
119            Function to encode. Must have attributes:
120            - __name__: Function name
121            - __doc__: Function documentation
122        
123        Returns
124        -------
125        str
126            Descriptive string in the format:
127            "function_name(signature): documentation"
128            Newlines are replaced with spaces for single-line format.
129        """
130        sig = inspect.signature(func)
131        doc = inspect.getdoc(func)
 132        encoded = func.__name__ + str(sig) + ": " + (doc or "")
133        encoded = encoded.replace("\n", " ")
134        return encoded
135    
136    def _encode_mcp_tool(self, tool: dict) -> str:
 137        """Convert a JSON Schema-style schema into a Google-style Args section."""
138        schema = tool.inputSchema
139        if schema.get("type") != "object":
 140            raise ValueError("The root schema must have type=object")
141
142        args = []
143        args_doc = ["Args:"]
144        properties = schema.get("properties", {})
145        required = set(schema.get("required", []))
146
147        for name, spec in properties.items():
148            args.append(name)
149            typ = spec.get("type", "Any")
150            desc = spec.get("description", "").strip().replace("\n", " ")
151            title = spec.get("title", "")
152            default_info = "" if name in required else " Defaults to None."
153
154            arg_line = f"    {name} ({typ}): {desc}{default_info}"
155            if title:
156                arg_line = f"    {name} ({typ}): {title}. {desc}{default_info}"
157            args_doc.append(arg_line)
158
159        encoded = tool.name + "(" + ", ".join(args) + "): "
160        encoded += tool.description
 161        encoded += " " + " ".join(args_doc)
162        return encoded
163
164    
165    def _call_tool(self, tool_call: Dict[str, Any]) -> Any:
166        """Execute a tool call in ReAct format.
167        
168        This method executes a tool call based on a structured dictionary
169        containing the tool name and arguments. It's designed to work with
170        the ReAct pattern where tools are called through JSON-formatted
171        action specifications.
172        
173        Parameters
174        ----------
175        tool_call : Dict[str, Any]
176            Tool call specification containing:
177            - name: Name of the tool to call
178            - arguments: Dictionary of tool arguments
179        
180        Returns
181        -------
182        Any
183            Result of the tool execution
184        
185        Raises
186        ------
187        KeyError
188            If the tool is not registered with the agent
189        TypeError
190            If the tool arguments don't match the function signature
191        Exception
192            If the tool execution fails
193        """
 194        tool = self._tools[tool_call["name"]]
 195        # Pass arguments by keyword so their order in the JSON does not matter
 196        return tool(**tool_call["arguments"])
197
198    def _call_mcp_tool(self, tool_call: Dict[str, Any]) -> Any:
199        """Execute a MCP tool call.
200        
201        This method executes a MCP tool call based on a structured dictionary
202        containing the tool name and arguments.
203        """
204        # Extract server name from tool name (format: mcp_servername_toolname)
205        tool_name_parts = tool_call["name"].split("_", 2)
206        if len(tool_name_parts) < 3:
207            raise ValueError(f"Invalid MCP tool name format: {tool_call['name']}")
208        
209        server_name = tool_name_parts[1]
210        tool_name = tool_name_parts[2]
211        if server_name not in self._mcp_servers:
212            raise ValueError(f"MCP server '{server_name}' not found")
213
214        return {
215            "tool_call_id": tool_name,
216            "role": "tool",
217            "name": tool_name,
218            "content": str(self._mcp_servers[server_name].call_tool(tool_name, tool_call["arguments"]))
219        }

Mixin for handling ReAct-style tool calls with JSON format.

This mixin provides methods for managing tool calls in structured JSON format, typical of ReAct-style approaches for AI agents. It handles the encoding of tool functions and execution of tool calls based on parsed JSON responses.

The ReAct (Reasoning and Acting) pattern allows agents to reason about problems step-by-step and take actions using tools when needed.
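`_encode_tool` flattens a function's signature and docstring into a single prompt line. The same transformation stand-alone (the `multiply` function is a made-up example):

```python
import inspect

def multiply(a: int, b: int) -> int:
    """Multiply two integers and return the product."""
    return a * b

# Same steps as _encode_tool: name + signature + docstring, newlines flattened.
sig = inspect.signature(multiply)
doc = inspect.getdoc(multiply) or ""
encoded = (multiply.__name__ + str(sig) + ": " + doc).replace("\n", " ")
print(encoded)
# multiply(a: int, b: int) -> int: Multiply two integers and return the product.
```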

class _BaseReactLoop(monoai.agents._AgenticLoop, monoai.agents._ReactMixin):
 14class _BaseReactLoop(_AgenticLoop, _ReactMixin):
 15    """Base class for all ReAct-style agents.
 16    
 17    This class implements the standard loop for agents that use a ReAct-style
 18    approach, where the model produces structured JSON responses that are
 19    parsed and handled iteratively. The ReAct pattern combines reasoning
 20    and acting in a step-by-step manner.
 21    
 22    The base loop handles:
 23    - Step-based reasoning format parsing
 24    - Tool action execution and observation
 25    - Final answer detection
 26    - Custom iteration handlers
 27    - Error handling and fallbacks
 28    
 29    Attributes
 30    ----------
 31    _max_iter : Optional[int]
 32        Maximum number of iterations allowed
 33    """
 34    
 35    def _run_react_loop(self, query: str, agent_type: str, 
 36                        custom_handlers: Optional[Dict[str, Callable]] = None) -> Dict[str, Any]:
 37        """Execute the standard ReAct loop.
 38        
 39        This method implements the core ReAct reasoning pattern where the
 40        agent alternates between thinking, acting, and observing until it
 41        reaches a final answer or hits iteration limits.
 42        
 43        Parameters
 44        ----------
 45        query : str
 46            User query to process
 47        agent_type : str
 48            Type of agent to determine which prompt template to use
 49        custom_handlers : Optional[Dict[str, Callable]], optional
 50            Dictionary of custom handlers for specific iteration types.
 51            Keys are step-type values from the parsed iteration; values
 52            are functions that handle those iterations.
 53        
 54        Returns
 55        -------
 56        Dict[str, Any]
 57            Agent response containing:
 58            - prompt: Original user query
 59            - iterations: List of processed iterations
 60            - response: Final response (if present)
 61        
 62        Notes
 63        -----
 64        This method automatically handles:
 65        - Final answers (final_answer)
 66        - Tool actions (action)
 67        - Custom cases via custom_handlers
 68        - Default cases for unhandled iterations
 69        - JSON error handling
 70        """
 71        messages = self._get_base_messages(agent_type, query)
 72
 73        current_iter = 0
 74        response = self._create_base_response(query)
 75        
 76        # Custom handlers for special cases
 77        custom_handlers = custom_handlers or {}
 78
 79        while True:
 80            if self._max_iter is not None and current_iter >= self._max_iter:
 81                break
 82            
 83            if self._stream_callback is None:
 84                resp = self._execute_model_step(messages)
 85            else:
 86                resp = self._execute_model_step_stream(messages)
 87
 88            messages.append(resp)
 89            content = resp["content"]
 90        
 91            self._debug_print(content)
 92
 93            # Update usage for this iteration
 94            self._update_usage(response, resp)
 95
 96            if content is not None:
 97                iteration = self._parse_step_format(content)
 98                if iteration:
 99                    # Add usage information to the iteration
100                    if "usage" in resp:
101                        iteration["usage"] = resp["usage"]
102                    
103                    # Check if human feedback is required for all steps
104                    if self._human_feedback == "all":
105                        if not self._request_human_feedback(iteration.get("step_type", "unknown"), 
106                                                          iteration.get("content", ""), 
107                                                          iteration.get("action")):
108                            print("Execution stopped by user.")
109                            break
110                    
111                    # Handle the final answer
112                    if self._handle_final_answer(iteration, response):
113                        break
114                    
115                    # Handle tool actions
116                    if iteration["step_type"] == "action":
117                        self._handle_tool_action(iteration, response, messages)
118                        continue
119                    
120                    # Handle custom cases
121                    handled = False
122                    for key, handler in custom_handlers.items():
123                        if iteration["step_type"] == key:
124                            handler(iteration, response, messages)
125                            handled = True
126                            break
127                    
128                    if not handled:
129                        self._handle_default(iteration, response, messages)
130                else:
131                    # If parsing fails, append the content as a user message
132                    messages.append({"role": "user", "content": content})
133
134            current_iter += 1
135
136        return response

Base class for all ReAct-style agents.

This class implements the standard loop for agents that use a ReAct-style approach, where the model produces structured JSON responses that are parsed and handled iteratively. The ReAct pattern combines reasoning and acting in a step-by-step manner.

The base loop handles:

  • Step-based reasoning format parsing
  • Tool action execution and observation
  • Final answer detection
  • Custom iteration handlers
  • Error handling and fallbacks
Attributes
  • _max_iter (Optional[int]): Maximum number of iterations allowed
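The step parsing the base loop relies on can be sketched as follows. `parse_step` is a simplified stand-in for `_parse_step_format`, and its keys (`step_type`, `content`, `action`) are assumptions based on how the loop dispatches on them:

```python
import json
import re
from typing import Any, Dict, Optional


def parse_step(content: str) -> Optional[Dict[str, Any]]:
    # Classify one model response according to the documented step markers:
    # Thought N / Action N / Observation N / Final answer.
    content = content.strip()
    m = re.match(r"Final answer:\s*(.*)", content, re.DOTALL)
    if m:
        return {"step_type": "final_answer", "content": m.group(1).strip()}
    m = re.match(r"Action\s+\d+:\s*(\{.*\})", content, re.DOTALL)
    if m:
        try:
            # Actions carry a JSON payload: {"name": ..., "arguments": {...}}
            return {"step_type": "action", "action": json.loads(m.group(1))}
        except json.JSONDecodeError:
            return None  # malformed JSON falls through to the fallback path
    m = re.match(r"(Thought|Observation)\s+\d+:\s*(.*)", content, re.DOTALL)
    if m:
        return {"step_type": m.group(1).lower(), "content": m.group(2).strip()}
    return None  # unparseable content is re-fed to the model as a user message
```

A `None` result corresponds to the loop's fallback branch, where the raw content is appended back to `messages` as a user message.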
class FunctionCallingAgenticLoop(monoai.agents._AgenticLoop, monoai.agents._FunctionCallingMixin):
 15class FunctionCallingAgenticLoop(_AgenticLoop, _FunctionCallingMixin):
 16    """Agent that uses OpenAI's native function calling.
 17    
 18    This agent implements a loop that leverages OpenAI's native function calling
 19    system, allowing the model to directly call available functions without
 20    manual response parsing. This approach is more reliable and efficient
 21    than text-based tool calling.
 22    
 23    The agent automatically handles:
 24    - Function call detection and execution
 25    - Tool result integration into conversation
 26    - Iteration limits to prevent infinite loops
 27    - Streaming support for real-time responses
 28    
 29    Attributes
 30    ----------
 31    _model : Any
 32        OpenAI model with function calling support
 33    _tools : Dict[str, Any]
 34        Available tools for the agent
 35    """
 36    
 37    def _get_base_messages(self, query: str) -> List[Dict[str, Any]]:
 38        """Generate the base messages for the agent.
 39        
 40        This method creates the initial message structure: the configured
 41        agentic prompt (if any) as a system message, followed by the
 42        user query. String prompts ending in ".prompt" are loaded by
 43        prompt id; other strings are used directly as the prompt text.
 44        
 45        Parameters
 46        ----------
 47        query : str
 48            User query to include in the prompt
 49        
 50        Returns
 51        -------
 52        List[Dict[str, Any]]
 53            List of base messages for the agent, including the prompt and query
 54        """
 55        messages = []
 56        if self._agentic_prompt is not None:
 57            if isinstance(self._agentic_prompt, str):
 58                if self._agentic_prompt.endswith(".prompt"):
 59                    agentic_prompt = Prompt(prompt_id=self._agentic_prompt, is_system=True)
 60                else:
 61                    agentic_prompt = Prompt(prompt=self._agentic_prompt, is_system=True)
 62            else:
 63                agentic_prompt = self._agentic_prompt
 64            messages.append(agentic_prompt.as_dict())
 65                    
 66        if isinstance(query, Prompt):
 67            messages.append(query.as_dict())
 68        else:
 69            messages.append({"role": "user", "content": query})
 70
 71        return messages
 72
 73    def start(self, query: str) -> Dict[str, Any]:
 74        """Start the agentic loop using function calling.
 75        
 76        This method processes user queries through OpenAI's function calling
 77        system, automatically executing tools when the model determines
 78        they are needed.
 79        
 80        Parameters
 81        ----------
 82        query : str
 83            User query to process
 84        
 85        Returns
 86        -------
 87        Dict[str, Any]
 88            Agent response containing:
 89            - prompt: Original user query
 90            - iterations: List of tool calls executed
 91            - response: Final model response
 92        """
 93        # Add all tools to the model
 94        self._model._add_tools(list(self._tools.values()))
 95        messages = self._get_base_messages(query)
 96        response = self._create_base_response(query)
 97        current_iter = 0
 98        max_iterations = self._max_iter if self._max_iter is not None else 10  # Safety limit
 99        
100        while current_iter < max_iterations:
101            
102            if self._stream_callback is None:
103                resp = self._execute_model_step(messages)
104            else:
105                resp = self._execute_model_step_stream(messages)
106
107            messages.append(resp)
108            content = resp["content"]
109            self._debug_print(content)
110            
111            self._update_usage(response, resp)
112            
113            if resp.get("tool_calls"):
114                for tool_call in resp["tool_calls"]:
115                    # Check if human feedback is required for actions
116                    if self._human_feedback == "actions" or self._human_feedback == "all":
117                        action_data = {
118                            "name": tool_call.function.name,
119                            "arguments": json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
120                        }
121                        if not self._request_human_feedback("action", f"Function call: {tool_call.function.name}", action_data):
122                            print("Execution stopped by user.")
123                            return response
124                    
125                    if tool_call.function.name.startswith("mcp_"):
126                        tool_result = self._call_mcp_tool(tool_call)
127                    else:
128                        tool_result = self._call_tool(tool_call)
129                    iteration_data = {
130                        "name": tool_call.function.name,
131                        "arguments": tool_call.function.arguments,
132                        "result": tool_result["content"]
133                    }
134                    
135                    # Add usage information to the iteration
136                    if "usage" in resp:
137                        iteration_data["usage"] = resp["usage"]
138                    
139                    response["iterations"].append(iteration_data)
140                    messages.append(tool_result)
141            else:
142                # Check if human feedback is required for all steps (non-action responses)
143                if self._human_feedback == "all":
144                    if not self._request_human_feedback("response", content):
145                        print("Execution stopped by user.")
146                        return response
147                
148                response["response"] = content
149                break
150            
151            current_iter += 1
152        
153        # Warn in debug mode if the loop exhausted its iteration budget
154        if self._debug and current_iter >= max_iterations:
155            print(f"Iteration limit reached ({max_iterations})")
156        
157        return response

Agent that uses OpenAI's native function calling.

This agent implements a loop that leverages OpenAI's native function calling system, allowing the model to directly call available functions without manual response parsing. This approach is more reliable and efficient than text-based tool calling.

The agent automatically handles:

  • Function call detection and execution
  • Tool result integration into conversation
  • Iteration limits to prevent infinite loops
  • Streaming support for real-time responses
Attributes
  • _model (Any): OpenAI model with function calling support
  • _tools (Dict[str, Any]): Available tools for the agent
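The per-call work inside the loop can be sketched like this. `dispatch_tool_call` is a simplified stand-in for `_call_tool`, and the returned message shape is an assumption (the real loop also threads through the provider's tool_call_id):

```python
import json
from typing import Any, Callable, Dict


def dispatch_tool_call(tools: Dict[str, Callable[..., Any]],
                       name: str, arguments: str) -> Dict[str, str]:
    # Look the function up by name, decode its JSON-encoded arguments,
    # and wrap the result as a message the next model step can consume.
    args = json.loads(arguments) if arguments else {}
    result = tools[name](**args)
    return {"role": "tool", "name": name, "content": str(result)}
```

For example, `dispatch_tool_call({"add": add}, "add", '{"a": 2, "b": 3}')` would return a tool message whose `content` is `"5"`, which the loop appends to `messages` before the next model step.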
def start(self, query: str) -> Dict[str, Any]:
 73    def start(self, query: str) -> Dict[str, Any]:
 74        """Start the agentic loop using function calling.
 75        
 76        This method processes user queries through OpenAI's function calling
 77        system, automatically executing tools when the model determines
 78        they are needed.
 79        
 80        Parameters
 81        ----------
 82        query : str
 83            User query to process
 84        
 85        Returns
 86        -------
 87        Dict[str, Any]
 88            Agent response containing:
 89            - prompt: Original user query
 90            - iterations: List of tool calls executed
 91            - response: Final model response
 92        """
 93        # Add all tools to the model
 94        self._model._add_tools(list(self._tools.values()))
 95        messages = self._get_base_messages(query)
 96        response = self._create_base_response(query)
 97        current_iter = 0
 98        max_iterations = self._max_iter if self._max_iter is not None else 10  # Safety limit
 99        
100        while current_iter < max_iterations:
101            
102            if self._stream_callback is None:
103                resp = self._execute_model_step(messages)
104            else:
105                resp = self._execute_model_step_stream(messages)
106
107            messages.append(resp)
108            content = resp["content"]
109            self._debug_print(content)
110            
111            self._update_usage(response, resp)
112            
113            if resp.get("tool_calls"):
114                for tool_call in resp["tool_calls"]:
115                    # Check if human feedback is required for actions
116                    if self._human_feedback == "actions" or self._human_feedback == "all":
117                        action_data = {
118                            "name": tool_call.function.name,
119                            "arguments": json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
120                        }
121                        if not self._request_human_feedback("action", f"Function call: {tool_call.function.name}", action_data):
122                            print("Execution stopped by user.")
123                            return response
124                    
125                    if tool_call.function.name.startswith("mcp_"):
126                        tool_result = self._call_mcp_tool(tool_call)
127                    else:
128                        tool_result = self._call_tool(tool_call)
129                    iteration_data = {
130                        "name": tool_call.function.name,
131                        "arguments": tool_call.function.arguments,
132                        "result": tool_result["content"]
133                    }
134                    
135                    # Add usage information to the iteration
136                    if "usage" in resp:
137                        iteration_data["usage"] = resp["usage"]
138                    
139                    response["iterations"].append(iteration_data)
140                    messages.append(tool_result)
141            else:
142                # Check if human feedback is required for all steps (non-action responses)
143                if self._human_feedback == "all":
144                    if not self._request_human_feedback("response", content):
145                        print("Execution stopped by user.")
146                        return response
147                
148                response["response"] = content
149                break
150            
151            current_iter += 1
152        
153        # Warn in debug mode if the loop exhausted its iteration budget
154        if self._debug and current_iter >= max_iterations:
155            print(f"Iteration limit reached ({max_iterations})")
156        
157        return response

Start the agentic loop using function calling.

This method processes user queries through OpenAI's function calling system, automatically executing tools when the model determines they are needed.

Parameters
  • query (str): User query to process
Returns
  • Dict[str, Any]: Agent response containing:
    • prompt: Original user query
    • iterations: List of tool calls executed
    • response: Final model response
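A caller can consume the result as a plain dictionary. The response below is illustrative, not real model output:

```python
# Illustrative response in the documented shape (hypothetical values).
response = {
    "prompt": "What is 2 + 3?",
    "iterations": [
        {"name": "add", "arguments": '{"a": 2, "b": 3}', "result": "5"},
    ],
    "response": "2 + 3 equals 5.",
}

# Which tools ran, in order, and the final answer:
tool_names = [it["name"] for it in response["iterations"]]
final = response.get("response")
```

Note that `response` may be absent if the iteration limit was reached before the model produced a final answer, hence the `.get()`.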
class ReactAgenticLoop(monoai.agents._BaseReactLoop):
139class ReactAgenticLoop(_BaseReactLoop):
140    """Standard ReAct agent.
141    
142    This agent implements the standard ReAct pattern, where the model
143    produces JSON responses that are parsed and handled iteratively.
144    The ReAct pattern combines reasoning and acting in a structured way.
145    """
146    
147    def start(self, query: str) -> Dict[str, Any]:
148        """Start the ReAct agentic loop.
149        
150        Parameters
151        ----------
152        query : str
153            User query to process
154        
155        Returns
156        -------
157        Dict[str, Any]
158            Agent response processed through the ReAct loop
159        """
160        return self._run_react_loop(query, "react")

Standard ReAct agent.

This agent implements the standard ReAct pattern, where the model produces JSON responses that are parsed and handled iteratively. The ReAct pattern combines reasoning and acting in a structured way.

def start(self, query: str) -> Dict[str, Any]:
147    def start(self, query: str) -> Dict[str, Any]:
148        """Start the ReAct agentic loop.
149        
150        Parameters
151        ----------
152        query : str
153            User query to process
154        
155        Returns
156        -------
157        Dict[str, Any]
158            Agent response processed through the ReAct loop
159        """
160        return self._run_react_loop(query, "react")

Start the ReAct agentic loop.

Parameters
  • query (str): User query to process
Returns
  • Dict[str, Any]: Agent response processed through the ReAct loop
class ReactWithFCAgenticLoop(monoai.agents._AgenticLoop, monoai.agents._FunctionCallingMixin):
163class ReactWithFCAgenticLoop(_AgenticLoop, _FunctionCallingMixin):
164    """Agent that combines ReAct and Function Calling.
165    
166    This agent combines the ReAct approach with OpenAI's native function
167    calling, allowing for hybrid tool call management. This provides
168    the flexibility of ReAct reasoning with the reliability of function calling.
169    """
170    
171    def start(self, query: str) -> Dict[str, Any]:
172        """Start the hybrid ReAct + Function Calling agentic loop.
173        
174        Parameters
175        ----------
176        query : str
177            User query to process
178        
179        Returns
180        -------
181        Dict[str, Any]
182            Agent response (to be implemented)
183        
184        Notes
185        -----
186        TODO: Implement combination of ReAct and Function Calling
187        """
188        # TODO: Implement combination of ReAct and Function Calling
189        pass

Agent that combines ReAct and Function Calling.

This agent combines the ReAct approach with OpenAI's native function calling, allowing for hybrid tool call management. This provides the flexibility of ReAct reasoning with the reliability of function calling.

def start(self, query: str) -> Dict[str, Any]:
171    def start(self, query: str) -> Dict[str, Any]:
172        """Start the hybrid ReAct + Function Calling agentic loop.
173        
174        Parameters
175        ----------
176        query : str
177            User query to process
178        
179        Returns
180        -------
181        Dict[str, Any]
182            Agent response (to be implemented)
183        
184        Notes
185        -----
186        TODO: Implement combination of ReAct and Function Calling
187        """
188        # TODO: Implement combination of ReAct and Function Calling
189        pass

Start the hybrid ReAct + Function Calling agentic loop.

Parameters
  • query (str): User query to process
Returns
  • Dict[str, Any]: Agent response (to be implemented)
Notes

TODO: Implement combination of ReAct and Function Calling

class ProgrammaticAgenticLoop(monoai.agents._BaseReactLoop):
15class ProgrammaticAgenticLoop(_BaseReactLoop):
16    """Programmatic agent.
17    
18    This agent implements a programmatic approach where the model
19    produces code or structured instructions that are executed.
20    It's designed for tasks that require code generation and execution.
21    """
22    
23    def start(self, query: str) -> Dict[str, Any]:
24        """Start the programmatic agentic loop.
25        
26        Parameters
27        ----------
28        query : str
29            User query to process
30        
31        Returns
32        -------
33        Dict[str, Any]
34            Agent response processed through the programmatic loop
35        """
36        return self._run_react_loop(query, "programmatic")

Programmatic agent.

This agent implements a programmatic approach where the model produces code or structured instructions that are executed. It's designed for tasks that require code generation and execution.

def start(self, query: str) -> Dict[str, Any]:
23    def start(self, query: str) -> Dict[str, Any]:
24        """Start the programmatic agentic loop.
25        
26        Parameters
27        ----------
28        query : str
29            User query to process
30        
31        Returns
32        -------
33        Dict[str, Any]
34            Agent response processed through the programmatic loop
35        """
36        return self._run_react_loop(query, "programmatic")

Start the programmatic agentic loop.

Parameters
  • query (str): User query to process
Returns
  • Dict[str, Any]: Agent response processed through the programmatic loop
class PlanAndExecuteAgenticLoop(monoai.agents._BaseReactLoop):
39class PlanAndExecuteAgenticLoop(_BaseReactLoop):
40    """Plan-and-execute agent.
41    
42    This agent implements the plan-and-execute pattern, where the model
43    first plans the actions and then executes them sequentially.
44    This approach is useful for complex tasks that require careful planning.
45    """
46    
47    def start(self, query: str) -> Dict[str, Any]:
48        """Start the plan-and-execute agentic loop.
49        
50        Parameters
51        ----------
52        query : str
53            User query to process
54        
55        Returns
56        -------
57        Dict[str, Any]
58            Agent response processed through the plan-and-execute loop
59        """
60        return self._run_react_loop(query, "plan_and_execute")

Plan-and-execute agent.

This agent implements the plan-and-execute pattern, where the model first plans the actions and then executes them sequentially. This approach is useful for complex tasks that require careful planning.

def start(self, query: str) -> Dict[str, Any]:
47    def start(self, query: str) -> Dict[str, Any]:
48        """Start the plan-and-execute agentic loop.
49        
50        Parameters
51        ----------
52        query : str
53            User query to process
54        
55        Returns
56        -------
57        Dict[str, Any]
58            Agent response processed through the plan-and-execute loop
59        """
60        return self._run_react_loop(query, "plan_and_execute")

Start the plan-and-execute agentic loop.

Parameters
  • query (str): User query to process
Returns
  • Dict[str, Any]: Agent response processed through the plan-and-execute loop
class ReflexionAgenticLoop(monoai.agents._BaseReactLoop):
63class ReflexionAgenticLoop(_BaseReactLoop):
64    """Agent with reflection capabilities.
65    
66    This agent implements the reflexion pattern, where the model
67    reflects on its own actions and decisions to improve performance.
68    This self-reflective approach helps the agent learn from mistakes
69    and improve its reasoning over time.
70    """
71    
72    def start(self, query: str) -> Dict[str, Any]:
73        """Start the reflexion agentic loop.
74        
75        Parameters
76        ----------
77        query : str
78            User query to process
79        
80        Returns
81        -------
82        Dict[str, Any]
83            Agent response processed through the reflexion loop
84        """
85        return self._run_react_loop(query, "reflexion")

Agent with reflection capabilities.

This agent implements the reflexion pattern, where the model reflects on its own actions and decisions to improve performance. This self-reflective approach helps the agent learn from mistakes and improve its reasoning over time.

def start(self, query: str) -> Dict[str, Any]:
72    def start(self, query: str) -> Dict[str, Any]:
73        """Start the reflexion agentic loop.
74        
75        Parameters
76        ----------
77        query : str
78            User query to process
79        
80        Returns
81        -------
82        Dict[str, Any]
83            Agent response processed through the reflexion loop
84        """
85        return self._run_react_loop(query, "reflexion")

Start the reflexion agentic loop.

Parameters
  • query (str): User query to process
Returns
  • Dict[str, Any]: Agent response processed through the reflexion loop
class SelfAskAgenticLoop(monoai.agents._BaseReactLoop):
 88class SelfAskAgenticLoop(_BaseReactLoop):
 89    """Self-ask agent.
 90    
 91    This agent implements the self-ask pattern, where the model
 92    asks itself questions to guide the reasoning process.
 93    This approach helps break down complex problems into smaller,
 94    more manageable questions.
 95    """
 96    
 97    def start(self, query: str) -> Dict[str, Any]:
 98        """Start the self-ask agentic loop.
 99        
100        Parameters
101        ----------
102        query : str
103            User query to process
104        
105        Returns
106        -------
107        Dict[str, Any]
108            Agent response processed through the self-ask loop
109        """
110        return self._run_react_loop(query, "self_ask")

Self-ask agent.

This agent implements the self-ask pattern, where the model asks itself questions to guide the reasoning process. This approach helps break down complex problems into smaller, more manageable questions.

def start(self, query: str) -> Dict[str, Any]:
 97    def start(self, query: str) -> Dict[str, Any]:
 98        """Start the self-ask agentic loop.
 99        
100        Parameters
101        ----------
102        query : str
103            User query to process
104        
105        Returns
106        -------
107        Dict[str, Any]
108            Agent response processed through the self-ask loop
109        """
110        return self._run_react_loop(query, "self_ask")

Start the self-ask agentic loop.

Parameters
  • query (str): User query to process
Returns
  • Dict[str, Any]: Agent response processed through the self-ask loop
class SelfAskWithSearchLoop(monoai.agents._BaseReactLoop):
113class SelfAskWithSearchLoop(_BaseReactLoop):
114    """Self-ask agent with web search capabilities.
115    
116    This agent extends the self-ask pattern with the ability to
117    perform web searches to obtain additional information.
118    It's particularly useful for questions that require current
119    or factual information not available in the model's training data.
120    
121    Attributes
122    ----------
123    _handle_search_query : callable
124        Method for handling web search queries
125    """
126    
127    def _handle_search_query(self, iteration: Dict[str, Any], response: Dict[str, Any], messages: List[Dict[str, Any]]) -> None:
128        """Handle web search queries.
129        
130        This method processes search queries by executing web searches
131        using the Tavily search engine and integrating the results
132        into the conversation flow.
133        
134        Parameters
135        ----------
136        iteration : Dict[str, Any]
137            Current iteration containing the search query
138        response : Dict[str, Any]
139            Response dictionary to update
140        messages : List[Dict[str, Any]]
141            Message list to update with search results
142        
143        Notes
144        -----
145        This method modifies the response and messages objects directly.
146        Uses the Tavily search engine for web searches.
147        """
148        from ...tools.websearch import search_web
149        
150        query = iteration["search_query"]
151        result = search_web(query, engine="tavily")["text"]
152        iteration["search_result"] = result
153        
154        msg = json.dumps({"query_results": result})
155        messages.append({"role": "user", "content": msg})
156        response["iterations"].append(iteration)
157    
158    def start(self, query: str) -> Dict[str, Any]:
159        """Start the self-ask with search agentic loop.
160        
161        Parameters
162        ----------
163        query : str
164            User query to process
165        
166        Returns
167        -------
168        Dict[str, Any]
169            Agent response processed through the self-ask with search loop
170        
171        Notes
172        -----
173        This agent uses a custom handler for web search queries, allowing
174        the model to obtain up-to-date information during the process.
175        """
176        custom_handlers = {"search_query": self._handle_search_query}
177        return self._run_react_loop(query, "self_ask_with_search", custom_handlers)

Self-ask agent with web search capabilities.

This agent extends the self-ask pattern with the ability to perform web searches to obtain additional information. It's particularly useful for questions that require current or factual information not available in the model's training data.

Attributes
  • _handle_search_query (callable): Method for handling web search queries
def start(self, query: str) -> Dict[str, Any]:
158    def start(self, query: str) -> Dict[str, Any]:
159        """Start the self-ask with search agentic loop.
160        
161        Parameters
162        ----------
163        query : str
164            User query to process
165        
166        Returns
167        -------
168        Dict[str, Any]
169            Agent response processed through the self-ask with search loop
170        
171        Notes
172        -----
173        This agent uses a custom handler for web search queries, allowing
174        the model to obtain up-to-date information during the process.
175        """
176        custom_handlers = {"search_query": self._handle_search_query}
177        return self._run_react_loop(query, "self_ask_with_search", custom_handlers)

Start the self-ask with search agentic loop.

Parameters
  • query (str): User query to process
Returns
  • Dict[str, Any]: Agent response processed through the self-ask with search loop
Notes

This agent uses a custom handler for web search queries, allowing the model to obtain up-to-date information during the process.
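A custom handler has the three-argument shape `_run_react_loop` expects and mutates `response` and `messages` in place. The sketch below mirrors `_handle_search_query` but stubs out the actual web search (the real handler calls `search_web` with the Tavily engine) so it stays self-contained:

```python
import json
from typing import Any, Dict, List


def handle_search_query(iteration: Dict[str, Any],
                        response: Dict[str, Any],
                        messages: List[Dict[str, Any]]) -> None:
    # Execute the (stubbed) search, record it on the iteration, and feed
    # the results back to the model as a user message.
    result = f"stub results for: {iteration['search_query']}"
    iteration["search_result"] = result
    messages.append({"role": "user",
                     "content": json.dumps({"query_results": result})})
    response["iterations"].append(iteration)


# The loop dispatches on step_type, so registration looks like:
# custom_handlers = {"search_query": handle_search_query}
```

This is the same registration pattern `SelfAskWithSearchLoop.start` uses before delegating to `_run_react_loop`.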

class Agent:
 18class Agent:
 19    """
 20    AI Agent that implements different reasoning paradigms for autonomous task execution.
 21    
 22    The Agent class provides a unified interface for creating and using AI agents
 23    with various reasoning paradigms. It acts as a high-level wrapper around
 24    specific agentic loop implementations, offering a consistent API regardless
 25    of the underlying reasoning pattern.
 26    
 27    The agent can be configured with different reasoning approaches, tools,
 28    and execution parameters to handle complex tasks autonomously or with
 29    human oversight.
 30    
 31    Parameters
 32    ----------
 33    model : Model
 34        AI model instance to use for agent execution
 35    tools : list, optional
 36        List of available tools for the agent. Each tool should be a
 37        callable function. Default is None.
 38    paradigm : str or _AgenticLoop, optional
 39        Reasoning paradigm to use. Default is "function_calling".
 40    agent_prompt : str, optional
 41        Custom prompt for the agent. If None, uses the default prompt
 42        for the chosen paradigm. Default is None.
 43    name : str, optional
 44        Name identifier for the agent. Default is empty string.
 45    mcp_servers : list, optional
 46        List of MCP (Model Context Protocol) servers to register.
 47        Default is None.
 48    debug : bool, optional
 49        Flag to enable debug output during execution. Default is False.
 50    max_iter : int, optional
 51        Maximum number of iterations allowed for the agent.
 52        If None, there are no limits. Default is None.
 53    native_web_search : str, optional
 54        Native web search capability level. Must be one of:
 55        "low", "medium", or "high". Default is None.
 56    human_feedback : str, optional
 57        Human feedback mode for controlling agent execution. Can be:
 58        - None: No human feedback required (default)
 59        - "actions": Pause and request confirmation before executing tool actions
 60        - "all": Pause after every step for human review
 61        Default is None.
 62    
 63    Attributes
 64    ----------
 65    name : str
 66        Name identifier for the agent
 67    _model : Model
 68        The AI model instance used for execution
 69    _loop : _AgenticLoop
 70        The agentic loop implementation for the chosen paradigm
 71        
 72    Supported Paradigms
 73    -------------------
 74    - **function_calling**: Native OpenAI function calling with tool integration
 75    - **react**: Reasoning and Acting pattern with iterative problem solving
 76    - **react_with_function_calling**: Combines ReAct reasoning with function calling
 77    - **plan-and-execute**: Two-phase approach: planning then execution
 78    - **programmatic**: Code generation and execution for computational tasks
 79    - **reflexion**: Self-reflective reasoning with error correction
 80    - **self_ask**: Self-questioning approach for complex reasoning
 81    - **self_ask_with_search**: Self-ask with web search capabilities
 82    
 83    Examples
 84    --------
 85    Basic function calling agent:
 86    >>> from monoai.models import Model
 87    >>> from monoai.agents import Agent
 88    >>> 
 89    >>> model = Model(provider="openai", model="gpt-4")
 90    >>> agent = Agent(model=model, paradigm="function_calling")
 91    >>> response = agent.run("What's the weather like today?")
 92    
 93    ReAct agent with custom tools:
 94    >>> def calculator(expression: str) -> str:
 95    ...     return str(eval(expression))
 96    >>> 
 97    >>> agent = Agent(
 98    ...     model=model,
 99    ...     paradigm="react",
100    ...     tools=[calculator],
101    ...     name="MathAgent"
102    ... )
103    >>> response = agent.run("Calculate 15 * 23 + 7")
104    
105    Agent with human feedback:
106    >>> agent = Agent(
107    ...     model=model,
108    ...     paradigm="plan-and-execute",
109    ...     human_feedback="actions",
110    ...     debug=True
111    ... )
112    >>> response = agent.run("Plan and execute a data analysis task")
113    
114    Agent with MCP servers:
115    >>> mcp_server = McpServer("coingecko", "http", server_url="https://mcp.api.coingecko.com/sse")
116    >>> agent = Agent(
117    ...     model=model,
118    ...     paradigm="function_calling",
119    ...     mcp_servers=[mcp_server]
120    ... )
121    >>> response = agent.run("What's the price of Bitcoin?")
122
123    Custom paradigm agent:
124    >>> class CustomLoop(_AgenticLoop):
125    ...     def start(self, prompt):
126    ...         # Custom implementation
127    ...         return {"response": "Custom result"}
128    >>> 
129    >>> custom_loop = CustomLoop()
130    >>> agent = Agent(model=model, paradigm=custom_loop)
131    >>> response = agent.run("Test custom paradigm")
132    """
133    
134    def __init__(self, model: Model, tools=None, paradigm="function_calling", 
135                 agent_prompt=None, name="", mcp_servers=None, debug=False, max_iter=None, native_web_search=None, 
136                 human_feedback=None):
137        """
138        Initialize the agent with the specified model and configuration.
139        
140        Sets up an AI agent with the chosen reasoning paradigm, registers
141        any provided tools, and configures execution parameters.
142        
143        Raises
144        ------
145        ValueError
146            If the specified paradigm is not supported or invalid
147        TypeError
148            If a custom object is passed that doesn't derive from _AgenticLoop
149        """
150        self._model = model
151        self.name = name
152
153        if native_web_search is not None and native_web_search not in ["low", "medium", "high"]:
154            raise ValueError("native_web_search must be 'low', 'medium' or 'high'")
155        
156        if human_feedback is not None and human_feedback not in ["actions", "all"]:
157            raise ValueError("human_feedback must be None, 'actions', or 'all'")
158        
159        self._model._web_search = native_web_search
160        self._human_feedback = human_feedback
161
162        # Handle custom paradigm
163        if isinstance(paradigm, _AgenticLoop):
164            # Verify that the custom object is valid
165            if not hasattr(paradigm, 'start') or not callable(paradigm.start):
166                raise TypeError("Custom paradigm must have a callable 'start' method")
167            self._loop = paradigm
168        else:
169            # Predefined paradigms
170            loop_kwargs = self._model, agent_prompt, debug, max_iter, None, human_feedback
171            
172            if paradigm == "function_calling":
173                self._loop = FunctionCallingAgenticLoop(*loop_kwargs)
174            elif paradigm == "react":
175                self._loop = ReactAgenticLoop(*loop_kwargs)
176            elif paradigm == "react_with_function_calling":
177                self._loop = ReactWithFCAgenticLoop(*loop_kwargs)
178            elif paradigm == "plan-and-execute":
179                self._loop = PlanAndExecuteAgenticLoop(*loop_kwargs)
180            elif paradigm == "programmatic":
181                self._loop = ProgrammaticAgenticLoop(*loop_kwargs)
182            elif paradigm == "reflexion":
183                self._loop = ReflexionAgenticLoop(*loop_kwargs)
184            elif paradigm == "self_ask":
185                self._loop = SelfAskAgenticLoop(*loop_kwargs)
186            elif paradigm == "self_ask_with_search":
187                self._loop = SelfAskWithSearchLoop(*loop_kwargs)
188            else:
189                raise ValueError(f"Paradigm '{paradigm}' not supported. "
190                               f"Available paradigms: function_calling, react, "
191                               f"react_with_function_calling, plan-and-execute, "
192                               f"programmatic, reflexion, self_ask, self_ask_with_search, "
193                               f"or a custom object derived from _AgenticLoop")
194        
195        if tools is not None:
196            self._loop.register_tools(tools)
197        
198        if mcp_servers is not None:
199            self._loop.register_mcp_servers(mcp_servers)
200        
201    def run(self, prompt: str | Prompt):
202        """
203        Execute the agent with the specified prompt.
204        
205        Processes a user prompt through the agent's reasoning paradigm,
206        returning a structured response that includes the reasoning process
207        and final answer.
208        
209        Parameters
210        ----------
211        prompt : str or Prompt
212            User prompt or query to process. Can be a string or a Prompt object.
213        
214        Returns
215        -------
216        Dict[str, Any]
217            Agent response containing:
218            - prompt: Original user query
219            - iterations: List of processed reasoning iterations
220            - response: Final response (if available)
221            - metadata: Additional execution metadata (paradigm-specific)
222        
223        Examples
224        --------
225        >>> agent = Agent(model=model, paradigm="function_calling")
226        >>> response = agent.run("What's the weather in New York?")
227        >>> print(response['response'])
228        
229        >>> from monoai.prompts import Prompt
230        >>> prompt = Prompt("Analyze this data: [1, 2, 3, 4, 5]")
231        >>> response = agent.run(prompt)
232        >>> print(response['iterations'])
233        
234        Notes
235        -----
236        The response structure varies by paradigm:
237        - **Function calling**: Includes tool call details and function results
238        - **ReAct**: Includes thought-action-observation cycles
239        - **Plan-and-execute**: Includes planning and execution phases
240        - **Programmatic**: Includes code generation and execution results
241        - **Reflexion**: Includes self-reflection and error correction steps
242        - **Self-ask**: Includes question-answer reasoning chains
243        - **Self-ask with search**: Includes web search results and reasoning
244        - **Custom paradigms**: May include paradigm-specific information
245        """
246        
247        return self._loop.start(prompt)
248    
249    def enable_streaming(self, stream_callback=None):
250        """
251        Enable streaming responses for this agent.
252        
253        When streaming is enabled, the agent will call the provided callback
254        function with each content chunk as it's generated, allowing for
255        real-time processing and display of the AI's response.
256        
257        Parameters
258        ----------
259        stream_callback : callable, optional
260            Callback function to handle streaming content chunks.
261            If None, uses a default callback that prints content to console.
262            The callback receives plain text content strings.
263            
264            Callback signature: callback(content: str) -> None
265            where content is the streaming text chunk.
266        
267        Raises
268        ------
269        AttributeError
270            If the current paradigm doesn't support streaming
271            
272        Examples
273        --------
274        >>> def my_callback(content):
275        ...     print(f"Streaming: {content}")
276        >>> 
277        >>> agent = Agent(model=model, paradigm="function_calling")
278        >>> agent.enable_streaming(my_callback)
279        >>> response = agent.run("Tell me a story")
280        
281        Notes
282        -----
283        Not all paradigms support streaming. Check the specific paradigm
284        implementation to ensure streaming is available.
285        """
286        if hasattr(self._loop, 'enable_streaming'):
287            self._loop.enable_streaming(stream_callback)
288        else:
289            raise AttributeError(f"Paradigm '{self._loop.__class__.__name__}' doesn't support streaming")
290    
291    def disable_streaming(self):
292        """
293        Disable streaming responses for this agent.
294        
295        After calling this method, the agent will use standard (non-streaming)
296        model execution for all subsequent requests.
297        
298        Raises
299        ------
300        AttributeError
301            If the current paradigm doesn't support streaming
302            
303        Examples
304        --------
305        >>> agent = Agent(model=model, paradigm="function_calling")
306        >>> agent.enable_streaming()
307        >>> # ... use streaming ...
308        >>> agent.disable_streaming()
309        >>> # Now using standard execution
310        
311        Notes
312        -----
313        Not all paradigms support streaming. Check the specific paradigm
314        implementation to ensure streaming is available.
315        """
316        if hasattr(self._loop, 'disable_streaming'):
317            self._loop.disable_streaming()
318        else:
319            raise AttributeError(f"Paradigm '{self._loop.__class__.__name__}' doesn't support streaming")

AI Agent that implements different reasoning paradigms for autonomous task execution.

The Agent class provides a unified interface for creating and using AI agents with various reasoning paradigms. It acts as a high-level wrapper around specific agentic loop implementations, offering a consistent API regardless of the underlying reasoning pattern.

The agent can be configured with different reasoning approaches, tools, and execution parameters to handle complex tasks autonomously or with human oversight.

Parameters
  • model (Model): AI model instance to use for agent execution
  • tools (list, optional): List of available tools for the agent. Each tool should be a callable function. Default is None.
  • paradigm (str or _AgenticLoop, optional): Reasoning paradigm to use. Default is "function_calling".
  • agent_prompt (str, optional): Custom prompt for the agent. If None, uses the default prompt for the chosen paradigm. Default is None.
  • name (str, optional): Name identifier for the agent. Default is empty string.
  • mcp_servers (list, optional): List of MCP (Model Context Protocol) servers to register. Default is None.
  • debug (bool, optional): Flag to enable debug output during execution. Default is False.
  • max_iter (int, optional): Maximum number of iterations allowed for the agent. If None, there are no limits. Default is None.
  • native_web_search (str, optional): Native web search capability level. Must be one of: "low", "medium", or "high". Default is None.
  • human_feedback (str, optional): Human feedback mode for controlling agent execution. Can be:
    • None: No human feedback required (default)
    • "actions": Pause and request confirmation before executing tool actions
    • "all": Pause after every step for human review
    Default is None.
Attributes
  • name (str): Name identifier for the agent
  • _model (Model): The AI model instance used for execution
  • _loop (_AgenticLoop): The agentic loop implementation for the chosen paradigm
Supported Paradigms
  • function_calling: Native OpenAI function calling with tool integration
  • react: Reasoning and Acting pattern with iterative problem solving
  • react_with_function_calling: Combines ReAct reasoning with function calling
  • plan-and-execute: Two-phase approach: planning then execution
  • programmatic: Code generation and execution for computational tasks
  • reflexion: Self-reflective reasoning with error correction
  • self_ask: Self-questioning approach for complex reasoning
  • self_ask_with_search: Self-ask with web search capabilities
Examples

Basic function calling agent:

>>> from monoai.models import Model
>>> from monoai.agents import Agent
>>> 
>>> model = Model(provider="openai", model="gpt-4")
>>> agent = Agent(model=model, paradigm="function_calling")
>>> response = agent.run("What's the weather like today?")

ReAct agent with custom tools:

>>> def calculator(expression: str) -> str:
...     return str(eval(expression))
>>> 
>>> agent = Agent(
...     model=model,
...     paradigm="react",
...     tools=[calculator],
...     name="MathAgent"
... )
>>> response = agent.run("Calculate 15 * 23 + 7")

Agent with human feedback:

>>> agent = Agent(
...     model=model,
...     paradigm="plan-and-execute",
...     human_feedback="actions",
...     debug=True
... )
>>> response = agent.run("Plan and execute a data analysis task")

Agent with MCP servers:

>>> mcp_server = McpServer("coingecko", "http", server_url="https://mcp.api.coingecko.com/sse")
>>> agent = Agent(
...     model=model,
...     paradigm="function_calling",
...     mcp_servers=[mcp_server]
... )
>>> response = agent.run("What's the price of Bitcoin?")

Custom paradigm agent:

>>> class CustomLoop(_AgenticLoop):
...     def start(self, prompt):
...         # Custom implementation
...         return {"response": "Custom result"}
>>> 
>>> custom_loop = CustomLoop()
>>> agent = Agent(model=model, paradigm=custom_loop)
>>> response = agent.run("Test custom paradigm")
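For the function_calling paradigm, tools are plain Python callables whose name, docstring, and signature describe them to the model. As a rough illustration of the metadata such a loop can derive from a callable (the helper `tool_schema` and the schema shape are assumptions for this sketch, not the package's internal format):

```python
import inspect

def tool_schema(fn):
    """Hypothetical sketch: derive minimal tool metadata from a callable,
    the kind of information a function-calling loop needs to register it."""
    sig = inspect.signature(fn)
    return {
        "name": fn.__name__,
        "description": (fn.__doc__ or "").strip(),
        "parameters": list(sig.parameters),  # parameter names, in order
    }

def get_weather(city: str) -> str:
    """Return a short weather summary for `city` (stub for illustration)."""
    return f"Sunny in {city}"
```

This is why tools passed to `Agent(tools=[...])` benefit from type hints and a one-line docstring: both end up in the description the model sees.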
Agent(model: monoai.models.Model, tools=None, paradigm='function_calling', agent_prompt=None, name='', mcp_servers=None, debug=False, max_iter=None, native_web_search=None, human_feedback=None)
134    def __init__(self, model: Model, tools=None, paradigm="function_calling", 
135                 agent_prompt=None, name="", mcp_servers=None, debug=False, max_iter=None, native_web_search=None, 
136                 human_feedback=None):
137        """
138        Initialize the agent with the specified model and configuration.
139        
140        Sets up an AI agent with the chosen reasoning paradigm, registers
141        any provided tools, and configures execution parameters.
142        
143        Raises
144        ------
145        ValueError
146            If the specified paradigm is not supported or invalid
147        TypeError
148            If a custom object is passed that doesn't derive from _AgenticLoop
149        """
150        self._model = model
151        self.name = name
152
153        if native_web_search is not None and native_web_search not in ["low", "medium", "high"]:
154            raise ValueError("native_web_search must be 'low', 'medium' or 'high'")
155        
156        if human_feedback is not None and human_feedback not in ["actions", "all"]:
157            raise ValueError("human_feedback must be None, 'actions', or 'all'")
158        
159        self._model._web_search = native_web_search
160        self._human_feedback = human_feedback
161
162        # Handle custom paradigm
163        if isinstance(paradigm, _AgenticLoop):
164            # Verify that the custom object is valid
165            if not hasattr(paradigm, 'start') or not callable(paradigm.start):
166                raise TypeError("Custom paradigm must have a callable 'start' method")
167            self._loop = paradigm
168        else:
169            # Predefined paradigms
170            loop_kwargs = self._model, agent_prompt, debug, max_iter, None, human_feedback
171            
172            if paradigm == "function_calling":
173                self._loop = FunctionCallingAgenticLoop(*loop_kwargs)
174            elif paradigm == "react":
175                self._loop = ReactAgenticLoop(*loop_kwargs)
176            elif paradigm == "react_with_function_calling":
177                self._loop = ReactWithFCAgenticLoop(*loop_kwargs)
178            elif paradigm == "plan-and-execute":
179                self._loop = PlanAndExecuteAgenticLoop(*loop_kwargs)
180            elif paradigm == "programmatic":
181                self._loop = ProgrammaticAgenticLoop(*loop_kwargs)
182            elif paradigm == "reflexion":
183                self._loop = ReflexionAgenticLoop(*loop_kwargs)
184            elif paradigm == "self_ask":
185                self._loop = SelfAskAgenticLoop(*loop_kwargs)
186            elif paradigm == "self_ask_with_search":
187                self._loop = SelfAskWithSearchLoop(*loop_kwargs)
188            else:
189                raise ValueError(f"Paradigm '{paradigm}' not supported. "
190                               f"Available paradigms: function_calling, react, "
191                               f"react_with_function_calling, plan-and-execute, "
192                               f"programmatic, reflexion, self_ask, self_ask_with_search, "
193                               f"or a custom object derived from _AgenticLoop")
194        
195        if tools is not None:
196            self._loop.register_tools(tools)
197        
198        if mcp_servers is not None:
199            self._loop.register_mcp_servers(mcp_servers)

Initialize the agent with the specified model and configuration.

Sets up an AI agent with the chosen reasoning paradigm, registers any provided tools, and configures execution parameters.

Raises
  • ValueError: If the specified paradigm is not supported or invalid
  • TypeError: If a custom object is passed that doesn't derive from _AgenticLoop
name
def run(self, prompt: str | monoai.prompts.Prompt):
201    def run(self, prompt: str | Prompt):
202        """
203        Execute the agent with the specified prompt.
204        
205        Processes a user prompt through the agent's reasoning paradigm,
206        returning a structured response that includes the reasoning process
207        and final answer.
208        
209        Parameters
210        ----------
211        prompt : str or Prompt
212            User prompt or query to process. Can be a string or a Prompt object.
213        
214        Returns
215        -------
216        Dict[str, Any]
217            Agent response containing:
218            - prompt: Original user query
219            - iterations: List of processed reasoning iterations
220            - response: Final response (if available)
221            - metadata: Additional execution metadata (paradigm-specific)
222        
223        Examples
224        --------
225        >>> agent = Agent(model=model, paradigm="function_calling")
226        >>> response = agent.run("What's the weather in New York?")
227        >>> print(response['response'])
228        
229        >>> from monoai.prompts import Prompt
230        >>> prompt = Prompt("Analyze this data: [1, 2, 3, 4, 5]")
231        >>> response = agent.run(prompt)
232        >>> print(response['iterations'])
233        
234        Notes
235        -----
236        The response structure varies by paradigm:
237        - **Function calling**: Includes tool call details and function results
238        - **ReAct**: Includes thought-action-observation cycles
239        - **Plan-and-execute**: Includes planning and execution phases
240        - **Programmatic**: Includes code generation and execution results
241        - **Reflexion**: Includes self-reflection and error correction steps
242        - **Self-ask**: Includes question-answer reasoning chains
243        - **Self-ask with search**: Includes web search results and reasoning
244        - **Custom paradigms**: May include paradigm-specific information
245        """
246        
247        return self._loop.start(prompt)

Execute the agent with the specified prompt.

Processes a user prompt through the agent's reasoning paradigm, returning a structured response that includes the reasoning process and final answer.

Parameters
  • prompt (str or Prompt): User prompt or query to process. Can be a string or a Prompt object.
Returns
  • Dict[str, Any]: Agent response containing:
    • prompt: Original user query
    • iterations: List of processed reasoning iterations
    • response: Final response (if available)
    • metadata: Additional execution metadata (paradigm-specific)
Examples
>>> agent = Agent(model=model, paradigm="function_calling")
>>> response = agent.run("What's the weather in New York?")
>>> print(response['response'])
>>> from monoai.prompts import Prompt
>>> prompt = Prompt("Analyze this data: [1, 2, 3, 4, 5]")
>>> response = agent.run(prompt)
>>> print(response['iterations'])
Notes

The response structure varies by paradigm:

  • Function calling: Includes tool call details and function results
  • ReAct: Includes thought-action-observation cycles
  • Plan-and-execute: Includes planning and execution phases
  • Programmatic: Includes code generation and execution results
  • Reflexion: Includes self-reflection and error correction steps
  • Self-ask: Includes question-answer reasoning chains
  • Self-ask with search: Includes web search results and reasoning
  • Custom paradigms: May include paradigm-specific information
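Given the documented keys (`prompt`, `iterations`, `response`), the returned dict can be turned into a readable trace with a few lines of plain Python. The helper name below is an assumption for this sketch:

```python
def summarize_run(result: dict) -> str:
    """Render the dict returned by Agent.run() as a short report.

    Assumes only the documented keys: 'prompt', 'iterations', 'response';
    missing keys fall back to sensible defaults.
    """
    lines = [f"Prompt: {result.get('prompt', '')}"]
    for i, step in enumerate(result.get("iterations", []), start=1):
        lines.append(f"  step {i}: {step}")
    lines.append(f"Response: {result.get('response', '<none>')}")
    return "\n".join(lines)
```

Because the structure of each iteration varies by paradigm, the helper prints steps verbatim rather than assuming any particular thought/action layout.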
def enable_streaming(self, stream_callback=None):
249    def enable_streaming(self, stream_callback=None):
250        """
251        Enable streaming responses for this agent.
252        
253        When streaming is enabled, the agent will call the provided callback
254        function with each content chunk as it's generated, allowing for
255        real-time processing and display of the AI's response.
256        
257        Parameters
258        ----------
259        stream_callback : callable, optional
260            Callback function to handle streaming content chunks.
261            If None, uses a default callback that prints content to console.
262            The callback receives plain text content strings.
263            
264            Callback signature: callback(content: str) -> None
265            where content is the streaming text chunk.
266        
267        Raises
268        ------
269        AttributeError
270            If the current paradigm doesn't support streaming
271            
272        Examples
273        --------
274        >>> def my_callback(content):
275        ...     print(f"Streaming: {content}")
276        >>> 
277        >>> agent = Agent(model=model, paradigm="function_calling")
278        >>> agent.enable_streaming(my_callback)
279        >>> response = agent.run("Tell me a story")
280        
281        Notes
282        -----
283        Not all paradigms support streaming. Check the specific paradigm
284        implementation to ensure streaming is available.
285        """
286        if hasattr(self._loop, 'enable_streaming'):
287            self._loop.enable_streaming(stream_callback)
288        else:
289            raise AttributeError(f"Paradigm '{self._loop.__class__.__name__}' doesn't support streaming")

Enable streaming responses for this agent.

When streaming is enabled, the agent will call the provided callback function with each content chunk as it's generated, allowing for real-time processing and display of the AI's response.

Parameters
  • stream_callback (callable, optional): Callback function to handle streaming content chunks. If None, uses a default callback that prints content to console. The callback receives plain text content strings.

    Callback signature: callback(content: str) -> None, where content is the streaming text chunk.

Raises
  • AttributeError: If the current paradigm doesn't support streaming
Examples
>>> def my_callback(content):
...     print(f"Streaming: {content}")
>>> 
>>> agent = Agent(model=model, paradigm="function_calling")
>>> agent.enable_streaming(my_callback)
>>> response = agent.run("Tell me a story")
Notes

Not all paradigms support streaming. Check the specific paradigm implementation to ensure streaming is available.
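Since the callback contract is simply `callback(content: str) -> None`, a small callable object can both display and accumulate chunks. The class below is a hypothetical helper (not part of the package) matching that contract:

```python
class ChunkCollector:
    """Hypothetical stream_callback target: accumulates streamed chunks
    and exposes the full text once generation finishes."""

    def __init__(self):
        self._chunks = []

    def __call__(self, content: str) -> None:
        # Matches the documented callback(content: str) -> None signature.
        self._chunks.append(content)

    @property
    def text(self) -> str:
        return "".join(self._chunks)
```

Usage would be `collector = ChunkCollector(); agent.enable_streaming(collector)`, after which `collector.text` holds everything streamed during `agent.run(...)`.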

def disable_streaming(self):
291    def disable_streaming(self):
292        """
293        Disable streaming responses for this agent.
294        
295        After calling this method, the agent will use standard (non-streaming)
296        model execution for all subsequent requests.
297        
298        Raises
299        ------
300        AttributeError
301            If the current paradigm doesn't support streaming
302            
303        Examples
304        --------
305        >>> agent = Agent(model=model, paradigm="function_calling")
306        >>> agent.enable_streaming()
307        >>> # ... use streaming ...
308        >>> agent.disable_streaming()
309        >>> # Now using standard execution
310        
311        Notes
312        -----
313        Not all paradigms support streaming. Check the specific paradigm
314        implementation to ensure streaming is available.
315        """
316        if hasattr(self._loop, 'disable_streaming'):
317            self._loop.disable_streaming()
318        else:
319            raise AttributeError(f"Paradigm '{self._loop.__class__.__name__}' doesn't support streaming")

Disable streaming responses for this agent.

After calling this method, the agent will use standard (non-streaming) model execution for all subsequent requests.

Raises
  • AttributeError: If the current paradigm doesn't support streaming
Examples
>>> agent = Agent(model=model, paradigm="function_calling")
>>> agent.enable_streaming()
>>> # ... use streaming ...
>>> agent.disable_streaming()
>>> # Now using standard execution
Notes

Not all paradigms support streaming. Check the specific paradigm implementation to ensure streaming is available.
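Because enable_streaming/disable_streaming form an enable/teardown pair, wrapping them in a context manager guarantees streaming is switched off even if `run` raises. This is a sketch assuming only the two documented methods; the `streaming` helper itself is hypothetical:

```python
from contextlib import contextmanager

@contextmanager
def streaming(agent, callback=None):
    """Hypothetical helper: enable streaming on entry and always
    disable it on exit, even when the body raises."""
    agent.enable_streaming(callback)
    try:
        yield agent
    finally:
        agent.disable_streaming()
```

Typical use: `with streaming(agent, my_callback): agent.run("Tell me a story")`, leaving the agent in standard (non-streaming) mode afterwards.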