monoai.application
The Application module provides a high-level interface for building and serving AI applications incorporating models and agents.
class Application:
    """
    FastAPI-based application for serving AI models and agents.

    The Application class provides a complete web service wrapper around AI models
    and agents, offering REST API endpoints, WebSocket support, rate limiting,
    and user validation capabilities.

    This class automatically creates FastAPI endpoints based on the configured
    models and agents, handling request validation, rate limiting, and response
    formatting.

    Examples
    --------
    Basic usage with a model:
    ```
    from monoai.models import Model
    from monoai.application import Application

    model = Model(provider="openai", model="gpt-4o-mini")
    app = Application(name="MyAIApp", model=model)
    app.serve(port=8000)
    ```

    With agents and rate limiting:
    ```
    from monoai.models import Model
    from monoai.agents import Agent
    from monoai.application import Application, RateLimiter

    model = Model(provider="openai", model="gpt-4o-mini")
    agent = Agent(model=model, paradigm="react")
    rate_limiter = RateLimiter(requests_per_minute=60)

    app = Application(
        name="AgentApp",
        agents=[agent],
        rate_limiter=rate_limiter
    )
    app.serve(port=8000)
    ```

    With user validation:
    ```
    def validate_user(user_id: str):
        # Custom validation logic
        if user_id.startswith("user_"):
            return True
        elif user_id.isdigit():
            return f"user_{user_id}"  # Normalize
        return False

    app = Application(
        name="SecureApp",
        model=model,
        user_validator=validate_user
    )
    ```
    """

    def __init__(self, name: str, model: Optional[Model] = None, agents: Optional[List[Agent]] = None,
                 rate_limiter: Optional[RateLimiter] = None, user_validator: Optional[Callable[[str], Union[bool, str]]] = None):
        """
        Initialize the application.

        Parameters
        ----------
        name : str
            Application name. Used as the FastAPI app title and in the meta
            endpoint responses.
        model : Optional[Model], default None
            AI model to use. If provided, creates the model endpoints:
            POST /model, POST /model/stream and WS /model/ws.
        agents : Optional[List[Agent]], default None
            List of available agents, indexed by their ``name`` attribute.
            Each agent must have a unique name; if two agents share a name,
            the later one in the list replaces the earlier one.
            Creates /agent/{agent_name} endpoints for each agent.
        rate_limiter : Optional[RateLimiter], default None
            Rate limiter to control API usage. Applied to the model and agent
            endpoints (HTTP and WebSocket). If not provided, no rate limiting
            is enforced.
        user_validator : Optional[Callable[[str], Union[bool, str]]], default None
            Function to validate user_id from requests. Must return:
            - True: user_id is valid and accepted as-is
            - False: user_id is invalid, will fallback to IP-based identification
            - str: user_id is valid but normalized (e.g. "user123" -> "user_123")

        Notes
        -----
        At least one of model or agents must be provided to create useful endpoints.
        If neither is provided, only the meta (/, /health), rate-limit statistics
        and /validate-user endpoints will be available.

        The user_validator function is called for every request that includes
        a user_id in the request body. If validation fails, the application
        falls back to using the client IP address for rate limiting.
        """
        self.name = name
        self._model = model
        # An empty or missing agent list disables the agent endpoints entirely.
        self._agents: Optional[Dict[str, Agent]] = (
            {a.name: a for a in agents} if agents else None
        )
        self._rate_limiter = rate_limiter
        self._user_validator = user_validator
        # Timezone-aware UTC timestamp reported by the "/" endpoint.
        self._started_at = datetime.now(timezone.utc)

    @staticmethod
    async def _maybe_await(fn: Callable[..., Any], *args, **kwargs) -> Any:
        """
        Execute a function and await it if it returns an awaitable.

        This helper lets the application work with both synchronous and
        asynchronous model/agent methods.

        Parameters
        ----------
        fn : Callable[..., Any]
            Function to execute
        *args
            Positional arguments to pass to the function
        **kwargs
            Keyword arguments to pass to the function

        Returns
        -------
        Any
            Result of the function execution (awaited if it was awaitable)
        """
        result = fn(*args, **kwargs)
        if hasattr(result, "__await__"):
            return await result
        return result

    @staticmethod
    def _client_ip(request) -> str:
        """
        Best-effort client IP for an HTTP request or WebSocket.

        Checks X-Forwarded-For (first entry), then X-Real-IP, then the
        transport-level peer address; returns "unknown" when none is available.

        NOTE(security): the proxy headers are sent by the client and are only
        trustworthy when a reverse proxy in front of the app overwrites them.
        Without such a proxy a client can spoof them to evade IP-based rate limits.
        """
        forwarded_for = request.headers.get("X-Forwarded-For")
        if forwarded_for:
            # X-Forwarded-For can contain a comma-separated chain; the first
            # entry is the originating client.
            first_hop = forwarded_for.split(",")[0].strip()
            if first_hop:
                return first_hop

        real_ip = request.headers.get("X-Real-IP")
        if real_ip:
            return real_ip

        # Works for both Request and WebSocket (both expose .client).
        client = getattr(request, "client", None)
        if client and client.host:
            return client.host

        # Do not fall back to request.url.hostname: that is the *server's*
        # host name, not the client's address.
        return "unknown"

    def _get_user_identifier(self, request, data: Dict[str, Any]) -> str:
        """
        Extract the user identifier used for rate limiting from a request.

        Identification steps:
        1. Look for user_id in the request body
        2. Validate it via validate_user_id (uses the configured validator, if any)
        3. Fall back to the client IP address if user_id is missing or rejected

        Parameters
        ----------
        request : Request
            FastAPI request object (can be HTTP Request or WebSocket)
        data : Dict[str, Any]
            Request body data containing a potential user_id

        Returns
        -------
        str
            User identifier in one of these formats:
            - the user_id (or its normalized form) if provided and valid
            - "ip:192.168.1.1" if using IP-based identification
            - "ip:unknown" if the IP cannot be determined

        Notes
        -----
        IP-based identifiers are prefixed with "ip:" to distinguish them
        from actual user IDs.
        """
        user_id = data.get("user_id")
        if user_id:
            user_id_str = str(user_id)
            validation_result = self.validate_user_id(user_id_str)

            if validation_result is True:
                return user_id_str
            # A non-empty string is the validator's normalized id. An empty
            # string would pool unrelated users into one bucket, so treat it
            # (like False, errors, or unknown results) as a rejection.
            if isinstance(validation_result, str) and validation_result:
                return validation_result

        return f"ip:{self._client_ip(request)}"

    def validate_user_id(self, user_id: str) -> Union[bool, str]:
        """
        Validate a user_id using the configured validator.

        Exceptions raised by the validator are caught and reported as invalid.

        Parameters
        ----------
        user_id : str
            user_id to validate

        Returns
        -------
        Union[bool, str]
            - True: user_id is valid and accepted as-is
            - False: user_id is invalid or validation failed
            - str: user_id is valid but normalized (use this value instead)

        Notes
        -----
        If no validator is configured, this method always returns True.
        """
        if not self._user_validator:
            return True  # No validator, always consider valid

        try:
            return self._user_validator(user_id)
        except Exception:
            return False  # Error during validation, consider invalid

    def _build_app(self):
        """
        Build and configure the FastAPI application.

        Returns
        -------
        FastAPI
            Configured FastAPI application instance

        Raises
        ------
        ImportError
            If FastAPI is not installed

        Notes
        -----
        The method dynamically creates endpoints based on what's configured:
        - Model endpoints are created if a model is provided
        - Agent endpoints are created if agents are provided
        - Meta, rate-limit statistics and /validate-user endpoints are always created
        - Rate limiting applies to the model and agent endpoints (HTTP and WebSocket)
        """
        try:
            from fastapi import (
                FastAPI,
                HTTPException,
                Request,
                WebSocket,
                WebSocketDisconnect,
                status,
            )
        except ImportError as e:
            raise ImportError(
                "fastapi is required to build the application. "
                "Install it with: pip install fastapi"
            ) from e

        import asyncio
        import json

        from fastapi.middleware.cors import CORSMiddleware

        rate_limit_detail = "Rate limit exceeded. Please try again later."

        app = FastAPI(title=self.name)

        # NOTE(security): wildcard origins combined with allow_credentials=True
        # lets any web origin make credentialed requests; tighten for production.
        app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

        # ------------------------------------------------------------------
        # Shared helpers
        # ------------------------------------------------------------------

        async def read_json_object(request: Request) -> Dict[str, Any]:
            """Parse the request body as a JSON object, raising HTTP 400 otherwise."""
            try:
                data = await request.json()
            except Exception:
                raise HTTPException(status_code=400, detail="Invalid JSON body") from None
            # Lists, strings, numbers or null have no .get(); reject them with
            # 400 instead of letting an AttributeError become a 500.
            if not isinstance(data, dict):
                raise HTTPException(status_code=400, detail="JSON body must be an object")
            return data

        def require_prompt(data: Dict[str, Any]) -> Any:
            """Return the non-empty "prompt" field or raise HTTP 400."""
            prompt = data.get("prompt")
            if not prompt:
                raise HTTPException(status_code=400, detail="'prompt' is required")
            return prompt

        def rate_limit_allows(user_identifier: str, result: Any) -> bool:
            """Check a response against the rate limiter; record it when allowed."""
            if self._rate_limiter is None:
                return True
            # The limiter inspects the produced response, so this runs after
            # the model/agent call (e.g. for response-size based limits).
            if not self._rate_limiter.check_with_response(user_identifier, result):
                return False
            self._rate_limiter.update_with_response(user_identifier, result)
            return True

        def passthrough_stream_handler(content: str):
            """Streaming callback for the HTTP stream routes: returns content unchanged."""
            return content

        async def handle_prompt_request(request: Request, target: Any,
                                        invoke: Callable[..., Any], stream: bool) -> Any:
            """Shared flow of the model/agent HTTP routes: parse, run, rate-limit."""
            data = await read_json_object(request)
            prompt = require_prompt(data)
            user_identifier = self._get_user_identifier(request, data)

            # NOTE(review): enable_streaming mutates the shared model/agent, and
            # the HTTP response still carries the full result rather than a
            # stream. Confirm the intended streaming contract.
            if stream and hasattr(target, "enable_streaming"):
                target.enable_streaming(passthrough_stream_handler)

            result = await self._maybe_await(invoke, prompt)

            if not rate_limit_allows(user_identifier, result):
                raise HTTPException(status_code=429, detail=rate_limit_detail)
            return result

        async def send_json_quietly(websocket: WebSocket, payload: Dict[str, Any]) -> None:
            """Best-effort send; the socket may already be closed by the peer."""
            try:
                await websocket.send_text(json.dumps(payload))
            except Exception:
                pass  # deliberate: nothing more can be reported on a dead socket

        async def close_quietly(websocket: WebSocket) -> None:
            """Best-effort close; closing an already-closed socket raises."""
            try:
                await websocket.close()
            except Exception:
                pass  # deliberate: the connection is already gone

        async def run_websocket_session(websocket: WebSocket, target: Any,
                                        invoke: Callable[..., Any]) -> None:
            """Serve prompts on an accepted WebSocket until the client disconnects."""
            session_open = True
            pending_chunks: list = []

            def ws_stream_handler(content: str):
                # The hook stays registered on the shared model/agent after this
                # session ends, so drop chunks once the socket is gone.
                if not session_open:
                    return
                # Keep a reference: the event loop only holds weak references to
                # tasks, and the chunks must be flushed before "complete".
                pending_chunks.append(asyncio.create_task(websocket.send_text(json.dumps({
                    "type": "chunk",
                    "content": content,
                }))))

            try:
                while True:
                    raw = await websocket.receive_text()
                    try:
                        request_data = json.loads(raw)
                    except json.JSONDecodeError:
                        await websocket.send_text(json.dumps({"error": "Invalid JSON"}))
                        continue
                    if not isinstance(request_data, dict):
                        await websocket.send_text(json.dumps({"error": "JSON message must be an object"}))
                        continue

                    prompt = request_data.get("prompt")
                    if not prompt:
                        await websocket.send_text(json.dumps({"error": "Prompt is required"}))
                        continue

                    user_identifier = self._get_user_identifier(websocket, request_data)

                    if hasattr(target, "enable_streaming"):
                        target.enable_streaming(ws_stream_handler)

                    result = await self._maybe_await(invoke, prompt)

                    # Flush streamed chunks so "complete" always arrives last.
                    if pending_chunks:
                        chunks = list(pending_chunks)
                        pending_chunks.clear()
                        await asyncio.gather(*chunks)

                    if not rate_limit_allows(user_identifier, result):
                        await websocket.send_text(json.dumps({"error": rate_limit_detail}))
                        continue

                    await websocket.send_text(json.dumps({
                        "type": "complete",
                        "result": result,
                    }))
            except WebSocketDisconnect:
                # Client went away: the socket is already closed, nothing to send.
                pass
            except Exception as e:
                await send_json_quietly(websocket, {"error": str(e)})
                await close_quietly(websocket)
            finally:
                session_open = False
                for task in pending_chunks:
                    task.cancel()

        # ------------------------------------------------------------------
        # Meta, rate-limit and validation endpoints (always present)
        # ------------------------------------------------------------------

        @app.get("/", tags=["meta"], summary="Ping app")
        async def root():
            return {
                "msg": f"App {self.name} successfully started",
                # _started_at is timezone-aware UTC, so isoformat() already ends
                # in "+00:00"; swap that for "Z" instead of appending a suffix.
                "started_at": self._started_at.isoformat().replace("+00:00", "Z"),
            }

        @app.get("/health", tags=["meta"], summary="Health check")
        async def health():
            return {"status": "ok", "app": self.name}

        @app.get("/rate-limit/stats", tags=["rate-limit"], summary="Rate limiter statistics")
        async def rate_limit_stats():
            if self._rate_limiter is None:
                return {"message": "Rate limiter not configured"}

            stats = self._rate_limiter.get_stats()
            return {
                "rate_limiter": str(self._rate_limiter),
                "global_stats": stats
            }

        @app.get("/rate-limit/stats/{user_id}", tags=["rate-limit"], summary="Statistics for a specific user")
        async def rate_limit_user_stats(user_id: str):
            if self._rate_limiter is None:
                return {"message": "Rate limiter not configured"}

            user_stats = self._rate_limiter.get_stats(user_id)
            usage = self._rate_limiter.get_usage(user_id)
            remaining = self._rate_limiter.get_remaining(user_id)

            return {
                "user_id": user_id,
                "usage": usage,
                "remaining": remaining,
                "stats": user_stats
            }

        @app.post("/validate-user", tags=["auth"], summary="Validate a user_id")
        async def validate_user_endpoint(request: Request):
            data = await read_json_object(request)

            user_id = data.get("user_id")
            if not user_id:
                raise HTTPException(status_code=400, detail="'user_id' is required")

            validation_result = self.validate_user_id(str(user_id))

            if validation_result is True:
                return {"valid": True, "user_id": user_id, "normalized": None}
            if validation_result is False:
                return {"valid": False, "user_id": user_id, "error": "Invalid user_id"}
            if isinstance(validation_result, str):
                return {"valid": True, "user_id": user_id, "normalized": validation_result}
            return {"valid": False, "user_id": user_id, "error": "Unknown validation result"}

        # ------------------------------------------------------------------
        # Model endpoints
        # ------------------------------------------------------------------

        if self._model is not None:
            @app.post(
                "/model",
                tags=["model"],
                summary="Ask the model",
                status_code=status.HTTP_200_OK,
            )
            async def model_route(request: Request):
                return await handle_prompt_request(request, self._model, self._model.ask, stream=False)

            @app.post(
                "/model/stream",
                tags=["model"],
                summary="Ask the model with streaming",
                status_code=status.HTTP_200_OK,
            )
            async def model_stream_route(request: Request):
                return await handle_prompt_request(request, self._model, self._model.ask, stream=True)

            # The WebSocket annotation is required: FastAPI injects the socket
            # by type, otherwise "websocket" is treated as a query parameter.
            @app.websocket("/model/ws")
            async def model_websocket(websocket: WebSocket):
                await websocket.accept()
                await run_websocket_session(websocket, self._model, self._model.ask)

        # ------------------------------------------------------------------
        # Agent endpoints
        # ------------------------------------------------------------------

        if self._agents is not None:
            def get_agent_or_404(agent_name: str) -> Any:
                """Look up an agent by name or raise HTTP 404."""
                agent = self._agents.get(agent_name)
                if agent is None:
                    raise HTTPException(status_code=404, detail=f"Agent '{agent_name}' not found")
                return agent

            @app.post(
                "/agent/{agent_name}",
                tags=["agents"],
                summary="Execute an agent",
                status_code=status.HTTP_200_OK,
            )
            async def agent_route(agent_name: str, request: Request):
                agent = get_agent_or_404(agent_name)
                return await handle_prompt_request(request, agent, agent.run, stream=False)

            @app.post(
                "/agent/{agent_name}/stream",
                tags=["agents"],
                summary="Execute an agent with streaming",
                status_code=status.HTTP_200_OK,
            )
            async def agent_stream_route(agent_name: str, request: Request):
                agent = get_agent_or_404(agent_name)
                return await handle_prompt_request(request, agent, agent.run, stream=True)

            @app.websocket("/agent/{agent_name}/ws")
            async def agent_websocket(websocket: WebSocket, agent_name: str):
                await websocket.accept()

                agent = self._agents.get(agent_name)
                if agent is None:
                    await send_json_quietly(websocket, {"error": f"Agent '{agent_name}' not found"})
                    # Close exactly once (the original closed here and again in finally).
                    await close_quietly(websocket)
                    return

                await run_websocket_session(websocket, agent, agent.run)

        return app

    def serve(
        self,
        host: str = "0.0.0.0",
        port: int = 8000,
        reload: bool = False,
        workers: Optional[int] = None,
        log_level: str = "info",
    ):
        """
        Serve the application creating endpoints for the model and agents.

        This method starts a single-process uvicorn server with the configured
        FastAPI application. The server provides REST API and WebSocket
        endpoints for interacting with AI models and agents.

        API Endpoints
        -------------
        When the server is running, the following endpoints are available:

        **Model Endpoints** (if model is configured):
        - POST /model                      Ask the model
        - POST /model/stream               Ask the model with streaming
        - WS   /model/ws                   Ask the model with WebSocket streaming

        **Agent Endpoints** (if agents are configured):
        - POST /agent/{agent_name}         Execute an agent
        - POST /agent/{agent_name}/stream  Execute an agent with streaming
        - WS   /agent/{agent_name}/ws      Execute an agent with WebSocket streaming

        **Authentication & Validation:**
        - POST /validate-user              Validate a user_id

        **Rate Limiting & Monitoring:**
        - GET /rate-limit/stats            Rate limiter statistics
        - GET /rate-limit/stats/{user_id}  Statistics for a specific user

        **Meta & Health:**
        - GET /                            Ping the application
        - GET /health                      Health check

        **Request Format:**
        Model and agent endpoints (HTTP and WebSocket) expect a JSON object
        with a "prompt" field:
        ```json
        {
            "prompt": "Your question here",
            "user_id": "optional_user_id"
        }
        ```

        **Response Format:**
        - Model responses: Standard model response format
        - Agent responses: Agent execution result with iterations
        - WebSocket: {"type": "chunk", "content": ...} for streamed chunks,
          {"type": "complete", "result": ...} for the final result,
          {"error": ...} when a request fails

        Parameters
        ----------
        host : str, default "0.0.0.0"
            Host to serve the application. Use "0.0.0.0" for external access.
        port : int, default 8000
            Port to serve the application on.
        reload : bool, default False
            Auto-reload is not supported here: uvicorn only enables it when
            given an import string, and serve() passes an app instance.
        workers : Optional[int], default None
            Number of worker processes. Only None or 1 is supported, for the
            same reason as ``reload``.
        log_level : str, default "info"
            Log level for uvicorn. Options: "critical", "error", "warning", "info", "debug".

        Raises
        ------
        ValueError
            If reload=True or workers > 1 is requested.
        ImportError
            If uvicorn is not installed

        Examples
        --------
        Basic serving:
        ```
        app = Application(name="MyApp", model=model)
        app.serve()  # Serves on http://localhost:8000
        ```

        Custom port and log level:
        ```
        app.serve(host="0.0.0.0", port=8080, log_level="warning")
        ```
        """
        # uvicorn.run() logs a warning and calls sys.exit(1) when reload or
        # multiple workers are requested with an app object instead of an
        # import string. Fail early with an actionable error instead.
        if reload or (workers is not None and workers > 1):
            raise ValueError(
                "reload=True and workers > 1 are not supported by Application.serve(): "
                "uvicorn only enables them when given the application as an import string. "
                "Expose the ASGI app at module level and start uvicorn with an import string instead."
            )

        try:
            import uvicorn
        except ImportError as e:
            raise ImportError(
                "uvicorn is required to serve the application. "
                "Install it with: pip install uvicorn"
            ) from e

        uvicorn.run(
            self._build_app(),
            host=host,
            port=port,
            reload=reload,
            workers=workers,
            log_level=log_level,
        )
FastAPI-based application for serving AI models and agents.
The Application class provides a complete web service wrapper around AI models and agents, offering REST API endpoints, WebSocket support, rate limiting, and user validation capabilities.
This class automatically creates FastAPI endpoints based on the configured models and agents, handling request validation, rate limiting, and response formatting.
Examples
Basic usage with a model:
from monoai.models import Model
from monoai.application import Application
model = Model(provider="openai", model="gpt-4o-mini")
app = Application(name="MyAIApp", model=model)
app.serve(port=8000)
With agents and rate limiting:
from monoai.models import Model
from monoai.agents import Agent
from monoai.application import Application, RateLimiter
model = Model(provider="openai", model="gpt-4o-mini")
agent = Agent(model=model, paradigm="react")
rate_limiter = RateLimiter(requests_per_minute=60)
app = Application(
name="AgentApp",
agents=[agent],
rate_limiter=rate_limiter
)
app.serve(port=8000)
With user validation:
def validate_user(user_id: str):
# Custom validation logic
if user_id.startswith("user_"):
return True
elif user_id.isdigit():
return f"user_{user_id}" # Normalize
return False
app = Application(
name="SecureApp",
model=model,
user_validator=validate_user
)
def __init__(self, name: str, model: Optional[Model] = None, agents: Optional[List[Agent]] = None,
             rate_limiter: Optional[RateLimiter] = None, user_validator: Optional[Callable[[str], Union[bool, str]]] = None):
    """
    Store the application's configuration; routes are built later.

    Parameters
    ----------
    name : str
        Application name. Used as the FastAPI app title and in the meta
        endpoint responses.
    model : Optional[Model], default None
        AI model to use. If provided, creates /model endpoints.
        The model will be available at POST /model and POST /model/stream.
    agents : Optional[List[Agent]], default None
        List of available agents, indexed by their ``name`` attribute.
        Each agent must have a unique name; a later agent with the same
        name replaces an earlier one. Creates /agent/{agent_name} endpoints.
    rate_limiter : Optional[RateLimiter], default None
        Rate limiter to control API usage. If not provided, no rate
        limiting is enforced.
    user_validator : Optional[Callable[[str], Union[bool, str]]], default None
        Function to validate user_id from requests. Must return:
        - True: user_id is valid and accepted as-is
        - False: user_id is invalid, will fallback to IP-based identification
        - str: user_id is valid but normalized (e.g. "user123" -> "user_123")

    Notes
    -----
    At least one of model or agents must be provided to create useful endpoints.
    If neither is provided, only the meta (/, /health), rate-limit statistics
    and /validate-user endpoints will be available.

    The user_validator function is called for every request that includes
    a user_id in the request body. If validation fails, the application
    falls back to using the client IP address for rate limiting.
    """
    # Index agents by name; an empty or missing list means "no agents".
    agents_by_name: Optional[Dict[str, Agent]] = None
    if agents:
        agents_by_name = {}
        for agent in agents:
            agents_by_name[agent.name] = agent

    self.name = name
    self._model = model
    self._agents = agents_by_name
    self._rate_limiter = rate_limiter
    self._user_validator = user_validator
    # Timezone-aware UTC start time.
    self._started_at = datetime.now(timezone.utc)
Initialize the application.
Parameters
- name (str): Application name. Used in API responses and logging.
- model (Optional[Model], default None): AI model to use. If provided, creates /model endpoints. The model will be available at POST /model and POST /model/stream.
- agents (Optional[List[Agent]], default None): List of available agents. Each agent must have a unique name. Creates /agent/{agent_name} endpoints for each agent.
- rate_limiter (Optional[RateLimiter], default None): Rate limiter to control API usage. Applied to the model and agent HTTP endpoints; the meta, statistics, and /validate-user endpoints are not rate limited. If not provided, no rate limiting is enforced.
- user_validator (Optional[Callable[[str], Union[bool, str]]], default None):
Function to validate user_id from requests. Must return:
- True: user_id is valid and accepted as-is
- False: user_id is invalid, will fallback to IP-based identification
- str: user_id is valid but normalized (e.g. "user123" -> "user_123")
Notes
At least one of model or agents must be provided to create useful endpoints. If neither is provided, only the meta endpoints (/, /health), the rate-limit statistics endpoints (/rate-limit/stats) and /validate-user will be available.
The user_validator function is called for every request that includes a user_id in the request body. If validation fails, the application falls back to using the client IP address for rate limiting.
def validate_user_id(self, user_id: str) -> Union[bool, str]:
    """
    Run the configured user validator against a user_id, never raising.

    Parameters
    ----------
    user_id : str
        user_id to validate

    Returns
    -------
    Union[bool, str]
        - True: user_id is valid and accepted as-is
        - False: user_id is invalid or validation failed
        - str: user_id is valid but normalized (use this value instead)

    Notes
    -----
    Without a configured validator every user_id is accepted (True).
    A validator that raises is treated as a rejection (False).
    """
    validator = self._user_validator
    if validator:
        try:
            return validator(user_id)
        except Exception:
            # Deliberate: a failing validator counts as "invalid", not as an error.
            return False
    return True
Validate a user_id using the configured validator.
This method provides a safe way to validate user IDs, handling any exceptions that might occur during validation.
Parameters
- user_id (str): user_id to validate
Returns
- Union[bool, str]: one of the following.
  - True: user_id is valid and accepted as-is
  - False: user_id is invalid or validation failed
  - str: user_id is valid but normalized (use this value instead)
Notes
If no validator is configured, this method always returns True. Any exceptions during validation are caught and result in False.
def serve(
    self,
    host: str = "0.0.0.0",
    port: int = 8000,
    reload: bool = False,
    workers: Optional[int] = None,
    log_level: str = "info",
):
    """
    Serve the application creating endpoints for the model and agents.

    This method starts a single-process uvicorn server with the configured
    FastAPI application. The server provides REST API and WebSocket
    endpoints for interacting with AI models and agents.

    API Endpoints
    -------------
    When the server is running, the following endpoints are available:

    **Model Endpoints** (if model is configured):
    - POST /model                      Ask the model
    - POST /model/stream               Ask the model with streaming
    - WS   /model/ws                   Ask the model with WebSocket streaming

    **Agent Endpoints** (if agents are configured):
    - POST /agent/{agent_name}         Execute an agent
    - POST /agent/{agent_name}/stream  Execute an agent with streaming
    - WS   /agent/{agent_name}/ws      Execute an agent with WebSocket streaming

    **Authentication & Validation:**
    - POST /validate-user              Validate a user_id

    **Rate Limiting & Monitoring:**
    - GET /rate-limit/stats            Rate limiter statistics
    - GET /rate-limit/stats/{user_id}  Statistics for a specific user

    **Meta & Health:**
    - GET /                            Ping the application
    - GET /health                      Health check

    **Request Format:**
    Model and agent endpoints expect a JSON object with a "prompt" field:
    ```json
    {
        "prompt": "Your question here",
        "user_id": "optional_user_id"
    }
    ```

    **Response Format:**
    - Model responses: Standard model response format
    - Agent responses: Agent execution result with iterations
    - WebSocket: {"type": "chunk", "content": ...} for streamed chunks,
      {"type": "complete", "result": ...} for the final result,
      {"error": ...} when a request fails

    Parameters
    ----------
    host : str, default "0.0.0.0"
        Host to serve the application. Use "0.0.0.0" for external access.
    port : int, default 8000
        Port to serve the application on.
    reload : bool, default False
        Auto-reload is not supported here: uvicorn only enables it when
        given an import string, and serve() passes an app instance.
    workers : Optional[int], default None
        Number of worker processes. Only None or 1 is supported, for the
        same reason as ``reload``.
    log_level : str, default "info"
        Log level for uvicorn. Options: "critical", "error", "warning", "info", "debug".

    Raises
    ------
    ValueError
        If reload=True or workers > 1 is requested.
    ImportError
        If uvicorn is not installed

    Examples
    --------
    Basic serving:
    ```
    app = Application(name="MyApp", model=model)
    app.serve()  # Serves on http://localhost:8000
    ```

    Custom port and log level:
    ```
    app.serve(host="0.0.0.0", port=8080, log_level="warning")
    ```
    """
    # uvicorn.run() logs a warning and calls sys.exit(1) when reload or
    # multiple workers are requested with an app object instead of an
    # import string. Fail early with an actionable error instead.
    if reload or (workers is not None and workers > 1):
        raise ValueError(
            "reload=True and workers > 1 are not supported by Application.serve(): "
            "uvicorn only enables them when given the application as an import string. "
            "Expose the ASGI app at module level and start uvicorn with an import string instead."
        )

    try:
        import uvicorn
    except ImportError as e:
        raise ImportError(
            "uvicorn is required to serve the application. "
            "Install it with: pip install uvicorn"
        ) from e

    uvicorn.run(
        self._build_app(),
        host=host,
        port=port,
        reload=reload,
        workers=workers,
        log_level=log_level,
    )
Serve the application, creating endpoints for the model and agents.
This method starts a uvicorn server with the configured FastAPI application. The server provides REST API and WebSocket endpoints for interacting with AI models and agents.
API Endpoints
When the server is running, the following endpoints are available:
Model Endpoints (if model is configured): - POST /model Ask the model - POST /model/stream Ask the model with streaming - WS /model/ws Ask the model with WebSocket streaming
Agent Endpoints (if agents are configured): - POST /agent/{agent_name} Execute an agent - POST /agent/{agent_name}/stream Execute an agent with streaming - WS /agent/{agent_name}/ws Execute an agent with WebSocket streaming
Authentication & Validation: - POST /validate-user Validate a user_id
Rate Limiting & Monitoring: - GET /rate-limit/stats Rate limiter statistics - GET /rate-limit/stats/{user_id} Statistics for a specific user
Meta & Health: - GET / Ping the application - GET /health Health check
Request Format: All POST endpoints expect JSON with a "prompt" field:
{
"prompt": "Your question here",
"user_id": "optional_user_id"
}
Response Format:
- Model responses: Standard model response format
- Agent responses: Agent execution result with iterations
- WebSocket: JSON messages with "type" and "content" fields
Parameters
- host (str, default "0.0.0.0"): Host to serve the application. Use "0.0.0.0" for external access.
- port (int, default 8000): Port to serve the application on.
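- reload (bool, default False): Whether to reload the application when code changes are detected. Useful for development, not recommended for production. Note: uvicorn only supports reload when the application is passed as an import string, not as an app object.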
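- workers (Optional[int], default None): Number of worker processes to use. If None, uses uvicorn default. Note: uvicorn only supports more than one worker when the application is passed as an import string, not as an app object.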
- log_level (str, default "info"): Log level for uvicorn. Options: "critical", "error", "warning", "info", "debug".
Raises
- ImportError: If uvicorn is not installed
Examples
Basic serving:
app = Application(name="MyApp", model=model)
app.serve() # Serves on http://localhost:8000
Production serving:
app.serve(host="0.0.0.0", port=8080, workers=4, log_level="warning")
Development serving:
app.serve(reload=True, log_level="debug")
class RateLimiter:
    """
    SQLite-backed rate limiter.

    Several limits can be enforced at the same time for a single user, for
    example 100 requests per day AND 10 requests per hour.

    Supported measurement units (``Limit.unit``):
    - 'token': usage is counted in tokens
    - 'request': usage is counted in requests

    Supported reset time units (``Limit.reset_unit``), scaled by ``Limit.reset_value``:
    - 'second': reset every N seconds (e.g. reset_value=5 -> every 5 seconds)
    - 'minute': reset every N minutes (e.g. reset_value=2 -> every 2 minutes)
    - 'hour': reset every N hours (e.g. reset_value=3 -> every 3 hours)
    - 'day': reset every N days (e.g. reset_value=1 -> every day)

    Windows are fixed and aligned to the Unix epoch: the current window of a
    limit starts at ``now - now % window_seconds``.
    """

    def __init__(self, limits: List[Limit], db_path: str = "rate_limiter.db"):
        """
        Initialize the rate limiter with one or more limits.

        Parameters
        ----------
        limits : List[Limit]
            Limits to enforce simultaneously.
        db_path : str
            Path of the SQLite database file.

        Raises
        ------
        ValueError
            If ``limits`` is empty.
        """
        if not limits:
            raise ValueError("At least one limit must be provided")

        # Keep a private copy so later mutation of the caller's list cannot
        # change the limits being enforced (get_limits() also hands out copies).
        self._limits = list(limits)
        self._db_path = db_path

        # Create the database and tables if they don't exist yet.
        self._init_database()

    def _init_database(self):
        """Create the ``rate_limits`` table and its secondary index if missing."""
        with self._get_connection() as conn:
            # The composite PRIMARY KEY already gives SQLite an index on
            # (user_id, limit_name, window_start), which serves the per-user
            # lookups; a second index on the same columns would only add
            # write overhead.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS rate_limits (
                    user_id TEXT NOT NULL,
                    limit_name TEXT NOT NULL,
                    window_start INTEGER NOT NULL,
                    usage_count INTEGER NOT NULL DEFAULT 0,
                    PRIMARY KEY (user_id, limit_name, window_start)
                )
            """)
            # Supports the per-limit DELETE issued by _cleanup_old_windows().
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_limit_window
                ON rate_limits(limit_name, window_start)
            """)
            conn.commit()

    @contextmanager
    def _get_connection(self):
        """Yield a fresh SQLite connection (rows addressable by column name); always closed on exit."""
        conn = sqlite3.connect(self._db_path)
        conn.row_factory = sqlite3.Row  # allows row["column_name"] access
        try:
            yield conn
        finally:
            conn.close()

    def _get_current_window_start(self, limit: Limit) -> int:
        """Return the epoch-aligned start (in seconds) of ``limit``'s current window."""
        current_time = int(time.time())
        time_window = limit.get_time_window_seconds()
        return current_time - (current_time % time_window)

    def _cleanup_old_windows(self, conn: sqlite3.Connection) -> int:
        """
        Delete windows older than two window lengths for every limit.

        Returns
        -------
        int
            Number of rows deleted (the caller is responsible for committing).
        """
        current_time = int(time.time())
        removed = 0
        for limit in self._limits:
            # Keep the last two windows of each limit.
            cutoff_time = current_time - limit.get_time_window_seconds() * 2
            cursor = conn.execute(
                "DELETE FROM rate_limits WHERE limit_name = ? AND window_start < ?",
                (limit.name, cutoff_time),
            )
            removed += cursor.rowcount
        return removed

    @staticmethod
    def _usage_amount(limit: Limit, tokens: int) -> int:
        """Usage a single response contributes to ``limit``: its tokens for token limits, else 1."""
        return tokens if limit.unit == "token" else 1

    def _add_usage(self, conn: sqlite3.Connection, user_id: str, limit: Limit, usage: int) -> None:
        """Upsert ``usage`` into ``user_id``'s row for the current window of ``limit`` (no commit)."""
        conn.execute("""
            INSERT INTO rate_limits (user_id, limit_name, window_start, usage_count)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(user_id, limit_name, window_start)
            DO UPDATE SET usage_count = usage_count + ?
        """, (user_id, limit.name, self._get_current_window_start(limit), usage, usage))

    def update(self, user_id: str, usage: int = 1) -> bool:
        """
        Add usage for a user to every configured limit.

        Parameters
        ----------
        user_id : str
            User identifier.
        usage : int
            Amount of usage to add (default: 1).

        Returns
        -------
        bool
            Always True once the update has been committed.
        """
        with self._get_connection() as conn:
            # Drop expired windows before recording new usage.
            self._cleanup_old_windows(conn)
            for limit in self._limits:
                self._add_usage(conn, user_id, limit, usage)
            conn.commit()
        return True

    def check(self, user_id: str) -> bool:
        """
        Check whether a user is still within every limit.

        Parameters
        ----------
        user_id : str
            User identifier.

        Returns
        -------
        bool
            True if the user may make requests, False if at least one limit
            has been reached.
        """
        for limit in self._limits:
            if self.get_usage(user_id, limit) >= limit.value:
                return False
        return True

    def get_usage(self, user_id: str, limit: Optional[Limit] = None) -> int:
        """
        Return a user's usage in the current time window.

        Parameters
        ----------
        user_id : str
            User identifier.
        limit : Limit, optional
            Specific limit. If None, the usages of all limits are summed.

        Returns
        -------
        int
            Usage (in the limit's unit) within the current window.
        """
        if limit is None:
            return sum(self.get_usage(user_id, limit_obj) for limit_obj in self._limits)

        window_start = self._get_current_window_start(limit)
        with self._get_connection() as conn:
            cursor = conn.execute("""
                SELECT usage_count FROM rate_limits
                WHERE user_id = ? AND limit_name = ? AND window_start = ?
            """, (user_id, limit.name, window_start))
            result = cursor.fetchone()
            return result['usage_count'] if result else 0

    def get_remaining(self, user_id: str, limit: Optional[Limit] = None) -> int:
        """
        Return how much usage a user has left.

        Parameters
        ----------
        user_id : str
            User identifier.
        limit : Limit, optional
            Specific limit. If None, the minimum across all limits is returned.

        Returns
        -------
        int
            Remaining usage (never negative).
        """
        if limit is None:
            return int(min(
                (self.get_remaining(user_id, limit_obj) for limit_obj in self._limits),
                default=0,
            ))
        return max(0, limit.value - self.get_usage(user_id, limit))

    def reset_user(self, user_id: str, limit: Optional[Limit] = None) -> bool:
        """
        Delete stored usage for a user.

        Parameters
        ----------
        user_id : str
            User identifier.
        limit : Limit, optional
            Specific limit. If None, all of the user's rows are deleted.

        Returns
        -------
        bool
            Always True once the deletion has been committed.
        """
        with self._get_connection() as conn:
            if limit is None:
                conn.execute("DELETE FROM rate_limits WHERE user_id = ?", (user_id,))
            else:
                conn.execute("DELETE FROM rate_limits WHERE user_id = ? AND limit_name = ?",
                             (user_id, limit.name))
            conn.commit()
        return True

    def get_stats(self, user_id: Optional[str] = None, limit: Optional[Limit] = None) -> Dict[str, Any]:
        """
        Return aggregate usage statistics over the stored windows.

        Parameters
        ----------
        user_id : str, optional
            Restrict to one user. If None, global statistics are returned,
            including a ``unique_users`` count.
        limit : Limit, optional
            Restrict to one limit. If None, all limits are included.

        Returns
        -------
        Dict[str, Any]
            ``total_windows``, ``total_usage``, ``avg_usage``, ``max_usage``
            (preceded by ``unique_users`` for global statistics).
        """
        columns = [
            "COUNT(*) as total_windows",
            "SUM(usage_count) as total_usage",
            "AVG(usage_count) as avg_usage",
            "MAX(usage_count) as max_usage",
        ]
        conditions: List[str] = []
        params: List[Any] = []
        if user_id is None:
            columns.insert(0, "COUNT(DISTINCT user_id) as unique_users")
        else:
            conditions.append("user_id = ?")
            params.append(user_id)
        if limit is not None:
            conditions.append("limit_name = ?")
            params.append(limit.name)

        # Only fixed column/clause text is interpolated; values are bound parameters.
        query = f"SELECT {', '.join(columns)} FROM rate_limits"
        if conditions:
            query += " WHERE " + " AND ".join(conditions)

        with self._get_connection() as conn:
            result = conn.execute(query, params).fetchone()
            return dict(result) if result else {}

    def cleanup(self) -> int:
        """
        Remove expired time windows from the database.

        Returns
        -------
        int
            Number of rows removed.
        """
        with self._get_connection() as conn:
            removed = self._cleanup_old_windows(conn)
            conn.commit()
        return removed

    def get_limits(self) -> List[Limit]:
        """Return a copy of the configured limits."""
        return list(self._limits)

    def get_limit_by_name(self, name: str) -> Optional[Limit]:
        """Return the configured limit called ``name``, or None if there is none."""
        for limit in self._limits:
            if limit.name == name:
                return limit
        return None

    def get_usage_by_limit(self, user_id: str) -> Dict[str, int]:
        """Return a mapping of limit name -> current usage for ``user_id``."""
        return {limit.name: self.get_usage(user_id, limit) for limit in self._limits}

    def get_remaining_by_limit(self, user_id: str) -> Dict[str, int]:
        """Return a mapping of limit name -> remaining usage for ``user_id``."""
        return {limit.name: self.get_remaining(user_id, limit) for limit in self._limits}

    def _extract_tokens_from_response(self, response: Union[str, Dict[str, Any]]) -> int:
        """
        Extract ``usage.total_tokens`` from a model/agent response.

        Parameters
        ----------
        response : Union[str, Dict[str, Any]]
            Response as a JSON string or an already-parsed dict.

        Returns
        -------
        int
            Number of tokens, or 0 when absent or unparseable.
        """
        # Deliberately best-effort: any malformed response counts as 0 tokens
        # rather than failing the request.
        try:
            if isinstance(response, str):
                try:
                    response = json.loads(response)
                except json.JSONDecodeError:
                    return 0

            if isinstance(response, dict):
                usage = response.get("usage", {})
                if isinstance(usage, dict):
                    total_tokens = usage.get("total_tokens", 0)
                    if isinstance(total_tokens, (int, float)):
                        return int(total_tokens)

            return 0
        except Exception:
            return 0

    def update_with_response(self, user_id: str, response: Union[str, Dict[str, Any]]) -> bool:
        """
        Record usage derived from a model/agent response.

        Token limits are charged the response's ``usage.total_tokens``; request
        limits are charged 1.

        Parameters
        ----------
        user_id : str
            User identifier.
        response : Union[str, Dict[str, Any]]
            Response as a JSON string or dict.

        Returns
        -------
        bool
            Always True once the update has been committed.
        """
        tokens = self._extract_tokens_from_response(response)
        with self._get_connection() as conn:
            self._cleanup_old_windows(conn)
            for limit in self._limits:
                self._add_usage(conn, user_id, limit, self._usage_amount(limit, tokens))
            conn.commit()
        return True

    def check_with_response(self, user_id: str, response: Union[str, Dict[str, Any]]) -> bool:
        """
        Check whether recording ``response`` would keep the user within every limit.

        Parameters
        ----------
        user_id : str
            User identifier.
        response : Union[str, Dict[str, Any]]
            Response as a JSON string or dict.

        Returns
        -------
        bool
            True if adding the response's usage exceeds no limit, False otherwise.
        """
        tokens = self._extract_tokens_from_response(response)
        for limit in self._limits:
            current_usage = self.get_usage(user_id, limit)
            if current_usage + self._usage_amount(limit, tokens) > limit.value:
                return False
        return True

    def __repr__(self) -> str:
        limits_str = ", ".join([str(limit) for limit in self._limits])
        return f"RateLimiter(limits=[{limits_str}], db_path='{self._db_path}')"
Rate limiter che usa SQLite per persistenza dei dati.
Supporta più limiti simultanei per un singolo utente. Ad esempio: 100 richieste al giorno E 10 richieste all'ora.
Supporta diversi tipi di unità di misura:
- 'token': limite basato sui token
- 'request': limite basato sulle richieste
E diverse unità di tempo per il reset con valori personalizzabili:
- 'second': reset ogni N secondi (es. reset_value=5 = ogni 5 secondi)
- 'minute': reset ogni N minuti (es. reset_value=2 = ogni 2 minuti)
- 'hour': reset ogni N ore (es. reset_value=3 = ogni 3 ore)
- 'day': reset ogni N giorni (es. reset_value=1 = ogni giorno)
def __init__(self, limits: List[Limit], db_path: str = "rate_limiter.db"):
    """
    Initialize the rate limiter with one or more limits.

    Parameters
    ----------
    limits : List[Limit]
        Limits to enforce simultaneously.
    db_path : str
        Path of the SQLite database file.

    Raises
    ------
    ValueError
        If ``limits`` is empty.
    """
    if not limits:
        raise ValueError("At least one limit must be provided")

    # Keep a private list copy: later mutation of the caller's sequence can't
    # change what is enforced, and list-only methods (e.g. .copy()) keep working
    # even when a tuple is passed in.
    self._limits = list(limits)
    self._db_path = db_path

    # Create the database and tables if they don't exist yet.
    self._init_database()
Inizializza il rate limiter con più limiti.
Parameters
- limits (List[Limit]): Lista di limiti da applicare simultaneamente
- db_path (str): Percorso del database SQLite
def update(self, user_id: str, usage: int = 1) -> bool:
    """
    Add ``usage`` to the current window of every configured limit for a user.

    Parameters
    ----------
    user_id : str
        User identifier.
    usage : int
        Amount to add to each limit's counter (default: 1).

    Returns
    -------
    bool
        True once the new usage has been committed.
    """
    with self._get_connection() as connection:
        # Expired windows are pruned first so the table doesn't grow unbounded.
        self._cleanup_old_windows(connection)

        for current_limit in self._limits:
            start = self._get_current_window_start(current_limit)
            row_params = (user_id, current_limit.name, start, usage, usage)
            # Insert a fresh row for this window, or add to the existing one.
            connection.execute("""
                INSERT INTO rate_limits (user_id, limit_name, window_start, usage_count)
                VALUES (?, ?, ?, ?)
                ON CONFLICT(user_id, limit_name, window_start)
                DO UPDATE SET usage_count = usage_count + ?
            """, row_params)

        connection.commit()
        return True
Aggiorna l'uso per un utente specifico per tutti i limiti.
Parameters
- user_id (str): ID dell'utente
- usage (int): Quantità di uso da aggiungere (default: 1)
Returns
- bool: True se l'aggiornamento è andato a buon fine
def check(self, user_id: str) -> bool:
    """
    Tell whether a user is still below every configured limit.

    Parameters
    ----------
    user_id : str
        User identifier.

    Returns
    -------
    bool
        True when the user may make requests, False as soon as one limit's
        current usage has reached its maximum.
    """
    # all() stops at the first exhausted limit, like an early return would.
    return all(
        self.get_usage(user_id, lim) < lim.value
        for lim in self._limits
    )
Controlla se un utente ha superato uno qualsiasi dei limiti.
Parameters
- user_id (str): ID dell'utente
Returns
- bool: True se l'utente può fare richieste, False se ha superato almeno un limite
def get_usage(self, user_id: str, limit: Optional[Limit] = None) -> int:
    """
    Return a user's usage in the current time window.

    Parameters
    ----------
    user_id : str
        User identifier.
    limit : Limit, optional
        Limit to inspect. When omitted, the usages of all configured limits
        are summed.

    Returns
    -------
    int
        Usage (in the limit's unit) recorded for the current window.
    """
    if limit is not None:
        start = self._get_current_window_start(limit)
        with self._get_connection() as connection:
            row = connection.execute("""
                SELECT usage_count FROM rate_limits
                WHERE user_id = ? AND limit_name = ? AND window_start = ?
            """, (user_id, limit.name, start)).fetchone()
            return 0 if row is None else row['usage_count']

    return sum(self.get_usage(user_id, each) for each in self._limits)
Ottiene l'uso corrente per un utente nella finestra temporale corrente.
Parameters
- user_id (str): ID dell'utente
- limit (Limit, optional): Limite specifico. Se None, restituisce l'uso totale per tutti i limiti.
Returns
- int: Uso corrente (richieste o token, secondo l'unità del limite) nella finestra temporale corrente
def get_remaining(self, user_id: str, limit: Optional[Limit] = None) -> int:
    """
    Return how much usage a user may still consume.

    Parameters
    ----------
    user_id : str
        User identifier.
    limit : Limit, optional
        Limit to inspect. When omitted, the tightest (smallest) remainder
        across all configured limits is returned.

    Returns
    -------
    int
        Remaining usage, clamped at zero.
    """
    if limit is not None:
        return max(0, limit.value - self.get_usage(user_id, limit))

    remainders = [self.get_remaining(user_id, each) for each in self._limits]
    return int(min(remainders)) if remainders else 0
Ottiene il numero di richieste rimanenti per un utente.
Parameters
- user_id (str): ID dell'utente
- limit (Limit, optional): Limite specifico. Se None, restituisce il minimo tra tutti i limiti.
Returns
- int: Unità rimanenti (richieste o token, secondo l'unità del limite)
def reset_user(self, user_id: str, limit: Optional[Limit] = None) -> bool:
    """
    Erase stored usage for a user.

    Parameters
    ----------
    user_id : str
        User identifier.
    limit : Limit, optional
        Only erase rows of this limit. When omitted, every row of the user
        is removed.

    Returns
    -------
    bool
        True once the deletion has been committed.
    """
    with self._get_connection() as connection:
        if limit is not None:
            connection.execute("DELETE FROM rate_limits WHERE user_id = ? AND limit_name = ?",
                               (user_id, limit.name))
        else:
            connection.execute("DELETE FROM rate_limits WHERE user_id = ?", (user_id,))
        connection.commit()
        return True
Resetta l'uso per un utente specifico.
Parameters
- user_id (str): ID dell'utente
- limit (Limit, optional): Limite specifico. Se None, resetta tutti i limiti per l'utente.
Returns
- bool: True se il reset è andato a buon fine
def get_stats(self, user_id: Optional[str] = None, limit: Optional[Limit] = None) -> Dict[str, Any]:
    """
    Return aggregate usage statistics over the stored windows.

    Parameters
    ----------
    user_id : str, optional
        Restrict to one user. If None, global statistics are returned,
        including a ``unique_users`` count. An empty string is treated as a
        real user id, not as "global".
    limit : Limit, optional
        Restrict to one limit. If None, all limits are included.

    Returns
    -------
    Dict[str, Any]
        ``total_windows``, ``total_usage``, ``avg_usage``, ``max_usage``
        (preceded by ``unique_users`` for global statistics).
    """
    columns = [
        "COUNT(*) as total_windows",
        "SUM(usage_count) as total_usage",
        "AVG(usage_count) as avg_usage",
        "MAX(usage_count) as max_usage",
    ]
    conditions: List[str] = []
    params: List[Any] = []
    if user_id is None:
        columns.insert(0, "COUNT(DISTINCT user_id) as unique_users")
    else:
        conditions.append("user_id = ?")
        params.append(user_id)
    if limit is not None:
        conditions.append("limit_name = ?")
        params.append(limit.name)

    # Only fixed column/clause text is interpolated; values are bound parameters.
    query = f"SELECT {', '.join(columns)} FROM rate_limits"
    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    with self._get_connection() as conn:
        result = conn.execute(query, params).fetchone()
        return dict(result) if result else {}
Ottiene statistiche di utilizzo.
Parameters
- user_id (str, optional): ID dell'utente specifico. Se None, restituisce statistiche globali.
- limit (Limit, optional): Limite specifico. Se None, restituisce statistiche per tutti i limiti.
Returns
- Dict[str, Any]: Statistiche di utilizzo
def cleanup(self) -> int:
    """
    Remove expired time windows from the database.

    Returns
    -------
    int
        Number of rows removed.
    """
    with self._get_connection() as conn:
        changes_before = conn.total_changes
        self._cleanup_old_windows(conn)
        conn.commit()
        # _cleanup_old_windows only issues DELETE statements on this
        # connection, so the change delta equals the number of removed rows.
        return conn.total_changes - changes_before
Pulisce le finestre temporali vecchie dal database.
Returns
- int: Numero di record rimossi
def get_limits(self) -> List[Limit]:
    """
    Return the configured limits.

    Returns
    -------
    List[Limit]
        A new list, so callers cannot mutate the limiter's own collection.
        ``list(...)`` also works when the limits were supplied as a tuple,
        unlike ``.copy()``.
    """
    return list(self._limits)
Restituisce la lista dei limiti configurati.
Returns
- List[Limit]: Lista dei limiti
def get_limit_by_name(self, name: str) -> Optional[Limit]:
    """
    Look up a configured limit by its name.

    Parameters
    ----------
    name : str
        Name of the limit.

    Returns
    -------
    Limit, optional
        The first limit whose name matches, or None if no limit matches.
    """
    return next((candidate for candidate in self._limits if candidate.name == name), None)
Restituisce un limite specifico per nome.
Parameters
- name (str): Nome del limite
Returns
- Limit, optional: Il limite se trovato, None altrimenti
def get_usage_by_limit(self, user_id: str) -> Dict[str, int]:
    """
    Map each configured limit's name to the user's current usage for it.

    Parameters
    ----------
    user_id : str
        User identifier.

    Returns
    -------
    Dict[str, int]
        Limit name -> current-window usage.
    """
    return {lim.name: self.get_usage(user_id, lim) for lim in self._limits}
Restituisce l'uso per ogni limite per un utente specifico.
Parameters
- user_id (str): ID dell'utente
Returns
- Dict[str, int]: Dizionario con nome limite -> uso corrente
def get_remaining_by_limit(self, user_id: str) -> Dict[str, int]:
    """
    Map each configured limit's name to the usage the user has left for it.

    Parameters
    ----------
    user_id : str
        User identifier.

    Returns
    -------
    Dict[str, int]
        Limit name -> remaining usage.
    """
    return {lim.name: self.get_remaining(user_id, lim) for lim in self._limits}
Restituisce le richieste rimanenti per ogni limite per un utente specifico.
Parameters
- user_id (str): ID dell'utente
Returns
- Dict[str, int]: Dizionario con nome limite -> richieste rimanenti
def update_with_response(self, user_id: str, response: Union[str, Dict[str, Any]]) -> bool:
    """
    Record usage derived from a model/agent response.

    Token limits are charged the tokens found in the response (``usage.total_tokens``);
    request limits are charged 1.

    Parameters
    ----------
    user_id : str
        User identifier.
    response : Union[str, Dict[str, Any]]
        Response as a JSON string or dict.

    Returns
    -------
    bool
        True once the new usage has been committed.
    """
    token_count = self._extract_tokens_from_response(response)

    with self._get_connection() as connection:
        # Expired windows are pruned before new usage is written.
        self._cleanup_old_windows(connection)

        for current_limit in self._limits:
            start = self._get_current_window_start(current_limit)
            amount = token_count if current_limit.unit == "token" else 1
            connection.execute("""
                INSERT INTO rate_limits (user_id, limit_name, window_start, usage_count)
                VALUES (?, ?, ?, ?)
                ON CONFLICT(user_id, limit_name, window_start)
                DO UPDATE SET usage_count = usage_count + ?
            """, (user_id, current_limit.name, start, amount, amount))

        connection.commit()
        return True
Aggiorna il rate limiter basandosi sulla risposta del modello/agente. Estrae automaticamente i token se presenti nella risposta.
Parameters
- user_id (str): ID dell'utente
- response (Union[str, Dict[str, Any]]): Risposta del modello/agente (JSON string o dict)
Returns
- bool: True se l'aggiornamento è andato a buon fine
def check_with_response(self, user_id: str, response: Union[str, Dict[str, Any]]) -> bool:
    """
    Tell whether charging ``response`` would keep the user within every limit.

    Token limits would be charged the response's tokens, request limits 1.

    Parameters
    ----------
    user_id : str
        User identifier.
    response : Union[str, Dict[str, Any]]
        Response as a JSON string or dict.

    Returns
    -------
    bool
        True if no limit would be exceeded, False otherwise.
    """
    token_count = self._extract_tokens_from_response(response)

    # all() short-circuits at the first limit that would be exceeded.
    return all(
        self.get_usage(user_id, lim) + (token_count if lim.unit == "token" else 1) <= lim.value
        for lim in self._limits
    )
Controlla se un utente può fare una richiesta basandosi sulla risposta del modello/agente. Estrae automaticamente i token se presenti nella risposta.
Parameters
- user_id (str): ID dell'utente
- response (Union[str, Dict[str, Any]]): Risposta del modello/agente (JSON string o dict)
Returns
- bool: True se l'utente può fare richieste, False se ha superato almeno un limite
@dataclass
class Limit:
    """
    A single rate-limiting rule.

    Attributes
    ----------
    unit : str
        What is counted: 'token' or 'request'.
    value : int
        Maximum number of units allowed within one reset window.
    reset_unit : str
        Time unit of the reset window: 'second', 'minute', 'hour' or 'day'.
    reset_value : int
        How many ``reset_unit``s make up one window (e.g. 1 for "1 day").
    name : str, optional
        Identifier of the limit; generated from the other fields when omitted.
    """
    unit: str
    value: int
    reset_unit: str
    reset_value: int
    name: Optional[str] = None

    def __post_init__(self):
        """Validate the fields (in declaration-independent, fixed order) and default ``name``."""
        allowed_units = ('token', 'request')
        allowed_reset_units = ('second', 'minute', 'hour', 'day')

        if self.unit not in allowed_units:
            raise ValueError("Unit must be one of: 'token', 'request'")
        if self.reset_unit not in allowed_reset_units:
            raise ValueError("Reset_unit must be one of: 'second', 'minute', 'hour', 'day'")
        if self.reset_value <= 0:
            raise ValueError("Reset_value must be a positive integer")
        if self.value <= 0:
            raise ValueError("Value must be a positive integer")

        # Derive a descriptive default name, e.g. "10_request_2_minute".
        if self.name is None:
            self.name = f"{self.value}_{self.unit}_{self.reset_value}_{self.reset_unit}"

    def get_time_window_seconds(self) -> int:
        """Return the length of one reset window in seconds."""
        seconds_per_unit = {
            'second': 1,
            'minute': 60,
            'hour': 60 * 60,
            'day': 24 * 60 * 60,
        }
        return seconds_per_unit[self.reset_unit] * self.reset_value

    def __repr__(self) -> str:
        return f"Limit(name='{self.name}', unit='{self.unit}', value={self.value}, reset_unit='{self.reset_unit}', reset_value={self.reset_value})"
Rappresenta un singolo limite di rate limiting.
Attributes
- unit (str): Unità di misura ('token', 'request')
- value (int): Numero massimo di unità consentite nel periodo di reset
- reset_unit (str): Unità di tempo per il reset ('second', 'minute', 'hour', 'day')
- reset_value (int): Valore numerico per l'unità di tempo (es. 1 per "1 giorno")
- name (str, optional): Nome identificativo per il limite (default: auto-generato)
def get_time_window_seconds(self) -> int:
    """Return the length of this limit's reset window in seconds (``reset_value`` x unit length)."""
    unit_seconds = {
        'second': 1,
        'minute': 60,
        'hour': 60 * 60,
        'day': 24 * 60 * 60,
    }[self.reset_unit]
    return unit_seconds * self.reset_value
Converte l'unità di tempo in secondi.