1""" 2MonoAI Agents Package 3 4This package provides a comprehensive set of agentic loop implementations 5that support real-time streaming responses during the entire generation phase. 6The agents are designed to work with various AI paradigms and can process 7user queries through iterative reasoning, tool usage, and structured output generation. 8 9Key Features: 10- Real-time streaming of AI responses 11- Multiple agent paradigms (React, Function Calling, Plan-and-Execute, etc.) 12- Step-based reasoning format for transparent decision making 13- Tool integration and execution 14- Configurable iteration limits and debugging 15- Asynchronous streaming with proper resource management 16 17Agent Paradigms: 18The agents use a structured step-based format for reasoning: 19- Thought N: <reasoning process and analysis> 20- Action N: {"name": "tool_name", "arguments": {...}} 21- Observation N: <tool execution result> 22- Final answer: <conclusive response to user query> 23 24Streaming Architecture: 25Streaming allows real-time processing of AI responses as they are generated, enabling: 26- Immediate feedback to users 27- Progress monitoring during long operations 28- Better user experience with responsive interfaces 29- Debugging and monitoring of agent reasoning 30 31Usage Examples: 32 331. Basic React Agent with Streaming: 34 ```python 35 from monoai.agents import Agent 36 37 agent = Agent(model, paradigm="react") 38 agent.enable_streaming() # Uses default console output 39 result = agent.run("What is the capital of France?") 40 ``` 41 422. Custom Streaming Handler: 43 ```python 44 from monoai.agents import Agent 45 import json 46 47 def custom_stream_handler(content): 48 # Process streaming content in real-time 49 print(f"Streaming: {content}", end='', flush=True) 50 51 agent = Agent(model, paradigm="react") 52 agent.enable_streaming(custom_stream_handler) 53 result = agent.run("Your query here") 54 ``` 55 563. 
Function Calling Agent: 57 ```python 58 from monoai.agents import Agent 59 from monoai.tools import search_web 60 61 agent = Agent(model, paradigm="function_calling") 62 agent.register_tools([search_web]) 63 agent.enable_streaming() 64 result = agent.run("Search for recent AI news") 65 ``` 66 674. Plan and Execute Agent: 68 ```python 69 agent = Agent(model, paradigm="plan-and-execute") 70 agent.enable_streaming() 71 result = agent.run("Create a detailed project plan") 72 ``` 73 74Available Agent Types: 75- FunctionCallingAgenticLoop: Native OpenAI function calling 76- ReactAgenticLoop: ReAct reasoning pattern 77- ReactWithFCAgenticLoop: Hybrid ReAct + Function Calling 78- ProgrammaticAgenticLoop: Code generation and execution 79- PlanAndExecuteAgenticLoop: Planning then execution pattern 80- ReflexionAgenticLoop: Self-reflection and improvement 81- SelfAskAgenticLoop: Self-questioning approach 82- SelfAskWithSearchLoop: Self-ask with web search capabilities 83 84Streaming Callback Format: 85The streaming callback receives content as plain text strings. For advanced use cases, 86you can access the raw streaming data through the model's streaming methods. 87 88Error Handling: 89- Automatic fallback to non-streaming mode on errors 90- Proper cleanup of async resources 91- Configurable debug output 92- Iteration limits to prevent infinite loops 93 94Thread Safety: 95This implementation is designed for single-threaded use. For concurrent access, 96create separate agent instances for each thread or process. 
97""" 98 99from .agentic_loops._agentic_loop import _AgenticLoop, _FunctionCallingMixin, _ReactMixin 100from .agentic_loops._function_calling_agentic_loop import FunctionCallingAgenticLoop 101from .agentic_loops._react_agentic_loop import ReactAgenticLoop, ReactWithFCAgenticLoop, _BaseReactLoop 102from .agentic_loops._other_agentic_loops import ( 103 ProgrammaticAgenticLoop, 104 PlanAndExecuteAgenticLoop, 105 ReflexionAgenticLoop, 106 SelfAskAgenticLoop, 107 SelfAskWithSearchLoop 108) 109 110# Main Agent class that provides a unified interface 111from .agent import Agent 112 113__all__ = [ 114 # Base classes 115 '_AgenticLoop', 116 '_FunctionCallingMixin', 117 '_ReactMixin', 118 '_BaseReactLoop', 119 120 # Agent implementations 121 'FunctionCallingAgenticLoop', 122 'ReactAgenticLoop', 123 'ReactWithFCAgenticLoop', 124 'ProgrammaticAgenticLoop', 125 'PlanAndExecuteAgenticLoop', 126 'ReflexionAgenticLoop', 127 'SelfAskAgenticLoop', 128 'SelfAskWithSearchLoop', 129 130 # Main interface 131 'Agent' 132]
class _AgenticLoop:
    """Base class for all agentic loop implementations.

    This class provides the core functionality for all AI agents, including
    tool management, message creation, model execution, and streaming support.
    It's designed to be extended by specific classes that implement different
    agentic approaches and reasoning patterns.

    The base class handles common operations like:
    - Tool registration and execution
    - Message formatting and conversation management
    - Model interaction (both streaming and non-streaming)
    - Step-based reasoning format parsing
    - Debug output and iteration limits

    Attributes
    ----------
    _model : Model
        AI model instance used for execution
    _agentic_prompt : str or Prompt, optional
        Custom prompt for the agent (None for default). Can be a string,
        a Prompt object, or a path to a .prompt file
    _debug : bool
        Flag to enable debug output
    _max_iter : Optional[int]
        Maximum number of iterations allowed (None for unlimited)
    _stream_callback : Optional[Callable[[str], None]]
        Callback function for handling streaming content
    _tools : Dict[str, Any]
        Dictionary of available tools, mapped by name
    """

    _DEFAULT_PROMPT_PATH = "monoai/agents/prompts/"

    def __init__(self, model: Model, agentic_prompt: str = None, debug: bool = False,
                 max_iter: Optional[int] = None,
                 stream_callback: Optional[Callable[[str], None]] = None,
                 human_feedback: Optional[str] = None) -> None:
        """Initialize the agent with model and configuration.

        Parameters
        ----------
        model : Model
            AI model instance to use for execution
        agentic_prompt : str, optional
            Custom prompt for the agent (None to use default)
        debug : bool, default False
            Enable debug output and logging
        max_iter : Optional[int], default None
            Maximum number of iterations allowed (None for unlimited)
        stream_callback : Optional[Callable[[str], None]], default None
            Callback function for handling streaming content chunks
        human_feedback : Optional[str], default None
            Human feedback mode for controlling agent execution. Can be:
            - None: No human feedback required
            - "actions": Pause and request confirmation before executing tool actions
            - "all": Pause after every step for human review
        """
        self._model = model
        self._agentic_prompt = agentic_prompt
        self._debug = debug
        self._max_iter = max_iter
        self._stream_callback = stream_callback
        self._human_feedback = human_feedback
        self._tools = {}
        self._mcp_servers = {}

    def register_tools(self, tools: List[Any]) -> None:
        """Register tools with the agent.

        Parameters
        ----------
        tools : List[Any]
            List of tool functions to register. Each tool must have a
            __name__ attribute for identification.
        """
        for tool in tools:
            self._tools[tool.__name__] = tool

    def register_mcp_servers(self, mcp_servers: List[Any]) -> None:
        """Register MCP servers with the agent.

        Parameters
        ----------
        mcp_servers : List[Any]
            List of MCP server instances to register.
307 """ 308 tools = {} 309 self._mcp_servers = {} 310 for mcp_server in mcp_servers: 311 self._debug_print(f"Connecting to MCP server: {mcp_server.name}") 312 # Use context manager for proper connection handling 313 self._mcp_servers[mcp_server.name] = mcp_server 314 server_tools = mcp_server.get_tools() 315 self._debug_print(f"Retrieved {len(server_tools)} tools from {mcp_server.name}") 316 for tool in server_tools: 317 # Create a new tool name with MCP prefix 318 original_name = tool.name 319 tool.name = f"mcp_{mcp_server.name}_{original_name}" 320 tools[tool.name] = tool 321 self._debug_print(f" - {tool.name}") 322 323 self._tools.update(tools) 324 325 326 def enable_streaming(self, stream_callback: Optional[Callable[[str], None]] = None) -> None: 327 """Enable streaming responses for this agent. 328 329 When streaming is enabled, the agent will call the provided callback 330 function with each content chunk as it's generated, allowing for 331 real-time processing and display of the AI's response. 332 333 Parameters 334 ---------- 335 stream_callback : Optional[Callable[[str], None]], default None 336 Callback function to handle streaming content chunks. 337 If None, uses a default callback that prints content to console. 338 The callback receives plain text content strings. 339 """ 340 if stream_callback is None: 341 def default_callback(content): 342 print(content, end='', flush=True) 343 344 self._stream_callback = default_callback 345 else: 346 self._stream_callback = stream_callback 347 348 def disable_streaming(self) -> None: 349 """Disable streaming responses for this agent. 350 351 After calling this method, the agent will use standard (non-streaming) 352 model execution for all subsequent requests. 353 """ 354 self._stream_callback = None 355 356 @classmethod 357 def create_streaming(cls, model: Any, stream_callback: Optional[Callable[[str], None]] = None, 358 **kwargs) -> '_AgenticLoop': 359 """Create an agent instance with streaming enabled. 

        This is a convenience class method that creates an agent instance
        with streaming already enabled, avoiding the need to call
        enable_streaming() separately.

        Parameters
        ----------
        model : Any
            AI model instance to use
        stream_callback : Optional[Callable[[str], None]], default None
            Callback function for handling streaming content chunks
        **kwargs
            Additional parameters to pass to the constructor

        Returns
        -------
        _AgenticLoop
            Agent instance with streaming enabled
        """
        return cls(model, stream_callback=stream_callback, **kwargs)

    def _get_tools(self) -> str:
        """Generate a descriptive string of available tools.

        This method creates a formatted string listing all registered tools
        with their signatures and documentation. This string is typically
        included in prompts to help the AI model understand what tools
        are available for use.

        Returns
        -------
        str
            Formatted string with descriptions of all available tools,
            one per line with " - " prefix. Returns empty string if no tools.

        Example:
        - colab_downloader(colab_url): Downloads a Jupyter notebook from Google Drive and returns its Python code as a string.
          Args: colab_url (str): The Google Drive URL of the notebook (should contain 'drive/').
        """
        if not self._tools:
            return ""

        tools = []
        for tool_name, tool_func in self._tools.items():
            if tool_name.startswith("mcp_"):
                tools.append(f" - {self._encode_mcp_tool(tool_func)}")
            else:
                tools.append(f" - {self._encode_tool(tool_func)}")
        return "\n".join(tools)

    def _get_base_messages(self, agent_type: str, query: str) -> List[Dict[str, Any]]:
        """Generate base messages for the specific agent type.

        This method creates the initial message structure for the agent,
        including the appropriate prompt template and user query. The
        prompt template is selected based on the agent type and includes
        information about available tools.

        Parameters
        ----------
        agent_type : str
            Type of agent to determine which prompt template to use
        query : str
            User query to include in the prompt

        Returns
        -------
        List[Dict[str, Any]]
            List of base messages for the agent, including the prompt and query
        """
        tools = self._get_tools()
        prompt_data = {"available_tools": tools}

        # Handle case where _agentic_prompt is already a Prompt object
        if self._agentic_prompt is not None and hasattr(self._agentic_prompt, 'as_dict'):
            # It's already a Prompt object, just update the data and return
            agentic_prompt = self._agentic_prompt
            # Update prompt data if the prompt supports it
            if hasattr(agentic_prompt, 'prompt_data'):
                agentic_prompt.prompt_data = prompt_data
        else:
            # Otherwise, determine the prompt configuration and create a new Prompt
            prompt_config = self._determine_prompt_config(agent_type)

            agentic_prompt = Prompt(
                **prompt_config,
                is_system=True,
                prompt_data=prompt_data
            )

        messages = [agentic_prompt.as_dict()]

        if isinstance(query, Prompt):
            messages.append(query.as_dict())
        else:
            messages.append({"role": "user", "content": query})

        return messages

    def _determine_prompt_config(self, agent_type: str) -> Dict[str, Any]:
        """Determine the prompt configuration based on the agentic_prompt setting.

        Parameters
        ----------
        agent_type : str
            Type of agent for fallback prompt selection

        Returns
        -------
        Dict[str, Any]
            Configuration dictionary for Prompt initialization
        """
        if self._agentic_prompt is None:
            return {
                "prompt_id": f"{self._DEFAULT_PROMPT_PATH}{agent_type}.prompt"
            }

        if isinstance(self._agentic_prompt, str):
            if self._agentic_prompt.endswith(".prompt"):
                return {"prompt_id": self._agentic_prompt}
            else:
                return {"prompt": self._agentic_prompt}
        else:
            # For non-string types (including Prompt objects), treat as prompt content
            return {"prompt": self._agentic_prompt}

    def _debug_print(self, content: str) -> None:
        """Print debug information if debug mode is enabled.

        Parameters
        ----------
        content : str
            Content to print in debug mode
        """
        if self._debug:
            print(content)
            print("-------")

    def _request_human_feedback(self, step_type: str, content: str, action_data: Optional[Dict] = None) -> bool:
        """Request human feedback for the current step.

        This method pauses execution and asks the user for confirmation
        before proceeding with the current step.

        Parameters
        ----------
        step_type : str
            Type of step being executed (e.g., "thought", "action", "observation")
        content : str
            Content of the current step
        action_data : Optional[Dict], default None
            Action data if this is an action step

        Returns
        -------
        bool
            True if the user approves the step, False if they want to stop
        """
        print(f"\n{'='*60}")
        print(f"HUMAN FEEDBACK REQUIRED - {step_type.upper()} STEP")
        print(f"{'='*60}")

        if action_data:
            print(f"Action to execute: {action_data.get('name', 'Unknown')}")
            print(f"Arguments: {action_data.get('arguments', {})}")
        else:
            print(f"Content: {content}")

        print("\nOptions:")
        print("  [y] Yes - Continue with this step")
        print("  [n] No - Stop execution")
        print("  [s] Skip - Skip this step and continue")

        while True:
            try:
                response = input("\nYour choice (y/n/s): ").lower().strip()
                if response in ['y', 'yes']:
                    return True
                elif response in ['n', 'no']:
                    return False
                elif response in ['s', 'skip']:
                    print("Step skipped.")
                    return True  # Continue but skip the current step
                else:
                    print("Please enter 'y' for yes, 'n' for no, or 's' for skip.")
            except KeyboardInterrupt:
                print("\nExecution interrupted by user.")
                return False

    def _parse_step_format(self, content: str) -> Optional[Dict[str, Any]]:
        """Parse the step-based format <STEP_TYPE>: <RESULT>.

        This method parses agent responses that follow the structured step format
        used by ReAct and similar reasoning patterns. It extracts the step type,
        step number, and content from formatted responses.

        Supported step types:
        - Thought: Reasoning and analysis steps
        - Action: Tool calls and actions (must be valid JSON)
        - Observation: Results from tool executions
        - Final answer: Conclusive responses to user queries

        Special handling:
        - If content contains both "Thought" and "Action", the Action is returned with priority.
        - If content contains both "Thought" and "Final answer", the Final answer is returned with priority.

        Parameters
        ----------
        content : str
            Content to parse in the format <STEP_TYPE>: <RESULT>

        Returns
        -------
        Optional[Dict[str, Any]]
            Dictionary containing:
            - step_type: Type of step (thought, action, observation, final answer)
            - step_number: Optional step number if present
            - content: Step content
            - action: Parsed action data (for action steps)
            - final_answer: Final answer content (for final answer steps)
            Returns None if parsing fails
        """
        if not content or not isinstance(content, str):
            return None

        content = content.strip()

        # Check for multiple step types and prioritize accordingly
        lines = content.split('\n')
        action_start = None
        thought_start = None
        final_answer_start = None

        # Find the start lines for Action, Thought, and Final answer
        for i, line in enumerate(lines):
            if re.match(r'^Action(?:\s+\d+)?\s*:', line, re.IGNORECASE):
                action_start = i
            elif re.match(r'^Thought(?:\s+\d+)?\s*:', line, re.IGNORECASE):
                thought_start = i
            elif re.match(r'^Final answer(?:\s+\d+)?\s*:', line, re.IGNORECASE):
                final_answer_start = i

        # Priority: Final answer > Action > Thought
        if final_answer_start is not None:
            # Final answer has the highest priority
            step_type = "final_answer"

            # Extract the step number from the Final answer line
            final_answer_line = lines[final_answer_start]
            step_number_match = re.search(r'Final answer\s+(\d+)\s*:', final_answer_line,
                                          re.IGNORECASE)
            step_number = step_number_match.group(1) if step_number_match else None

            # Extract content from Final answer (everything after the colon on the first line)
            final_answer_content = final_answer_line.split(':', 1)[1].strip()

            # Add all subsequent lines until we hit another step type or the end
            i = final_answer_start + 1
            while i < len(lines):
                line = lines[i].strip()
                if re.match(r'^(Thought|Action|Observation|Final answer)(?:\s+\d+)?\s*:', line, re.IGNORECASE):
                    break
                if line:  # Only add non-empty lines
                    final_answer_content += '\n' + line
                i += 1

            step_content = final_answer_content.strip().replace("```json", "").replace("```", "")

            result = {
                "step_type": step_type,
                "step_number": step_number,
                "content": step_content,
                "final_answer": step_content
            }

            return result

        elif action_start is not None and thought_start is not None:
            # Both Thought and Action present, prioritize Action
            step_type = "action"

            # Extract the step number from the Action line
            action_line = lines[action_start]
            step_number_match = re.search(r'Action\s+(\d+)\s*:', action_line, re.IGNORECASE)
            step_number = step_number_match.group(1) if step_number_match else None

            # Extract content from Action (everything after the colon on the first line)
            action_content = action_line.split(':', 1)[1].strip()

            # Add all subsequent lines until we hit another step type or the end
            i = action_start + 1
            while i < len(lines):
                line = lines[i].strip()
                if re.match(r'^(Thought|Action|Observation|Final answer)(?:\s+\d+)?\s*:', line, re.IGNORECASE):
                    break
                if line:  # Only add non-empty lines
                    action_content += '\n' + line
                i += 1

            step_content = action_content.strip().replace("```json", "").replace("```", "")

            result = {
                "step_type": step_type,
                "step_number": step_number,
                "content": step_content
            }

            # Parse action data (must be JSON)
            try:
                action_data = json.loads(step_content)
                result["action"] = action_data
            except json.JSONDecodeError:
                # If not valid JSON, keep content as raw string
                result["action"] = {"raw": step_content}

            return result

        # Standard single-step parsing
        step_pattern = r'^(Thought|Action|Observation|Final answer)(?:\s+(\d+))?\s*:\s*(.*)$'
        match = re.match(step_pattern, content, re.IGNORECASE | re.DOTALL)

        if match:
            step_type = match.group(1).lower()
            step_number = match.group(2) if match.group(2) else None
            step_content = match.group(3).strip().replace("```json", "").replace("```", "")

            result = {
                "step_type": step_type,
                "step_number": step_number,
                "content": step_content
            }

            # Special handling for Action steps (must be JSON)
            if step_type == "action":
                try:
                    action_data = json.loads(step_content)
                    result["action"] = action_data
                except json.JSONDecodeError:
                    # If not valid JSON, keep content as raw string
                    result["action"] = {"raw": step_content}

            # Special handling for Final answer steps. Note: the regex yields
            # "final answer" (with a space), so match that form here.
            elif step_type == "final answer":
                result["final_answer"] = step_content

            return result

        return None

    def _execute_model_step(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute a model step and return the response.

        This method handles both streaming and non-streaming model execution.
        If streaming is enabled and not already in progress, it uses the
        streaming method. Otherwise, it uses standard model execution.

        Parameters
        ----------
        messages : List[Dict[str, Any]]
            List of messages to send to the model

        Returns
        -------
        Dict[str, Any]
            Model response in standard OpenAI format with usage information
        """
        resp = self._model._execute(messages)

        # Extract usage information from the response
        usage_info = None
        if hasattr(resp, 'usage') and resp.usage:
            usage_info = {
                "total_tokens": resp.usage.total_tokens,
                "prompt_tokens": resp.usage.prompt_tokens,
                "completion_tokens": resp.usage.completion_tokens
            }

        message = resp["choices"][0]["message"]

        # Add usage information to the message
        if usage_info:
            message["usage"] = usage_info
        return message

    def _execute_model_step_stream(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute a model step with streaming and return the complete response.

        This method handles asynchronous streaming of model responses, collecting
        all chunks and building the final response. It properly manages async
        resources to avoid memory leaks and task warnings.

        Parameters
        ----------
        messages : List[Dict[str, Any]]
            List of messages to send to the model

        Returns
        -------
        Dict[str, Any]
            Complete model response in standard OpenAI format
        """
        import asyncio
        from litellm import stream_chunk_builder

        # Use asyncio.run to manage the event loop correctly
        async def run_streaming():
            chunks = []
            stream = None
            try:
                stream = self._model._execute_stream(messages)
                async for chunk in stream:
                    chunks.append(chunk)
                    content = chunk["choices"][0]["delta"]["content"]

                    if content is not None:
                        self._stream_callback({"type": "text", "delta": content})

            finally:
                if stream is not None:
                    try:
                        await stream.aclose()
                    except Exception:
                        pass

            return chunks

        try:
            chunks = asyncio.run(run_streaming())
            resp = stream_chunk_builder(chunks)

            # Extract usage information from the response
            usage_info = None
            if hasattr(resp, 'usage') and resp.usage:
                usage_info = {
                    "total_tokens": resp.usage.total_tokens,
                    "prompt_tokens": resp.usage.prompt_tokens,
                    "completion_tokens": resp.usage.completion_tokens
                }

            message = resp["choices"][0]["message"]

            # Add usage information to the message
            if usage_info:
                message["usage"] = usage_info

            return message
        except Exception as e:
            print(f"Streaming error: {e}, falling back to standard execution")
            return self._execute_model_step(messages)

    def _create_base_response(self, query: str) -> Dict[str, Any]:
        """Create the base response structure.

        Parameters
        ----------
        query : str
            Original user query

        Returns
        -------
        Dict[str, Any]
            Dictionary with base response structure:
            - prompt: Original user query
            - iterations: Empty list for iterations
            - usage: Dictionary for tracking token usage
        """
        return {
            "prompt": query,
            "iterations": [],
            "usage": {
                "total_tokens": 0,
                "prompt_tokens": 0,
                "completion_tokens": 0
            }
        }

    def _update_usage(self, response: Dict[str, Any], iteration_usage: Dict[str, int]) -> None:
        """Update the total usage in the response.

        Parameters
        ----------
        response : Dict[str, Any]
            Response dictionary to update
        iteration_usage : Dict[str, int]
            Usage information from current iteration
        """
        # iteration_usage is a dict, so test membership with "in"
        # (hasattr() would always be False here)
        if "usage" in iteration_usage:
            response["usage"]["total_tokens"] += iteration_usage["usage"]["total_tokens"]
            response["usage"]["prompt_tokens"] += iteration_usage["usage"]["prompt_tokens"]
            response["usage"]["completion_tokens"] += iteration_usage["usage"]["completion_tokens"]

    def _handle_final_answer(self, iteration: Dict[str, Any], response: Dict[str, Any]) -> bool:
        """Handle a final answer, returns True if this is the end.

        This method processes iterations that contain final answers and
        updates the response structure accordingly. It supports both
        the old format (final_answer key) and the new step format.

        Parameters
        ----------
        iteration : Dict[str, Any]
            Current iteration potentially containing a final answer
        response : Dict[str, Any]
            Response dictionary to update

        Returns
        -------
        bool
            True if a final answer was found and processed, False otherwise

        Notes
        -----
        This method modifies the response object directly.
874 """ 875 if "final_answer" in iteration: 876 response["iterations"].append(iteration) 877 response["response"] = iteration["final_answer"] 878 return True 879 elif iteration.get("step_type") == "final answer": 880 response["iterations"].append(iteration) 881 response["response"] = iteration["content"] 882 return True 883 return False 884 885 def _pngimagefile_to_base64_data_uri(self, img) -> str: 886 """ 887 Converte un oggetto PIL.PngImageFile in una stringa base64 data URI. 888 889 Args: 890 img (Image.Image): immagine PIL (PngImageFile o convertita in PNG) 891 892 Returns: 893 str: stringa "data:image/png;base64,<...>" 894 """ 895 import io 896 import base64 897 buffer = io.BytesIO() 898 img.save(buffer, format="PNG") 899 buffer.seek(0) 900 901 # codifica in base64 902 encoded = base64.b64encode(buffer.read()).decode("utf-8") 903 904 return f"data:image/png;base64,{encoded}" 905 906 def _handle_tool_action(self, iteration: Dict[str, Any], response: Dict[str, Any], messages: List[Dict[str, Any]]) -> None: 907 """Handle a tool action execution. 908 909 This method processes iterations that contain tool actions, executes 910 the corresponding tools, and updates the conversation with the results. 911 It supports both old and new step-based formats. 912 913 Parameters 914 ---------- 915 iteration : Dict[str, Any] 916 Current iteration containing the tool action 917 response : Dict[str, Any] 918 Response dictionary to update 919 messages : List[Dict[str, Any]] 920 Message list to update with observation 921 922 Notes 923 ----- 924 This method modifies the response and messages objects directly. 
925 """ 926 # Check if human feedback is required for actions 927 if self._human_feedback == "actions" or self._human_feedback == "all": 928 action_data = iteration.get("action", {}) 929 if not self._request_human_feedback("action", iteration.get("content", ""), action_data): 930 print("Execution stopped by user.") 931 return 932 933 if "action" in iteration and iteration["action"].get("name"): 934 tool_call = iteration["action"] 935 936 if self._stream_callback is not None: 937 self._stream_callback({"type": "tool_call", "tool_call": tool_call}) 938 939 if tool_call.get("name").startswith("mcp_"): 940 tool_result = self._call_mcp_tool(tool_call) 941 else: 942 tool_result = self._call_tool(tool_call) 943 944 iteration["observation"] = tool_result 945 response["iterations"].append(iteration) 946 947 if isinstance(tool_result, dict): 948 if "image" in tool_result: 949 img = self._pngimagefile_to_base64_data_uri(tool_result["image"]) 950 msg = [{"type": "image_url", "image_url": {"url": img}}, 951 {"type": "text", "text": "Ecco l'immagine"}] 952 messages.append({"role": "user", "content": msg}) 953 else: 954 msg = json.dumps({"observation": tool_result}) 955 messages.append({"role": "user", "content": msg}) 956 elif iteration.get("step_type") == "action" and "action" in iteration: 957 tool_call = iteration["action"] 958 tool_result = self._call_tool(tool_call) 959 iteration["observation"] = tool_result 960 response["iterations"].append(iteration) 961 962 # Add observation in the new system format 963 observation_msg = f"Observation {iteration.get('step_number', '')}: {tool_result}".strip() 964 messages.append({"role": "user", "content": observation_msg}) 965 966 def _handle_default(self, iteration: Dict[str, Any], response: Dict[str, Any], messages: List[Dict[str, Any]]) -> None: 967 """Handle default case for unhandled iterations. 
968 969 This method processes iterations that don't match specific handlers, 970 adding them to the response and updating the conversation flow. 971 It supports both step-based and JSON formats. 972 973 Parameters 974 ---------- 975 iteration : Dict[str, Any] 976 Current iteration to handle 977 response : Dict[str, Any] 978 Response dictionary to update 979 messages : List[Dict[str, Any]] 980 Message list to update 981 982 Notes 983 ----- 984 This method modifies the response and messages objects directly. 985 """ 986 response["iterations"].append(iteration) 987 988 # For new format, add content as user message 989 if iteration.get("step_type") in ["thought", "observation"]: 990 step_type = iteration["step_type"].capitalize() 991 step_number = iteration.get("step_number", "") 992 content = iteration["content"] 993 message_content = f"{step_type} {step_number}: {content}".strip() 994 messages.append({"role": "user", "content": message_content}) 995 else: 996 # Fallback to JSON format for compatibility 997 messages.append({"role": "user", "content": json.dumps(iteration)}) 998 999 def start(self, query: str) -> Dict[str, Any]: 1000 """Abstract method to start the agentic loop. 1001 1002 This method must be implemented by subclasses to define the specific 1003 agentic behavior and reasoning pattern. 1004 1005 Parameters 1006 ---------- 1007 query : str 1008 User query to process 1009 1010 Returns 1011 ------- 1012 Dict[str, Any] 1013 Agent response containing iterations and final result 1014 1015 Raises 1016 ------ 1017 NotImplementedError 1018 This method must be implemented by subclasses 1019 """ 1020 raise NotImplementedError
Base class for all agentic loop implementations.
This class provides the core functionality for all AI agents, including tool management, message creation, model execution, and streaming support. It's designed to be extended by specific classes that implement different agentic approaches and reasoning patterns.
The base class handles common operations like:
- Tool registration and execution
- Message formatting and conversation management
- Model interaction (both streaming and non-streaming)
- Step-based reasoning format parsing
- Debug output and iteration limits
Attributes
- _model (Any): AI model instance used for execution
- _agentic_prompt (str, Prompt, optional): Custom prompt for the agent (None for default). Can be a string, a Prompt object, or a path to a .prompt file
- _debug (bool): Flag to enable debug output
- _max_iter (Optional[int]): Maximum number of iterations allowed (None for unlimited)
- _stream_callback (Optional[Callable[[str], None]]): Callback function for handling streaming content
- _tools (Dict[str, Any]): Dictionary of available tools, mapped by name
def __init__(self, model: Model, agentic_prompt: str = None, debug: bool = False, max_iter: Optional[int] = None,
             stream_callback: Optional[Callable[[str], None]] = None, human_feedback: Optional[str] = None) -> None:
    """Initialize the agent with model and configuration.

    Parameters
    ----------
    model : Any
        AI model instance to use for execution
    agentic_prompt : str, optional
        Custom prompt for the agent (None to use default)
    debug : bool, default False
        Enable debug output and logging
    max_iter : Optional[int], default None
        Maximum number of iterations allowed (None for unlimited)
    stream_callback : Optional[Callable[[str], None]], default None
        Callback function for handling streaming content chunks
    human_feedback : Optional[str], default None
        Human feedback mode for controlling agent execution. Can be:
        - None: No human feedback required
        - "actions": Pause and request confirmation before executing tool actions
        - "all": Pause after every step for human review
    """
    self._model = model
    self._agentic_prompt = agentic_prompt
    self._debug = debug
    self._max_iter = max_iter
    self._stream_callback = stream_callback
    self._human_feedback = human_feedback
    self._tools = {}
    self._mcp_servers = {}
Initialize the agent with model and configuration.
Parameters
- model (Any): AI model instance to use for execution
- agentic_prompt (str, optional): Custom prompt for the agent (None to use default)
- debug (bool, default False): Enable debug output and logging
- max_iter (Optional[int], default None): Maximum number of iterations allowed (None for unlimited)
- stream_callback (Optional[Callable[[str], None]], default None): Callback function for handling streaming content chunks
- human_feedback (Optional[str], default None):
Human feedback mode for controlling agent execution. Can be:
- None: No human feedback required
- "actions": Pause and request confirmation before executing tool actions
- "all": Pause after every step for human review
def register_tools(self, tools: List[Any]) -> None:
    """Register tools with the agent.

    Parameters
    ----------
    tools : List[Any]
        List of tool functions to register. Each tool must have a
        __name__ attribute for identification.
    """
    for tool in tools:
        self._tools[tool.__name__] = tool
Register tools with the agent.
Parameters
- tools (List[Any]): List of tool functions to register. Each tool must have a __name__ attribute for identification.
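The registration contract can be shown without the library: tools are stored in a dict keyed by their `__name__`, so plain functions qualify. `search_web` here is a toy stand-in, not a real tool.

```python
# Toy tool; any callable with a __name__ works.
def search_web(query: str) -> str:
    """Search the web for the given query."""
    return f"results for {query}"

tools = {}
for tool in [search_web]:        # mirrors what register_tools() does
    tools[tool.__name__] = tool

assert "search_web" in tools
assert tools["search_web"]("AI") == "results for AI"
```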
def register_mcp_servers(self, mcp_servers: List[Any]) -> None:
    """Register MCP servers with the agent.

    Parameters
    ----------
    mcp_servers : List[Any]
        List of MCP server instances to register.
    """
    tools = {}
    self._mcp_servers = {}
    for mcp_server in mcp_servers:
        self._debug_print(f"Connecting to MCP server: {mcp_server.name}")
        self._mcp_servers[mcp_server.name] = mcp_server
        server_tools = mcp_server.get_tools()
        self._debug_print(f"Retrieved {len(server_tools)} tools from {mcp_server.name}")
        for tool in server_tools:
            # Create a new tool name with the MCP prefix
            original_name = tool.name
            tool.name = f"mcp_{mcp_server.name}_{original_name}"
            tools[tool.name] = tool
            self._debug_print(f"  - {tool.name}")

    self._tools.update(tools)
Register MCP servers with the agent.
Parameters
- mcp_servers (List[Any]): List of MCP server instances to register.
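The naming convention applied at registration time is worth making explicit: tools are prefixed `mcp_<server>_<tool>`, which `_call_mcp_tool` later splits apart with `split("_", 2)`. A minimal demonstration:

```python
# Registration prefixes the tool name with the server name...
server_name, tool_name = "files", "read"
prefixed = f"mcp_{server_name}_{tool_name}"
assert prefixed == "mcp_files_read"

# ...and dispatch recovers both parts with a bounded split.
parts = prefixed.split("_", 2)
assert parts == ["mcp", "files", "read"]
assert parts[1] == server_name and parts[2] == tool_name
```

Note that this scheme assumes server names contain no underscores; a server named `my_files` would be split incorrectly.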
def enable_streaming(self, stream_callback: Optional[Callable[[str], None]] = None) -> None:
    """Enable streaming responses for this agent.

    When streaming is enabled, the agent will call the provided callback
    function with each content chunk as it's generated, allowing for
    real-time processing and display of the AI's response.

    Parameters
    ----------
    stream_callback : Optional[Callable[[str], None]], default None
        Callback function to handle streaming content chunks.
        If None, uses a default callback that prints content to console.
        The callback receives plain text content strings.
    """
    if stream_callback is None:
        def default_callback(content):
            print(content, end='', flush=True)

        self._stream_callback = default_callback
    else:
        self._stream_callback = stream_callback
Enable streaming responses for this agent.
When streaming is enabled, the agent will call the provided callback function with each content chunk as it's generated, allowing for real-time processing and display of the AI's response.
Parameters
- stream_callback (Optional[Callable[[str], None]], default None): Callback function to handle streaming content chunks. If None, uses a default callback that prints content to console. The callback receives plain text content strings.
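A custom handler can do more than print. Note that besides plain text chunks, some loops also pass dict events to the callback, e.g. `{"type": "tool_call", ...}` from `_handle_tool_action`, so a robust callback should accept both. This sketch is runnable on its own and simulates the calls the agent would make:

```python
# A collecting callback that handles both text chunks and dict events.
chunks = []

def collecting_callback(content):
    if isinstance(content, dict):
        chunks.append(f"[{content.get('type')}]")   # summarize dict events
    else:
        chunks.append(content)                      # accumulate text chunks

# Simulate what the agent would feed the callback:
for piece in ["Hel", "lo", {"type": "tool_call"}]:
    collecting_callback(piece)

assert "".join(chunks) == "Hello[tool_call]"
```

In real use the callback is passed to `agent.enable_streaming(collecting_callback)`.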
def disable_streaming(self) -> None:
    """Disable streaming responses for this agent.

    After calling this method, the agent will use standard (non-streaming)
    model execution for all subsequent requests.
    """
    self._stream_callback = None
Disable streaming responses for this agent.
After calling this method, the agent will use standard (non-streaming) model execution for all subsequent requests.
@classmethod
def create_streaming(cls, model: Any, stream_callback: Optional[Callable[[str], None]] = None,
                     **kwargs) -> '_AgenticLoop':
    """Create an agent instance with streaming enabled.

    This is a convenience class method that creates an agent instance
    with streaming already enabled, avoiding the need to call
    enable_streaming() separately.

    Parameters
    ----------
    model : Any
        AI model instance to use
    stream_callback : Optional[Callable[[str], None]], default None
        Callback function for handling streaming content chunks
    **kwargs
        Additional parameters to pass to the constructor

    Returns
    -------
    _AgenticLoop
        Agent instance with streaming enabled
    """
    return cls(model, stream_callback=stream_callback, **kwargs)
Create an agent instance with streaming enabled.
This is a convenience class method that creates an agent instance with streaming already enabled, avoiding the need to call enable_streaming() separately.
Parameters
- model (Any): AI model instance to use
- stream_callback (Optional[Callable[[str], None]], default None): Callback function for handling streaming content chunks
- **kwargs: Additional parameters to pass to the constructor
Returns
- _AgenticLoop: Agent instance with streaming enabled
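The classmethod is equivalent to constructing and then passing the callback yourself. `StubLoop` below is a toy stand-in for `_AgenticLoop` (which needs a real model), used only to show the equivalence:

```python
# Toy class mirroring the constructor/classmethod relationship.
class StubLoop:
    def __init__(self, model, stream_callback=None):
        self._model = model
        self._stream_callback = stream_callback

    @classmethod
    def create_streaming(cls, model, stream_callback=None, **kwargs):
        return cls(model, stream_callback=stream_callback, **kwargs)

cb = lambda chunk: None
a = StubLoop.create_streaming("model", cb)
b = StubLoop("model", stream_callback=cb)
assert a._stream_callback is b._stream_callback is cb
```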
def start(self, query: str) -> Dict[str, Any]:
    """Abstract method to start the agentic loop.

    This method must be implemented by subclasses to define the specific
    agentic behavior and reasoning pattern.

    Parameters
    ----------
    query : str
        User query to process

    Returns
    -------
    Dict[str, Any]
        Agent response containing iterations and final result

    Raises
    ------
    NotImplementedError
        This method must be implemented by subclasses
    """
    raise NotImplementedError
Abstract method to start the agentic loop.
This method must be implemented by subclasses to define the specific agentic behavior and reasoning pattern.
Parameters
- query (str): User query to process
Returns
- Dict[str, Any]: Agent response containing iterations and final result
Raises
- NotImplementedError: This method must be implemented by subclasses
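A subclass satisfies this contract by returning a dict with the documented keys. `EchoLoop` is a deliberately trivial, hypothetical subclass sketch that produces a well-formed response without calling a model:

```python
# Toy implementation of the start() contract: prompt, iterations, response.
class EchoLoop:
    def start(self, query):
        return {
            "prompt": query,
            "iterations": [{"step_type": "final answer", "content": query}],
            "response": query,
        }

result = EchoLoop().start("hello")
assert result["response"] == "hello"
assert result["iterations"][0]["step_type"] == "final answer"
```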
class _FunctionCallingMixin:
    """Mixin for handling OpenAI function calling tool execution.

    This mixin provides methods for managing tool calls in OpenAI's native
    function calling format, converting tool responses into standardized messages
    that can be used in the conversation flow.

    The mixin handles the execution of tools registered with the agent and
    formats their responses according to OpenAI's message format specification.
    """

    def _call_tool(self, tool_call: Any) -> Dict[str, str]:
        """Execute a tool call and return the formatted response.

        This method takes a tool call object from the AI model's response,
        executes the corresponding tool function, and returns a properly
        formatted message that can be added to the conversation history.

        Parameters
        ----------
        tool_call : Any
            Tool call object containing function execution details.
            Must have attributes:
            - function.name: Name of the function to call
            - function.arguments: JSON string of function arguments
            - id: Unique identifier for this tool call

        Returns
        -------
        Dict[str, str]
            Formatted tool response message containing:
            - tool_call_id: ID of the original tool call
            - role: Message role (always "tool")
            - name: Name of the executed function
            - content: Tool execution result as string

        Raises
        ------
        KeyError
            If the tool function is not registered with the agent
        json.JSONDecodeError
            If the function arguments are not valid JSON
        Exception
            If the tool function execution fails
        """
        function_name = tool_call.function.name
        function_to_call = self._tools[function_name]
        function_args = json.loads(tool_call.function.arguments)
        function_response = str(function_to_call(**function_args))
        return {
            "tool_call_id": tool_call.id,
            "role": "tool",
            "name": function_name,
            "content": function_response,
        }

    def _call_mcp_tool(self, tool_call: Dict[str, Any]) -> Any:
        """Execute an MCP tool call."""
        tool_name = tool_call.function.name
        tool_arguments = json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
        tool_call_id = tool_call.id

        # Get the server name from the tool name (format: mcp_servername_toolname)
        server_name = tool_name.split("_", 2)[1]
        if server_name not in self._mcp_servers:
            raise ValueError(f"MCP server '{server_name}' not found")
        # Delegate to the registered MCP server and format the result as a tool message
        return {
            "tool_call_id": tool_call_id,
            "role": "tool",
            "name": tool_name,
            "content": str(self._mcp_servers[server_name].call_tool(tool_name, tool_arguments))
        }
Mixin for handling OpenAI function calling tool execution.
This mixin provides methods for managing tool calls in OpenAI's native function calling format, converting tool responses into standardized messages that can be used in the conversation flow.
The mixin handles the execution of tools registered with the agent and formats their responses according to OpenAI's message format specification.
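The message shape `_call_tool` produces can be demonstrated without a model, using a stand-in tool_call object that has the attributes the mixin expects (`function.name`, `function.arguments`, `id`):

```python
import json
from types import SimpleNamespace

# A registered toy tool.
def add(a, b):
    return a + b

tools = {"add": add}

# Stand-in for the OpenAI tool_call object.
tool_call = SimpleNamespace(
    id="call_1",
    function=SimpleNamespace(name="add", arguments=json.dumps({"a": 2, "b": 3})),
)

# Mirrors the mixin's _call_tool logic.
function_args = json.loads(tool_call.function.arguments)
message = {
    "tool_call_id": tool_call.id,
    "role": "tool",
    "name": tool_call.function.name,
    "content": str(tools[tool_call.function.name](**function_args)),
}

assert message == {"tool_call_id": "call_1", "role": "tool",
                   "name": "add", "content": "5"}
```

The resulting dict is exactly what gets appended to the conversation history so the model can read the tool's output.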
class _ReactMixin:
    """Mixin for handling ReAct-style tool calls with JSON format.

    This mixin provides methods for managing tool calls in structured JSON format,
    typical of ReAct-style approaches for AI agents. It handles the encoding
    of tool functions and execution of tool calls based on parsed JSON responses.

    The ReAct (Reasoning and Acting) pattern allows agents to reason about
    problems step-by-step and take actions using tools when needed.
    """

    def _encode_tool(self, func: Any) -> str:
        """Encode a function into a descriptive string format.

        This method creates a human-readable description of a tool function
        that can be included in prompts to help the AI model understand
        what tools are available and how to use them.

        Parameters
        ----------
        func : Any
            Function to encode. Must have attributes:
            - __name__: Function name
            - __doc__: Function documentation

        Returns
        -------
        str
            Descriptive string in the format:
            "function_name(signature): documentation"
            Newlines are replaced with spaces for single-line format.
        """
        sig = inspect.signature(func)
        doc = inspect.getdoc(func)
        encoded = func.__name__ + str(sig) + ": " + doc
        encoded = encoded.replace("\n", " ")
        return encoded

    def _encode_mcp_tool(self, tool: dict) -> str:
        """Convert a JSON Schema-style input schema into a Google-style Args section."""
        schema = tool.inputSchema
        if schema.get("type") != "object":
            raise ValueError("The root schema must have type=object")

        args = []
        args_doc = ["Args:"]
        properties = schema.get("properties", {})
        required = set(schema.get("required", []))

        for name, spec in properties.items():
            args.append(name)
            typ = spec.get("type", "Any")
            desc = spec.get("description", "").strip().replace("\n", " ")
            title = spec.get("title", "")
            default_info = "" if name in required else " Defaults to None."

            arg_line = f"  {name} ({typ}): {desc}{default_info}"
            if title:
                arg_line = f"  {name} ({typ}): {title}. {desc}{default_info}"
            args_doc.append(arg_line)

        encoded = tool.name + "(" + ", ".join(args) + "): "
        encoded += tool.description
        encoded += " " + " ".join(args_doc)
        return encoded

    def _call_tool(self, tool_call: Dict[str, Any]) -> Any:
        """Execute a tool call in ReAct format.

        This method executes a tool call based on a structured dictionary
        containing the tool name and arguments. It's designed to work with
        the ReAct pattern where tools are called through JSON-formatted
        action specifications.

        Parameters
        ----------
        tool_call : Dict[str, Any]
            Tool call specification containing:
            - name: Name of the tool to call
            - arguments: Dictionary of tool arguments

        Returns
        -------
        Any
            Result of the tool execution

        Raises
        ------
        KeyError
            If the tool is not registered with the agent
        TypeError
            If the tool arguments don't match the function signature
        Exception
            If the tool execution fails
        """
        tool = self._tools[tool_call["name"]]
        # Arguments are passed positionally, so their order must match the signature
        args = list(tool_call["arguments"].values())
        return tool(*args)

    def _call_mcp_tool(self, tool_call: Dict[str, Any]) -> Any:
        """Execute an MCP tool call.

        This method executes an MCP tool call based on a structured dictionary
        containing the tool name and arguments.
        """
        # Extract the server name from the tool name (format: mcp_servername_toolname)
        tool_name_parts = tool_call["name"].split("_", 2)
        if len(tool_name_parts) < 3:
            raise ValueError(f"Invalid MCP tool name format: {tool_call['name']}")

        server_name = tool_name_parts[1]
        tool_name = tool_name_parts[2]
        if server_name not in self._mcp_servers:
            raise ValueError(f"MCP server '{server_name}' not found")

        return {
            "tool_call_id": tool_name,  # ReAct-format calls carry no call id, so the tool name is reused
            "role": "tool",
            "name": tool_name,
            "content": str(self._mcp_servers[server_name].call_tool(tool_name, tool_call["arguments"]))
        }
Mixin for handling ReAct-style tool calls with JSON format.
This mixin provides methods for managing tool calls in structured JSON format, typical of ReAct-style approaches for AI agents. It handles the encoding of tool functions and execution of tool calls based on parsed JSON responses.
The ReAct (Reasoning and Acting) pattern allows agents to reason about problems step-by-step and take actions using tools when needed.
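The encoding `_encode_tool` hands to the model can be reproduced with the standard library alone. This replicates its logic on a sample function (`get_weather` is a toy example):

```python
import inspect

# Toy tool whose signature and docstring get encoded.
def get_weather(city: str) -> str:
    """Return the current weather for a city."""
    return ""

# Mirrors _encode_tool: name + signature + docstring, flattened to one line.
sig = inspect.signature(get_weather)
encoded = (get_weather.__name__ + str(sig) + ": "
           + inspect.getdoc(get_weather)).replace("\n", " ")

assert encoded == ("get_weather(city: str) -> str: "
                   "Return the current weather for a city.")
```

These one-line descriptions are concatenated into the prompt so the model knows each tool's name, parameters, and purpose.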
class _BaseReactLoop(_AgenticLoop, _ReactMixin):
    """Base class for all ReAct-style agents.

    This class implements the standard loop for agents that use a ReAct-style
    approach, where the model produces structured JSON responses that are
    parsed and handled iteratively. The ReAct pattern combines reasoning
    and acting in a step-by-step manner.

    The base loop handles:
    - Step-based reasoning format parsing
    - Tool action execution and observation
    - Final answer detection
    - Custom iteration handlers
    - Error handling and fallbacks

    Attributes
    ----------
    _max_iter : Optional[int]
        Maximum number of iterations allowed
    """

    def _run_react_loop(self, query: str, agent_type: str,
                        custom_handlers: Optional[Dict[str, Callable]] = None) -> Dict[str, Any]:
        """Execute the standard ReAct loop.

        This method implements the core ReAct reasoning pattern where the
        agent alternates between thinking, acting, and observing until it
        reaches a final answer or hits iteration limits.

        Parameters
        ----------
        query : str
            User query to process
        agent_type : str
            Type of agent to determine which prompt template to use
        custom_handlers : Optional[Dict[str, Callable]], optional
            Dictionary of custom handlers for specific iteration types.
            Keys are field names in the iteration, values are functions
            that handle those iterations.

        Returns
        -------
        Dict[str, Any]
            Agent response containing:
            - prompt: Original user query
            - iterations: List of processed iterations
            - response: Final response (if present)

        Notes
        -----
        This method automatically handles:
        - Final answers (final_answer)
        - Tool actions (action)
        - Custom cases via custom_handlers
        - Default cases for unhandled iterations
        - JSON error handling
        """
        messages = self._get_base_messages(agent_type, query)

        current_iter = 0
        response = self._create_base_response(query)

        # Custom handlers for special cases
        custom_handlers = custom_handlers or {}

        while True:
            if self._max_iter is not None and current_iter >= self._max_iter:
                break

            if self._stream_callback is None:
                resp = self._execute_model_step(messages)
            else:
                resp = self._execute_model_step_stream(messages)

            messages.append(resp)
            content = resp["content"]

            self._debug_print(content)

            # Update usage for this iteration
            self._update_usage(response, resp)

            if content is not None:
                iteration = self._parse_step_format(content)
                if iteration:
                    # Add usage information to the iteration
                    if "usage" in resp:
                        iteration["usage"] = resp["usage"]

                    # Check if human feedback is required for all steps
                    if self._human_feedback == "all":
                        if not self._request_human_feedback(iteration.get("step_type", "unknown"),
                                                            iteration.get("content", ""),
                                                            iteration.get("action")):
                            print("Execution stopped by user.")
                            break

                    # Handle the final answer
                    if self._handle_final_answer(iteration, response):
                        break

                    # Handle tool actions
                    if iteration["step_type"] == "action":
                        self._handle_tool_action(iteration, response, messages)
                        continue

                    # Handle custom cases
                    handled = False
                    for key, handler in custom_handlers.items():
                        if iteration["step_type"] == key:
                            handler(iteration, response, messages)
                            handled = True
                            break

                    if not handled:
                        self._handle_default(iteration, response, messages)
                else:
                    # If parsing fails, append the raw content as a user message
                    messages.append({"role": "user", "content": content})

            current_iter += 1

        return response
Base class for all ReAct-style agents.
This class implements the standard loop for agents that use a ReAct-style approach, where the model produces structured JSON responses that are parsed and handled iteratively. The ReAct pattern combines reasoning and acting in a step-by-step manner.
The base loop handles:
- Step-based reasoning format parsing
- Tool action execution and observation
- Final answer detection
- Custom iteration handlers
- Error handling and fallbacks
Attributes
- _max_iter (Optional[int]): Maximum number of iterations allowed
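The custom_handlers contract can be sketched standalone: keys are step_type values, and each handler receives `(iteration, response, messages)` and mutates them in place. The `"plan"` step type and `handle_plan` below are hypothetical examples, not part of the library:

```python
# Hypothetical handler for a "plan" step type.
def handle_plan(iteration, response, messages):
    response["iterations"].append(iteration)
    messages.append({"role": "user",
                     "content": f"Plan noted: {iteration['content']}"})

custom_handlers = {"plan": handle_plan}

# Simulate the loop dispatching one parsed iteration.
iteration = {"step_type": "plan", "content": "outline the steps"}
response = {"iterations": []}
messages = []
custom_handlers[iteration["step_type"]](iteration, response, messages)

assert response["iterations"] == [iteration]
assert messages[0]["content"] == "Plan noted: outline the steps"
```

Such a dict would be passed as the `custom_handlers` argument of `_run_react_loop` by a subclass.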
class FunctionCallingAgenticLoop(_AgenticLoop, _FunctionCallingMixin):
    """Agent that uses OpenAI's native function calling.

    This agent implements a loop that leverages OpenAI's native function calling
    system, allowing the model to directly call available functions without
    manual response parsing. This approach is more reliable and efficient
    than text-based tool calling.

    The agent automatically handles:
    - Function call detection and execution
    - Tool result integration into conversation
    - Iteration limits to prevent infinite loops
    - Streaming support for real-time responses

    Attributes
    ----------
    _model : Any
        OpenAI model with function calling support
    _tools : Dict[str, Any]
        Available tools for the agent
    """

    def _get_base_messages(self, query: str) -> List[Dict[str, Any]]:
        """Generate base messages for the specific agent type.

        This method creates the initial message structure for the agent,
        including the appropriate prompt template and user query. The
        prompt template is selected based on the agent type and includes
        information about available tools.

        Parameters
        ----------
        query : str
            User query to include in the prompt

        Returns
        -------
        List[Dict[str, Any]]
            List of base messages for the agent, including the prompt and query
        """
        messages = []
        if self._agentic_prompt is not None:
            if isinstance(self._agentic_prompt, str):
                if self._agentic_prompt.endswith(".prompt"):
                    agentic_prompt = Prompt(prompt_id=self._agentic_prompt, is_system=True)
                else:
                    agentic_prompt = Prompt(prompt=self._agentic_prompt, is_system=True)
            else:
                agentic_prompt = self._agentic_prompt
            messages.append(agentic_prompt.as_dict())

        if isinstance(query, Prompt):
            messages.append(query.as_dict())
        else:
            messages.append({"role": "user", "content": query})

        return messages

    def start(self, query: str) -> Dict[str, Any]:
        """Start the agentic loop using function calling.

        This method processes user queries through OpenAI's function calling
        system, automatically executing tools when the model determines
        they are needed.

        Parameters
        ----------
        query : str
            User query to process

        Returns
        -------
        Dict[str, Any]
            Agent response containing:
            - prompt: Original user query
            - iterations: List of tool calls executed
            - response: Final model response
        """
        # Add all tools to the model
        self._model._add_tools(list(self._tools.values()))
        messages = self._get_base_messages(query)
        response = self._create_base_response(query)
        current_iter = 0
        max_iterations = self._max_iter if self._max_iter is not None else 10  # safety limit

        while current_iter < max_iterations:

            if self._stream_callback is None:
                resp = self._execute_model_step(messages)
            else:
                resp = self._execute_model_step_stream(messages)

            messages.append(resp)
            content = resp["content"]
            self._debug_print(content)

            self._update_usage(response, resp)

            if resp.get("tool_calls"):
                for tool_call in resp["tool_calls"]:
                    # Check if human feedback is required for actions
                    if self._human_feedback in ("actions", "all"):
                        action_data = {
                            "name": tool_call.function.name,
                            "arguments": json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
                        }
                        if not self._request_human_feedback("action", f"Function call: {tool_call.function.name}", action_data):
                            print("Execution stopped by user.")
                            return response

                    if tool_call.function.name.startswith("mcp_"):
                        tool_result = self._call_mcp_tool(tool_call)
                    else:
                        tool_result = self._call_tool(tool_call)
                    iteration_data = {
                        "name": tool_call.function.name,
                        "arguments": tool_call.function.arguments,
                        "result": tool_result["content"]
                    }

                    # Add usage information to the iteration
                    if "usage" in resp:
                        iteration_data["usage"] = resp["usage"]

                    response["iterations"].append(iteration_data)
                    messages.append(tool_result)
            else:
                # Check if human feedback is required for all steps (non-action responses)
                if self._human_feedback == "all":
                    if not self._request_human_feedback("response", content):
                        print("Execution stopped by user.")
                        return response

                response["response"] = content
                break

            current_iter += 1

        # If we get here, the iteration limit was reached
        if self._debug:
            print(f"Iteration limit reached ({max_iterations})")

        return response
Agent that uses OpenAI's native function calling.
This agent implements a loop that leverages OpenAI's native function calling system, allowing the model to directly call available functions without manual response parsing. This approach is more reliable and efficient than text-based tool calling.
The agent automatically handles:
- Function call detection and execution
- Tool result integration into conversation
- Iteration limits to prevent infinite loops
- Streaming support for real-time responses
Attributes
- _model (Any): OpenAI model with function calling support
- _tools (Dict[str, Any]): Available tools for the agent
def start(self, query: str) -> Dict[str, Any]:
    """Start the agentic loop using function calling.

    This method processes user queries through OpenAI's function calling
    system, automatically executing tools when the model determines
    they are needed.

    Parameters
    ----------
    query : str
        User query to process

    Returns
    -------
    Dict[str, Any]
        Agent response containing:
        - prompt: Original user query
        - iterations: List of tool calls executed
        - response: Final model response
    """
    # Add all tools to the model
    self._model._add_tools(list(self._tools.values()))
    messages = self._get_base_messages(query)
    response = self._create_base_response(query)
    current_iter = 0
    max_iterations = self._max_iter if self._max_iter is not None else 10  # safety limit

    while current_iter < max_iterations:

        if self._stream_callback is None:
            resp = self._execute_model_step(messages)
        else:
            resp = self._execute_model_step_stream(messages)

        messages.append(resp)
        content = resp["content"]
        self._debug_print(content)

        self._update_usage(response, resp)

        if resp.get("tool_calls"):
            for tool_call in resp["tool_calls"]:
                # Check if human feedback is required for actions
                if self._human_feedback in ("actions", "all"):
                    action_data = {
                        "name": tool_call.function.name,
                        "arguments": json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
                    }
                    if not self._request_human_feedback("action", f"Function call: {tool_call.function.name}", action_data):
                        print("Execution stopped by user.")
                        return response

                if tool_call.function.name.startswith("mcp_"):
                    tool_result = self._call_mcp_tool(tool_call)
                else:
                    tool_result = self._call_tool(tool_call)
                iteration_data = {
                    "name": tool_call.function.name,
                    "arguments": tool_call.function.arguments,
                    "result": tool_result["content"]
                }

                # Add usage information to the iteration
                if "usage" in resp:
                    iteration_data["usage"] = resp["usage"]

                response["iterations"].append(iteration_data)
                messages.append(tool_result)
        else:
            # Check if human feedback is required for all steps (non-action responses)
            if self._human_feedback == "all":
                if not self._request_human_feedback("response", content):
                    print("Execution stopped by user.")
                    return response

            response["response"] = content
            break

        current_iter += 1

    # If we get here, the iteration limit was reached
    if self._debug:
        print(f"Iteration limit reached ({max_iterations})")

    return response
Start the agentic loop using function calling.
This method processes user queries through OpenAI's function calling system, automatically executing tools when the model determines they are needed.
Parameters
- query (str): User query to process
Returns
- Dict[str, Any]: Agent response containing:
- prompt: Original user query
- iterations: List of tool calls executed
- response: Final model response
class ReactAgenticLoop(_BaseReactLoop):
    """Standard ReAct agent.

    This agent implements the standard ReAct pattern, where the model
    produces JSON responses that are parsed and handled iteratively.
    The ReAct pattern combines reasoning and acting in a structured way.
    """

    def start(self, query: str) -> Dict[str, Any]:
        """Start the ReAct agentic loop.

        Parameters
        ----------
        query : str
            User query to process

        Returns
        -------
        Dict[str, Any]
            Agent response processed through the ReAct loop
        """
        return self._run_react_loop(query, "react")
Standard ReAct agent.
This agent implements the standard ReAct pattern, where the model produces JSON responses that are parsed and handled iteratively. The ReAct pattern combines reasoning and acting in a structured way.
def start(self, query: str) -> Dict[str, Any]:
    """Start the ReAct agentic loop.

    Parameters
    ----------
    query : str
        User query to process

    Returns
    -------
    Dict[str, Any]
        Agent response processed through the ReAct loop
    """
    return self._run_react_loop(query, "react")
Start the ReAct agentic loop.
Parameters
- query (str): User query to process
Returns
- Dict[str, Any]: Agent response processed through the ReAct loop
class ReactWithFCAgenticLoop(_AgenticLoop, _FunctionCallingMixin):
    """Agent that combines ReAct and Function Calling.

    This agent combines the ReAct approach with OpenAI's native function
    calling, allowing for hybrid tool call management. This provides
    the flexibility of ReAct reasoning with the reliability of function calling.
    """

    def start(self, query: str) -> Dict[str, Any]:
        """Start the hybrid ReAct + Function Calling agentic loop.

        Parameters
        ----------
        query : str
            User query to process

        Returns
        -------
        Dict[str, Any]
            Agent response (to be implemented)

        Notes
        -----
        TODO: Implement combination of ReAct and Function Calling
        """
        # TODO: Implement combination of ReAct and Function Calling
        pass
Agent that combines ReAct and Function Calling.
This agent combines the ReAct approach with OpenAI's native function calling, allowing for hybrid tool call management. This provides the flexibility of ReAct reasoning with the reliability of function calling.
    def start(self, query: str) -> Dict[str, Any]:
        """Start the hybrid ReAct + Function Calling agentic loop.

        Parameters
        ----------
        query : str
            User query to process

        Returns
        -------
        Dict[str, Any]
            Agent response (to be implemented)

        Notes
        -----
        TODO: Implement combination of ReAct and Function Calling
        """
        # TODO: Implement combination of ReAct and Function Calling
        pass
Start the hybrid ReAct + Function Calling agentic loop.
Parameters
- query (str): User query to process
Returns
- Dict[str, Any]: Agent response (to be implemented)
Notes
TODO: Implement combination of ReAct and Function Calling
class ProgrammaticAgenticLoop(_BaseReactLoop):
    """Programmatic agent.

    This agent implements a programmatic approach where the model
    produces code or structured instructions that are executed.
    It's designed for tasks that require code generation and execution.
    """
Programmatic agent.
This agent implements a programmatic approach where the model produces code or structured instructions that are executed. It's designed for tasks that require code generation and execution.
    def start(self, query: str) -> Dict[str, Any]:
        """Start the programmatic agentic loop.

        Parameters
        ----------
        query : str
            User query to process

        Returns
        -------
        Dict[str, Any]
            Agent response processed through the programmatic loop
        """
        return self._run_react_loop(query, "programmatic")
Start the programmatic agentic loop.
Parameters
- query (str): User query to process
Returns
- Dict[str, Any]: Agent response processed through the programmatic loop
class PlanAndExecuteAgenticLoop(_BaseReactLoop):
    """Plan-and-execute agent.

    This agent implements the plan-and-execute pattern, where the model
    first plans the actions and then executes them sequentially.
    This approach is useful for complex tasks that require careful planning.
    """
Plan-and-execute agent.
This agent implements the plan-and-execute pattern, where the model first plans the actions and then executes them sequentially. This approach is useful for complex tasks that require careful planning.
    def start(self, query: str) -> Dict[str, Any]:
        """Start the plan-and-execute agentic loop.

        Parameters
        ----------
        query : str
            User query to process

        Returns
        -------
        Dict[str, Any]
            Agent response processed through the plan-and-execute loop
        """
        return self._run_react_loop(query, "plan_and_execute")
Start the plan-and-execute agentic loop.
Parameters
- query (str): User query to process
Returns
- Dict[str, Any]: Agent response processed through the plan-and-execute loop
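The two phases the pattern separates can be sketched in miniature. The toy `plan` and `execute` functions below stand in for model calls; the real loop delegates both phases to the model:

```python
def plan(query: str) -> list:
    # Toy planner: a fixed decomposition into ordered steps.
    # A real plan-and-execute agent asks the model to produce this list.
    return [f"research: {query}", f"draft: {query}", f"review: {query}"]

def execute(steps: list) -> list:
    # Execute each planned step in order, collecting one result per step.
    return [f"done({step})" for step in steps]

results = execute(plan("project plan"))
```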
class ReflexionAgenticLoop(_BaseReactLoop):
    """Agent with reflection capabilities.

    This agent implements the reflexion pattern, where the model
    reflects on its own actions and decisions to improve performance.
    This self-reflective approach helps the agent learn from mistakes
    and improve its reasoning over time.
    """
Agent with reflection capabilities.
This agent implements the reflexion pattern, where the model reflects on its own actions and decisions to improve performance. This self-reflective approach helps the agent learn from mistakes and improve its reasoning over time.
    def start(self, query: str) -> Dict[str, Any]:
        """Start the reflexion agentic loop.

        Parameters
        ----------
        query : str
            User query to process

        Returns
        -------
        Dict[str, Any]
            Agent response processed through the reflexion loop
        """
        return self._run_react_loop(query, "reflexion")
Start the reflexion agentic loop.
Parameters
- query (str): User query to process
Returns
- Dict[str, Any]: Agent response processed through the reflexion loop
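The reflect-and-retry cycle can be sketched as a small loop. The toy `attempt` and `critique` functions below stand in for model calls; they are illustrative, not part of monoai:

```python
def solve_with_reflexion(task, attempt, critique, max_rounds=3):
    """Toy reflexion loop: attempt, self-critique, retry with feedback."""
    answer = attempt(task, feedback=None)
    for _ in range(max_rounds):
        feedback = critique(answer)
        if feedback is None:  # the critique is satisfied
            return answer
        answer = attempt(task, feedback=feedback)
    return answer

# Toy attempt/critique pair: the first try is wrong, feedback fixes it.
def attempt(task, feedback):
    return "4" if feedback else "5"

def critique(answer):
    return "try again: off by one" if answer != "4" else None

result = solve_with_reflexion("2 + 2", attempt, critique)
```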
class SelfAskAgenticLoop(_BaseReactLoop):
    """Self-ask agent.

    This agent implements the self-ask pattern, where the model
    asks itself questions to guide the reasoning process.
    This approach helps break down complex problems into smaller,
    more manageable questions.
    """
Self-ask agent.
This agent implements the self-ask pattern, where the model asks itself questions to guide the reasoning process. This approach helps break down complex problems into smaller, more manageable questions.
    def start(self, query: str) -> Dict[str, Any]:
        """Start the self-ask agentic loop.

        Parameters
        ----------
        query : str
            User query to process

        Returns
        -------
        Dict[str, Any]
            Agent response processed through the self-ask loop
        """
        return self._run_react_loop(query, "self_ask")
Start the self-ask agentic loop.
Parameters
- query (str): User query to process
Returns
- Dict[str, Any]: Agent response processed through the self-ask loop
class SelfAskWithSearchLoop(_BaseReactLoop):
    """Self-ask agent with web search capabilities.

    This agent extends the self-ask pattern with the ability to
    perform web searches to obtain additional information.
    It's particularly useful for questions that require current
    or factual information not available in the model's training data.

    Attributes
    ----------
    _handle_search_query : callable
        Method for handling web search queries
    """

    def _handle_search_query(self, iteration: Dict[str, Any], response: Dict[str, Any], messages: List[Dict[str, Any]]) -> None:
        """Handle web search queries.

        This method processes search queries by executing web searches
        using the Tavily search engine and integrating the results
        into the conversation flow.

        Parameters
        ----------
        iteration : Dict[str, Any]
            Current iteration containing the search query
        response : Dict[str, Any]
            Response dictionary to update
        messages : List[Dict[str, Any]]
            Message list to update with search results

        Notes
        -----
        This method modifies the response and messages objects directly.
        Uses the Tavily search engine for web searches.
        """
        from ...tools.websearch import search_web

        query = iteration["search_query"]
        result = search_web(query, engine="tavily")["text"]
        iteration["search_result"] = result

        msg = json.dumps({"query_results": result})
        messages.append({"role": "user", "content": msg})
        response["iterations"].append(iteration)
Self-ask agent with web search capabilities.
This agent extends the self-ask pattern with the ability to perform web searches to obtain additional information. It's particularly useful for questions that require current or factual information not available in the model's training data.
Attributes
- _handle_search_query (callable): Method for handling web search queries
    def start(self, query: str) -> Dict[str, Any]:
        """Start the self-ask with search agentic loop.

        Parameters
        ----------
        query : str
            User query to process

        Returns
        -------
        Dict[str, Any]
            Agent response processed through the self-ask with search loop

        Notes
        -----
        This agent uses a custom handler for web search queries, allowing
        the model to obtain up-to-date information during the process.
        """
        custom_handlers = {"search_query": self._handle_search_query}
        return self._run_react_loop(query, "self_ask_with_search", custom_handlers)
Start the self-ask with search agentic loop.
Parameters
- query (str): User query to process
Returns
- Dict[str, Any]: Agent response processed through the self-ask with search loop
Notes
This agent uses a custom handler for web search queries, allowing the model to obtain up-to-date information during the process.
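The custom-handler mechanism pairs a key found in a parsed iteration with a callback that mutates the iteration, the response, and the message list in place. A self-contained sketch that mirrors the shape of `_handle_search_query`, with a stubbed search function standing in for `search_web`:

```python
import json

def fake_search_web(query: str) -> dict:
    # Stand-in for monoai.tools.websearch.search_web; returns a canned result.
    return {"text": f"results for: {query}"}

def handle_search_query(iteration, response, messages):
    """Mirror of _handle_search_query's contract: mutate all three objects in place."""
    query = iteration["search_query"]
    result = fake_search_web(query)["text"]
    iteration["search_result"] = result
    messages.append({"role": "user", "content": json.dumps({"query_results": result})})
    response["iterations"].append(iteration)

# Dispatch the way the loop would: a key in the iteration selects the handler.
custom_handlers = {"search_query": handle_search_query}
iteration = {"search_query": "capital of France"}
response = {"iterations": []}
messages = []
for key, handler in custom_handlers.items():
    if key in iteration:
        handler(iteration, response, messages)
```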
class Agent:
    """AI Agent that implements different reasoning paradigms for autonomous task execution."""
AI Agent that implements different reasoning paradigms for autonomous task execution.
The Agent class provides a unified interface for creating and using AI agents with various reasoning paradigms. It acts as a high-level wrapper around specific agentic loop implementations, offering a consistent API regardless of the underlying reasoning pattern.
The agent can be configured with different reasoning approaches, tools, and execution parameters to handle complex tasks autonomously or with human oversight.
Parameters
- model (Model): AI model instance to use for agent execution
- tools (list, optional): List of available tools for the agent. Each tool should be a callable function. Default is None.
- paradigm (str or _AgenticLoop, optional): Reasoning paradigm to use. Default is "function_calling".
- agent_prompt (str, optional): Custom prompt for the agent. If None, uses the default prompt for the chosen paradigm. Default is None.
- name (str, optional): Name identifier for the agent. Default is empty string.
- mcp_servers (list, optional): List of MCP (Model Context Protocol) servers to register. Default is None.
- debug (bool, optional): Flag to enable debug output during execution. Default is False.
- max_iter (int, optional): Maximum number of iterations allowed for the agent. If None, there are no limits. Default is None.
- native_web_search (str, optional): Native web search capability level. Must be one of: "low", "medium", or "high". Default is None.
- human_feedback (str, optional):
Human feedback mode for controlling agent execution. Can be:
- None: No human feedback required (default)
- "actions": Pause and request confirmation before executing tool actions
- "all": Pause after every step for human review Default is None.
Attributes
- name (str): Name identifier for the agent
- _model (Model): The AI model instance used for execution
- _loop (_AgenticLoop): The agentic loop implementation for the chosen paradigm
Supported Paradigms
- function_calling: Native OpenAI function calling with tool integration
- react: Reasoning and Acting pattern with iterative problem solving
- react_with_function_calling: Combines ReAct reasoning with function calling
- plan-and-execute: Two-phase approach of planning followed by execution
- programmatic: Code generation and execution for computational tasks
- reflexion: Self-reflective reasoning with error correction
- self_ask: Self-questioning approach for complex reasoning
- self_ask_with_search: Self-ask with web search capabilities
Examples
Basic function calling agent:
>>> from monoai.models import Model
>>> from monoai.agents import Agent
>>>
>>> model = Model(provider="openai", model="gpt-4")
>>> agent = Agent(model=model, paradigm="function_calling")
>>> response = agent.run("What's the weather like today?")
ReAct agent with custom tools:
>>> def calculator(expression: str) -> str:
... return str(eval(expression))
>>>
>>> agent = Agent(
... model=model,
... paradigm="react",
... tools=[calculator],
... name="MathAgent"
... )
>>> response = agent.run("Calculate 15 * 23 + 7")
Agent with human feedback:
>>> agent = Agent(
... model=model,
... paradigm="plan-and-execute",
... human_feedback="actions",
... debug=True
... )
>>> response = agent.run("Plan and execute a data analysis task")
Agent with MCP servers:
>>> mcp_server = McpServer("coingecko", "http", server_url="https://mcp.api.coingecko.com/sse")
>>> agent = Agent(
... model=model,
... paradigm="function_calling",
... mcp_servers=[mcp_server]
... )
>>> response = agent.run("What's the price of Bitcoin?")
Custom paradigm agent:
>>> class CustomLoop(_AgenticLoop):
... def start(self, prompt):
... # Custom implementation
... return {"response": "Custom result"}
>>>
>>> custom_loop = CustomLoop()
>>> agent = Agent(model=model, paradigm=custom_loop)
>>> response = agent.run("Test custom paradigm")
    def __init__(self, model: Model, tools=None, paradigm="function_calling",
                 agent_prompt=None, name="", mcp_servers=None, debug=False, max_iter=None, native_web_search=None,
                 human_feedback=None):
        """
        Initialize the agent with the specified model and configuration.

        Sets up an AI agent with the chosen reasoning paradigm, registers
        any provided tools, and configures execution parameters.

        Raises
        ------
        ValueError
            If the specified paradigm is not supported or invalid
        TypeError
            If a custom object is passed that doesn't derive from _AgenticLoop
        """
        self._model = model
        self.name = name

        if native_web_search is not None and native_web_search not in ["low", "medium", "high"]:
            raise ValueError("native_web_search must be 'low', 'medium' or 'high'")

        if human_feedback is not None and human_feedback not in ["actions", "all"]:
            raise ValueError("human_feedback must be None, 'actions', or 'all'")

        self._model._web_search = native_web_search
        self._human_feedback = human_feedback

        # Handle custom paradigm
        if isinstance(paradigm, _AgenticLoop):
            # Verify that the custom object is valid
            if not hasattr(paradigm, 'start') or not callable(paradigm.start):
                raise TypeError("Custom paradigm must have a callable 'start' method")
            self._loop = paradigm
        else:
            # Predefined paradigms
            loop_kwargs = self._model, agent_prompt, debug, max_iter, None, human_feedback

            if paradigm == "function_calling":
                self._loop = FunctionCallingAgenticLoop(*loop_kwargs)
            elif paradigm == "react":
                self._loop = ReactAgenticLoop(*loop_kwargs)
            elif paradigm == "react_with_function_calling":
                self._loop = ReactWithFCAgenticLoop(*loop_kwargs)
            elif paradigm == "plan-and-execute":
                self._loop = PlanAndExecuteAgenticLoop(*loop_kwargs)
            elif paradigm == "programmatic":
                self._loop = ProgrammaticAgenticLoop(*loop_kwargs)
            elif paradigm == "reflexion":
                self._loop = ReflexionAgenticLoop(*loop_kwargs)
            elif paradigm == "self_ask":
                self._loop = SelfAskAgenticLoop(*loop_kwargs)
            elif paradigm == "self_ask_with_search":
                self._loop = SelfAskWithSearchLoop(*loop_kwargs)
            else:
                raise ValueError(f"Paradigm '{paradigm}' not supported. "
                                 f"Available paradigms: function_calling, react, "
                                 f"react_with_function_calling, plan-and-execute, "
                                 f"programmatic, reflexion, self_ask, self_ask_with_search, "
                                 f"or a custom object derived from _AgenticLoop")

        if tools is not None:
            self._loop.register_tools(tools)

        if mcp_servers is not None:
            self._loop.register_mcp_servers(mcp_servers)
Initialize the agent with the specified model and configuration.
Sets up an AI agent with the chosen reasoning paradigm, registers any provided tools, and configures execution parameters.
Raises
- ValueError: If the specified paradigm is not supported or invalid
- TypeError: If a custom object is passed that doesn't derive from _AgenticLoop
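The paradigm selection in `__init__` amounts to a validated name-to-class lookup. A sketch of the same dispatch expressed as a mapping, using placeholder loop classes (the real constructor uses the if/elif chain shown in the source above):

```python
# Placeholder loop classes standing in for the real implementations.
class FakeFunctionCallingLoop: ...
class FakeReactLoop: ...

PARADIGMS = {
    "function_calling": FakeFunctionCallingLoop,
    "react": FakeReactLoop,
}

def resolve_paradigm(name: str):
    """Look up a paradigm by name, mirroring the constructor's validation."""
    try:
        return PARADIGMS[name]
    except KeyError:
        raise ValueError(
            f"Paradigm '{name}' not supported. "
            f"Available paradigms: {', '.join(sorted(PARADIGMS))}"
        ) from None

loop_cls = resolve_paradigm("react")
```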
    def run(self, prompt: str | Prompt):
        """
        Execute the agent with the specified prompt.

        Processes a user prompt through the agent's reasoning paradigm,
        returning a structured response that includes the reasoning process
        and final answer.

        Parameters
        ----------
        prompt : str or Prompt
            User prompt or query to process. Can be a string or a Prompt object.

        Returns
        -------
        Dict[str, Any]
            Agent response containing:
            - prompt: Original user query
            - iterations: List of processed reasoning iterations
            - response: Final response (if available)
            - metadata: Additional execution metadata (paradigm-specific)

        Examples
        --------
        >>> agent = Agent(model=model, paradigm="function_calling")
        >>> response = agent.run("What's the weather in New York?")
        >>> print(response['response'])

        >>> from monoai.prompts import Prompt
        >>> prompt = Prompt("Analyze this data: [1, 2, 3, 4, 5]")
        >>> response = agent.run(prompt)
        >>> print(response['iterations'])

        Notes
        -----
        The response structure varies by paradigm:
        - **Function calling**: Includes tool call details and function results
        - **ReAct**: Includes thought-action-observation cycles
        - **Plan-and-execute**: Includes planning and execution phases
        - **Programmatic**: Includes code generation and execution results
        - **Reflexion**: Includes self-reflection and error correction steps
        - **Self-ask**: Includes question-answer reasoning chains
        - **Self-ask with search**: Includes web search results and reasoning
        - **Custom paradigms**: May include paradigm-specific information
        """

        return self._loop.start(prompt)
Execute the agent with the specified prompt.
Processes a user prompt through the agent's reasoning paradigm, returning a structured response that includes the reasoning process and final answer.
Parameters
- prompt (str or Prompt): User prompt or query to process. Can be a string or a Prompt object.
Returns
- Dict[str, Any]: Agent response containing:
- prompt: Original user query
- iterations: List of processed reasoning iterations
- response: Final response (if available)
- metadata: Additional execution metadata (paradigm-specific)
Examples
>>> agent = Agent(model=model, paradigm="function_calling")
>>> response = agent.run("What's the weather in New York?")
>>> print(response['response'])
>>> from monoai.prompts import Prompt
>>> prompt = Prompt("Analyze this data: [1, 2, 3, 4, 5]")
>>> response = agent.run(prompt)
>>> print(response['iterations'])
Notes
The response structure varies by paradigm:
- Function calling: Includes tool call details and function results
- ReAct: Includes thought-action-observation cycles
- Plan-and-execute: Includes planning and execution phases
- Programmatic: Includes code generation and execution results
- Reflexion: Includes self-reflection and error correction steps
- Self-ask: Includes question-answer reasoning chains
- Self-ask with search: Includes web search results and reasoning
- Custom paradigms: May include paradigm-specific information
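The documented response shape can be consumed like any dictionary. A sketch using a hand-built response in that shape (the values are illustrative, and `summarize_run` is not part of the API):

```python
# A hand-built response in the documented shape (values are illustrative).
response = {
    "prompt": "What is the capital of France?",
    "iterations": [
        {"thought": "I should recall geography.", "action": None, "observation": None},
        {"thought": "Paris is the capital.", "action": None, "observation": None},
    ],
    "response": "Paris",
}

def summarize_run(result: dict) -> str:
    """Condense a run into a one-line trace: iteration count plus final answer."""
    steps = len(result.get("iterations", []))
    return f"{steps} iteration(s) -> {result.get('response')}"

summary = summarize_run(response)
```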
    def enable_streaming(self, stream_callback=None):
        """
        Enable streaming responses for this agent.

        When streaming is enabled, the agent will call the provided callback
        function with each content chunk as it's generated, allowing for
        real-time processing and display of the AI's response.

        Parameters
        ----------
        stream_callback : callable, optional
            Callback function to handle streaming content chunks.
            If None, uses a default callback that prints content to console.
            The callback receives plain text content strings.

            Callback signature: callback(content: str) -> None
            where content is the streaming text chunk.

        Raises
        ------
        AttributeError
            If the current paradigm doesn't support streaming

        Examples
        --------
        >>> def my_callback(content):
        ...     print(f"Streaming: {content}")
        >>>
        >>> agent = Agent(model=model, paradigm="function_calling")
        >>> agent.enable_streaming(my_callback)
        >>> response = agent.run("Tell me a story")

        Notes
        -----
        Not all paradigms support streaming. Check the specific paradigm
        implementation to ensure streaming is available.
        """
        if hasattr(self._loop, 'enable_streaming'):
            self._loop.enable_streaming(stream_callback)
        else:
            raise AttributeError(f"Paradigm '{self._loop.__class__.__name__}' doesn't support streaming")
Enable streaming responses for this agent.
When streaming is enabled, the agent will call the provided callback function with each content chunk as it's generated, allowing for real-time processing and display of the AI's response.
Parameters
stream_callback (callable, optional): Callback function that handles streaming content chunks. If None, a default callback that prints content to the console is used. The callback receives plain-text content strings.
Callback signature: callback(content: str) -> None where content is the streaming text chunk.
Raises
- AttributeError: If the current paradigm doesn't support streaming
Examples
>>> def my_callback(content):
... print(f"Streaming: {content}")
>>>
>>> agent = Agent(model=model, paradigm="function_calling")
>>> agent.enable_streaming(my_callback)
>>> response = agent.run("Tell me a story")
Notes
Not all paradigms support streaming. Check the specific paradigm implementation to ensure streaming is available.
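Because availability depends on the underlying paradigm loop, support can be probed the same way the agent does, via a `hasattr` check. A runnable sketch; `StubLoop` and `NoStreamLoop` are hypothetical stand-ins for paradigm loop classes:

```python
class StubLoop:
    """Hypothetical paradigm loop that supports streaming."""
    def __init__(self):
        self.callback = None

    def enable_streaming(self, cb=None):
        self.callback = cb or print  # fall back to console output


class NoStreamLoop:
    """Hypothetical paradigm loop with no streaming support."""


def enable_streaming_on(loop, cb=None):
    # Mirrors the capability check the agent performs before enabling streaming.
    if hasattr(loop, 'enable_streaming'):
        loop.enable_streaming(cb)
    else:
        raise AttributeError(
            f"Paradigm '{loop.__class__.__name__}' doesn't support streaming"
        )


enable_streaming_on(StubLoop())          # succeeds, default callback installed
try:
    enable_streaming_on(NoStreamLoop())  # raises AttributeError
except AttributeError as e:
    print(e)
```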
```python
def disable_streaming(self):
    """Disable streaming responses for this agent."""
    if hasattr(self._loop, 'disable_streaming'):
        self._loop.disable_streaming()
    else:
        raise AttributeError(
            f"Paradigm '{self._loop.__class__.__name__}' doesn't support streaming"
        )
```
Disable streaming responses for this agent.
After calling this method, the agent will use standard (non-streaming) model execution for all subsequent requests.
Raises
- AttributeError: If the current paradigm doesn't support streaming
Examples
>>> agent = Agent(model=model, paradigm="function_calling")
>>> agent.enable_streaming()
>>> # ... use streaming ...
>>> agent.disable_streaming()
>>> # Now using standard execution
Notes
Not all paradigms support streaming. Check the specific paradigm implementation to ensure streaming is available.
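The enable/disable pair lends itself to a context manager that guarantees streaming is switched off even if a run raises. A minimal sketch; `temporary_streaming` is not part of the package, and `StubAgent` is a hypothetical stand-in for a real Agent used to show the call order:

```python
from contextlib import contextmanager


@contextmanager
def temporary_streaming(agent, callback=None):
    """Enable streaming for the duration of a block, then restore."""
    agent.enable_streaming(callback)
    try:
        yield agent
    finally:
        agent.disable_streaming()


class StubAgent:
    """Hypothetical stand-in for Agent that records the call order."""
    def __init__(self):
        self.calls = []

    def enable_streaming(self, cb=None):
        self.calls.append('enable')

    def disable_streaming(self):
        self.calls.append('disable')

    def run(self, prompt):
        self.calls.append('run')
        return {'final': 'ok'}


agent = StubAgent()
with temporary_streaming(agent):
    agent.run("Tell me a story")
print(agent.calls)
# ['enable', 'run', 'disable']
```

The `finally` clause is what makes this safer than calling `disable_streaming()` manually after `run()`: an exception mid-run still restores standard execution.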