monoai.application

The Application module provides a high-level interface for building and serving AI applications incorporating models and agents.

1"""
2The Application module provides a high-level interface for building and serving AI applications incorporating models and agents.
3"""
4
5from .application import Application
6from .rate_limiter import RateLimiter, Limit
7
8__all__ = ["Application", "RateLimiter", "Limit"]
class Application:
 11class Application:
 12    """
 13    FastAPI-based application for serving AI models and agents.
 14    
 15    The Application class provides a complete web service wrapper around AI models
 16    and agents, offering REST API endpoints, WebSocket support, rate limiting,
 17    and user validation capabilities.
 18    
 19    This class automatically creates FastAPI endpoints based on the configured
 20    models and agents, handling request validation, rate limiting, and response
 21    formatting.
 22        
 23    Examples
 24    --------
 25    Basic usage with a model:
 26    ```
 27    from monoai.models import Model
 28    from monoai.application import Application
 29    
 30    model = Model(provider="openai", model="gpt-4o-mini")
 31    app = Application(name="MyAIApp", model=model)
 32    app.serve(port=8000)
 33    ```
 34    
 35    With agents and rate limiting:
 36    ```
 37    from monoai.models import Model
 38    from monoai.agents import Agent
 39    from monoai.application import Application, RateLimiter
 40    
 41    model = Model(provider="openai", model="gpt-4o-mini")
 42    agent = Agent(model=model, paradigm="react")
 43    rate_limiter = RateLimiter(requests_per_minute=60)
 44    
 45    app = Application(
 46        name="AgentApp",
 47        agents=[agent],
 48        rate_limiter=rate_limiter
 49    )
 50    app.serve(port=8000)
 51    ```
 52    
 53    With user validation:
 54    ```
 55    def validate_user(user_id: str):
 56        # Custom validation logic
 57        if user_id.startswith("user_"):
 58            return True
 59        elif user_id.isdigit():
 60            return f"user_{user_id}"  # Normalize
 61        return False
 62    
 63    app = Application(
 64        name="SecureApp",
 65        model=model,
 66        user_validator=validate_user
 67    )
 68    ```
 69    """
 70
 71    def __init__(self, name: str, model: Optional[Model] = None, agents: Optional[List[Agent]] = None, 
 72                 rate_limiter: Optional[RateLimiter] = None, user_validator: Optional[Callable[[str], Union[bool, str]]] = None):
 73        """
 74        Initialize the application.
 75        
 76        Parameters
 77        ----------
 78        name : str
 79            Application name. Used in API responses and logging.
 80        model : Optional[Model], default None
 81            AI model to use. If provided, creates /model endpoints.
 82            The model will be available at POST /model and POST /model/stream.
 83        agents : Optional[List[Agent]], default None
 84            List of available agents. Each agent must have a unique name.
 85            Creates /agent/{agent_name} endpoints for each agent.
 86        rate_limiter : Optional[RateLimiter], default None
 87            Rate limiter to control API usage. Applies to all endpoints.
 88            If not provided, no rate limiting is enforced.
 89        user_validator : Optional[Callable[[str], Union[bool, str]]], default None
 90            Function to validate user_id from requests. Must return:
 91            - True: user_id is valid and accepted as-is
 92            - False: user_id is invalid, will fallback to IP-based identification
 93            - str: user_id is valid but normalized (e.g. "user123" -> "user_123")
 94            
 95        Notes
 96        -----
 97        At least one of model or agents must be provided to create useful endpoints.
 98        If neither is provided, only meta endpoints (/, /health) will be available.
 99        
100        The user_validator function is called for every request that includes
101        a user_id in the request body. If validation fails, the application
102        falls back to using the client IP address for rate limiting.
103        """
104        self.name = name
105        self._model = model
106        self._agents: Optional[Dict[str, Agent]] = (
107            {a.name: a for a in agents} if agents else None
108        )
109        self._rate_limiter = rate_limiter
110        self._user_validator = user_validator
111        self._started_at = datetime.now(timezone.utc)
112
113    @staticmethod
114    async def _maybe_await(fn: Callable[..., Any], *args, **kwargs) -> Any:
115        """
116        Execute a function and await it if it's a coroutine.
117        
118        This helper method allows the application to work with both
119        synchronous and asynchronous model/agent methods.
120        
121        Parameters
122        ----------
123        fn : Callable[..., Any]
124            Function to execute
125        *args
126            Positional arguments to pass to the function
127        **kwargs
128            Keyword arguments to pass to the function
129            
130        Returns
131        -------
132        Any
133            Result of the function execution
134        """
135        result = fn(*args, **kwargs)
136        if hasattr(result, "__await__"):
137            return await result 
138        return result
139
140    def _get_user_identifier(self, request, data: Dict[str, Any]) -> str:
141        """
142        Extract user identifier from the request.
143        
144        This method implements a multi-step user identification process:
145        1. Look for user_id in the request body
146        2. Validate user_id using the configured validator (if any)
147        3. Fall back to client IP address if user_id is invalid or missing
148        
149        The method handles various proxy headers (X-Forwarded-For, X-Real-IP)
150        to get the real client IP when behind load balancers or proxies.
151        
152        Parameters
153        ----------
154        request : Request
155            FastAPI request object (can be HTTP Request or WebSocket)
156        data : Dict[str, Any]
157            Request body data containing potential user_id
158            
159        Returns
160        -------
161        str
162            User identifier in one of these formats:
163            - "user_id" if user_id is provided and valid
164            - "ip:192.168.1.1" if using IP-based identification
165            - "ip:unknown" if IP cannot be determined
166            
167        Notes
168        -----
169        The user identifier is used for rate limiting and request tracking.
170        IP-based identifiers are prefixed with "ip:" to distinguish them
171        from actual user IDs.
172        """
173        # Look for user_id in the request body
174        user_id = data.get("user_id")
175        if user_id:
176            user_id_str = str(user_id)
177            
178            # Validate user_id if validator is configured
179            if self._user_validator:
180                try:
181                    validation_result = self._user_validator(user_id_str)
182                    
183                    if validation_result is True:
184                        # user_id is valid, use as is
185                        return user_id_str
186                    elif validation_result is False:
187                        # user_id is invalid, fallback to IP
188                        pass
189                    elif isinstance(validation_result, str):
190                        # user_id is valid but normalized
191                        return validation_result
192                    else:
193                        # Unknown validation result, fallback to IP
194                        pass
195                except Exception:
196                    # Error during validation, fallback to IP
197                    pass
198            else:
199                # No validator configured, use user_id as is
200                return user_id_str
201        
202        # Fallback to client IP
203        # Try different headers to get the real IP
204        forwarded_for = request.headers.get("X-Forwarded-For")
205        if forwarded_for:
206            # X-Forwarded-For can contain multiple IPs, take the first one
207            client_ip = forwarded_for.split(",")[0].strip()
208        else:
209            real_ip = request.headers.get("X-Real-IP")
210            if real_ip:
211                client_ip = real_ip
212            else:
213                # Handle both Request and WebSocket
214                if hasattr(request, 'client') and request.client:
215                    client_ip = request.client.host
216                elif hasattr(request, 'url') and hasattr(request.url, 'hostname'):
217                    client_ip = request.url.hostname
218                else:
219                    client_ip = "unknown"
220        
221        return f"ip:{client_ip}"
222
223    def validate_user_id(self, user_id: str) -> Union[bool, str]:
224        """
225        Validate a user_id using the configured validator.
226        
227        This method provides a safe way to validate user IDs, handling
228        any exceptions that might occur during validation.
229        
230        Parameters
231        ----------
232        user_id : str
233            user_id to validate
234        
235        Returns
236        -------
237        Union[bool, str]
238            - True: user_id is valid and accepted as-is
239            - False: user_id is invalid or validation failed
240            - str: user_id is valid but normalized (use this value instead)
241            
242        Notes
243        -----
244        If no validator is configured, this method always returns True.
245        Any exceptions during validation are caught and result in False.
246        """
247        if not self._user_validator:
248            return True  # No validator, always consider valid
249        
250        try:
251            return self._user_validator(user_id)
252        except Exception:
253            return False  # Error during validation, consider invalid
254
255    def _build_app(self):
256        """
257        Build and configure the FastAPI application.
258        
259        This method creates a FastAPI app with all the necessary endpoints,
260        middleware, and error handling based on the configured models and agents.
261        
262        Returns
263        -------
264        FastAPI
265            Configured FastAPI application instance
266            
267        Raises
268        ------
269        ImportError
270            If FastAPI is not installed
271            
272        Notes
273        -----
274        The method dynamically creates endpoints based on what's configured:
275        - Model endpoints are created if a model is provided
276        - Agent endpoints are created if agents are provided
277        - Meta endpoints (/, /health) are always created
278        - Rate limiting and user validation are applied to all endpoints
279        """
280
281        try:
282            from fastapi import FastAPI, Request, HTTPException, status
283        except ImportError as e:
284            raise ImportError(
285                "fastapi is required to build the application. "
286                "Install it with: pip install fastapi"
287            ) from e
288
289        from fastapi.middleware.cors import CORSMiddleware
290
291        app = FastAPI(title=self.name)
292
293        app.add_middleware(
294            CORSMiddleware,
295            allow_origins=["*"],
296            allow_credentials=True,
297            allow_methods=["*"],
298            allow_headers=["*"],
299        )
300
301        @app.get("/", tags=["meta"], summary="Ping app")
302        async def root():
303            return {
304                "msg": f"App {self.name} successfully started",
305                "started_at": self._started_at.isoformat() + "Z",
306            }
307
308        @app.get("/health", tags=["meta"], summary="Health check")
309        async def health():
310            return {"status": "ok", "app": self.name}
311
312        @app.get("/rate-limit/stats", tags=["rate-limit"], summary="Rate limiter statistics")
313        async def rate_limit_stats():
314            if not self._rate_limiter:
315                return {"message": "Rate limiter not configured"}
316            
317            stats = self._rate_limiter.get_stats()
318            return {
319                "rate_limiter": str(self._rate_limiter),
320                "global_stats": stats
321            }
322
323        @app.get("/rate-limit/stats/{user_id}", tags=["rate-limit"], summary="Statistics for a specific user")
324        async def rate_limit_user_stats(user_id: str):
325            if not self._rate_limiter:
326                return {"message": "Rate limiter not configured"}
327            
328            user_stats = self._rate_limiter.get_stats(user_id)
329            usage = self._rate_limiter.get_usage(user_id)
330            remaining = self._rate_limiter.get_remaining(user_id)
331            
332            return {
333                "user_id": user_id,
334                "usage": usage,
335                "remaining": remaining,
336                "stats": user_stats
337            }
338
339        @app.post("/validate-user", tags=["auth"], summary="Validate a user_id")
340        async def validate_user_endpoint(request: Request):
341            try:
342                data = await request.json()
343            except Exception:
344                raise HTTPException(status_code=400, detail="Invalid JSON body")
345            
346            user_id = data.get("user_id")
347            if not user_id:
348                raise HTTPException(status_code=400, detail="'user_id' is required")
349            
350            validation_result = self.validate_user_id(str(user_id))
351            
352            if validation_result is True:
353                return {
354                    "valid": True,
355                    "user_id": user_id,
356                    "normalized": None
357                }
358            elif validation_result is False:
359                return {
360                    "valid": False,
361                    "user_id": user_id,
362                    "error": "Invalid user_id"
363                }
364            elif isinstance(validation_result, str):
365                return {
366                    "valid": True,
367                    "user_id": user_id,
368                    "normalized": validation_result
369                }
370            else:
371                return {
372                    "valid": False,
373                    "user_id": user_id,
374                    "error": "Unknown validation result"
375                }
376
377        if self._model is not None:
378            @app.post(
379                "/model",
380                tags=["model"],
381                summary="Ask the model",
382                status_code=status.HTTP_200_OK,
383            )
384            async def model_route(request: Request):
385                try:
386                    data = await request.json()
387                except Exception:
388                    raise HTTPException(status_code=400, detail="Invalid JSON body")
389
390                prompt = (data or {}).get("prompt")
391                if not prompt:
392                    raise HTTPException(status_code=400, detail="'prompt' is required")
393
394                # Extract user identifier
395                user_identifier = self._get_user_identifier(request, data or {})
396                
397                # Execute the request to the model
398                result = await self._maybe_await(self._model.ask, prompt)
399                
400                # Check and update rate limit if configured
401                if self._rate_limiter:
402                    # Check rate limit based on the response
403                    if not self._rate_limiter.check_with_response(user_identifier, result):
404                        raise HTTPException(
405                            status_code=429, 
406                            detail="Rate limit exceeded. Please try again later."
407                        )
408
409                    # Update the rate limiter with the response
410                    self._rate_limiter.update_with_response(user_identifier, result)
411                
412                return result
413
414        if self._agents is not None:
415            @app.post(
416                "/agent/{agent_name}",
417                tags=["agents"],
418                summary="Execute an agent",
419                status_code=status.HTTP_200_OK,
420            )
421            async def agent_route(agent_name: str, request: Request):
422                if agent_name not in self._agents:
423                    raise HTTPException(status_code=404, detail=f"Agent '{agent_name}' not found")
424
425                try:
426                    data = await request.json()
427                except Exception:
428                    raise HTTPException(status_code=400, detail="Invalid JSON body")
429
430                prompt = (data or {}).get("prompt")
431                if not prompt:
432                    raise HTTPException(status_code=400, detail="'prompt' is required")
433
434                # Extract user identifier
435                user_identifier = self._get_user_identifier(request, data or {})
436                
437                # Execute the agent
438                agent = self._agents[agent_name]
439                result = await self._maybe_await(agent.run, prompt)
440                
441                # Check and update rate limit if configured
442                if self._rate_limiter:
443                    # Check rate limit based on the response
444                    if not self._rate_limiter.check_with_response(user_identifier, result):
445                        raise HTTPException(
446                            status_code=429, 
447                            detail="Rate limit exceeded. Please try again later."
448                        )
449
450                    # Update the rate limiter with the response
451                    self._rate_limiter.update_with_response(user_identifier, result)
452                
453                return result
454
455        # Model streaming endpoint
456        if self._model is not None:
457            @app.post(
458                "/model/stream",
459                tags=["model"],
460                summary="Ask the model with streaming",
461                status_code=status.HTTP_200_OK,
462            )
463            async def model_stream_route(request: Request):
464                try:
465                    data = await request.json()
466                except Exception:
467                    raise HTTPException(status_code=400, detail="Invalid JSON body")
468
469                prompt = (data or {}).get("prompt")
470                if not prompt:
471                    raise HTTPException(status_code=400, detail="'prompt' is required")
472
473                # Extract user identifier
474                user_identifier = self._get_user_identifier(request, data or {})
475                
476                # Create a function to handle streaming
477                def stream_handler(content: str):
478                    return content
479                
480                # Enable streaming on the model
481                if hasattr(self._model, 'enable_streaming'):
482                    self._model.enable_streaming(stream_handler)
483                
484                # Execute the request to the model with streaming
485                result = await self._maybe_await(self._model.ask, prompt)
486                
487                # Check and update rate limit if configured
488                if self._rate_limiter:
489                    # Check rate limit based on the response
490                    if not self._rate_limiter.check_with_response(user_identifier, result):
491                        raise HTTPException(
492                            status_code=429, 
493                            detail="Rate limit exceeded. Please try again later."
494                        )
495
496                    # Update the rate limiter with the response
497                    self._rate_limiter.update_with_response(user_identifier, result)
498                
499                return result
500
501        # Agent streaming endpoint
502        if self._agents is not None:
503            @app.post(
504                "/agent/{agent_name}/stream",
505                tags=["agents"],
506                summary="Execute an agent with streaming",
507                status_code=status.HTTP_200_OK,
508            )
509            async def agent_stream_route(agent_name: str, request: Request):
510                if agent_name not in self._agents:
511                    raise HTTPException(status_code=404, detail=f"Agent '{agent_name}' not found")
512
513                try:
514                    data = await request.json()
515                except Exception:
516                    raise HTTPException(status_code=400, detail="Invalid JSON body")
517
518                prompt = (data or {}).get("prompt")
519                if not prompt:
520                    raise HTTPException(status_code=400, detail="'prompt' is required")
521
522                # Extract user identifier
523                user_identifier = self._get_user_identifier(request, data or {})
524                
525                # Create a function to handle streaming
526                def stream_handler(content: str):
527                    return content
528                
529                # Enable streaming on the agent
530                agent = self._agents[agent_name]
531                if hasattr(agent, 'enable_streaming'):
532                    agent.enable_streaming(stream_handler)
533                
534                # Execute the agent with streaming
535                result = await self._maybe_await(agent.run, prompt)
536                
537                # Check and update rate limit if configured
538                if self._rate_limiter:
539                    # Check rate limit based on the response
540                    if not self._rate_limiter.check_with_response(user_identifier, result):
541                        raise HTTPException(
542                            status_code=429, 
543                            detail="Rate limit exceeded. Please try again later."
544                        )
545
546                    # Update the rate limiter with the response
547                    self._rate_limiter.update_with_response(user_identifier, result)
548                
549                return result
550
551        # WebSocket endpoint for real-time streaming
552        if self._model is not None:
553            @app.websocket("/model/ws")
554            async def model_websocket(websocket):
555                try:
556                    from fastapi import WebSocket
557                    await websocket.accept()
558                    
559                    while True:
560                        # Receive prompt from client
561                        data = await websocket.receive_text()
562                        try:
563                            request_data = json.loads(data)
564                        except json.JSONDecodeError:
565                            await websocket.send_text(json.dumps({"error": "Invalid JSON"}))
566                            continue
567                        
568                        prompt = request_data.get("prompt")
569                        if not prompt:
570                            await websocket.send_text(json.dumps({"error": "Prompt is required"}))
571                            continue
572                        
573                        # Extract user identifier
574                        user_identifier = self._get_user_identifier(websocket, request_data)
575                        
576                        # Create a handler to send chunks via WebSocket
577                        def ws_stream_handler(content: str):
578                            asyncio.create_task(websocket.send_text(json.dumps({
579                                "type": "chunk",
580                                "content": content
581                            })))
582                        
583                        # Enable streaming on the model
584                        if hasattr(self._model, 'enable_streaming'):
585                            self._model.enable_streaming(ws_stream_handler)
586                        
587                        # Execute the request to the model
588                        result = await self._maybe_await(self._model.ask, prompt)
589                        
590                        # Send the final result
591                        await websocket.send_text(json.dumps({
592                            "type": "complete",
593                            "result": result
594                        }))
595                        
596                except Exception as e:
597                    await websocket.send_text(json.dumps({"error": str(e)}))
598                finally:
599                    await websocket.close()
600
601        if self._agents is not None:
602            @app.websocket("/agent/{agent_name}/ws")
603            async def agent_websocket(websocket, agent_name: str):
604                try:
605                    from fastapi import WebSocket
606                    await websocket.accept()
607                    
608                    if agent_name not in self._agents:
609                        await websocket.send_text(json.dumps({"error": f"Agent '{agent_name}' not found"}))
610                        await websocket.close()
611                        return
612                    
613                    agent = self._agents[agent_name]
614                    
615                    while True:
616                        # Receive prompt from client
617                        data = await websocket.receive_text()
618                        try:
619                            request_data = json.loads(data)
620                        except json.JSONDecodeError:
621                            await websocket.send_text(json.dumps({"error": "Invalid JSON"}))
622                            continue
623                        
624                        prompt = request_data.get("prompt")
625                        if not prompt:
626                            await websocket.send_text(json.dumps({"error": "Prompt is required"}))
627                            continue
628                        
629                        # Extract user identifier
630                        user_identifier = self._get_user_identifier(websocket, request_data)
631                        
632                        # Create a handler to send chunks via WebSocket
633                        def ws_stream_handler(content: str):
634                            asyncio.create_task(websocket.send_text(json.dumps({
635                                "type": "chunk",
636                                "content": content
637                            })))
638                        
639                        # Enable streaming on the agent
640                        if hasattr(agent, 'enable_streaming'):
641                            agent.enable_streaming(ws_stream_handler)
642                        
643                        # Execute the agent
644                        result = await self._maybe_await(agent.run, prompt)
645                        
646                        # Send the final result
647                        await websocket.send_text(json.dumps({
648                            "type": "complete",
649                            "result": result
650                        }))
651                        
652                except Exception as e:
653                    await websocket.send_text(json.dumps({"error": str(e)}))
654                finally:
655                    await websocket.close()
656
657        return app
658
659    def serve(
660        self,
661        host: str = "0.0.0.0",
662        port: int = 8000,
663        reload: bool = False,
664        workers: Optional[int] = None,
665        log_level: str = "info",
666    ):
667
668        """
669        Serve the application creating endpoints for the model and agents.
670        
671        This method starts a uvicorn server with the configured FastAPI application.
672        The server provides REST API and WebSocket endpoints for interacting with
673        AI models and agents.
674        
675        API Endpoints
676        -------------
677        When the server is running, the following endpoints are available:
678        
679        **Model Endpoints** (if model is configured):
680            - POST /model                    Ask the model
681            - POST /model/stream             Ask the model with streaming
682            - WS  /model/ws                  Ask the model with WebSocket streaming
683        
684        **Agent Endpoints** (if agents are configured):
685            - POST /agent/{agent_name}       Execute an agent
686            - POST /agent/{agent_name}/stream Execute an agent with streaming
687            - WS  /agent/{agent_name}/ws     Execute an agent with WebSocket streaming
688        
689        **Authentication & Validation:**
690            - POST /validate-user            Validate a user_id
691        
692        **Rate Limiting & Monitoring:**
693            - GET  /rate-limit/stats         Rate limiter statistics
694            - GET  /rate-limit/stats/{user_id} Statistics for a specific user
695        
696        **Meta & Health:**
697            - GET  /                         Ping the application
698            - GET  /health                   Health check
699        
700        **Request Format:**
701        All POST endpoints expect JSON with a "prompt" field:
702        ```json
703        {
704            "prompt": "Your question here",
705            "user_id": "optional_user_id"
706        }
707        ```
708        
709        **Response Format:**
710        - Model responses: Standard model response format
711        - Agent responses: Agent execution result with iterations
712        - WebSocket: JSON messages with "type" and "content" fields
713        
714        Parameters
715        ----------
716        host : str, default "0.0.0.0"
717            Host to serve the application. Use "0.0.0.0" for external access.
718        port : int, default 8000
719            Port to serve the application on.
720        reload : bool, default False
721            Whether to reload the application when code changes are detected.
722            Useful for development, not recommended for production.
723        workers : Optional[int], default None
724            Number of worker processes to use. If None, uses uvicorn default.
725        log_level : str, default "info"
726            Log level for uvicorn. Options: "critical", "error", "warning", "info", "debug".
727            
728        Raises
729        ------
730        ImportError
731            If uvicorn is not installed
732            
733        Examples
734        --------
735        Basic serving:
736        ```
737        app = Application(name="MyApp", model=model)
738        app.serve()  # Serves on http://localhost:8000
739        ```
740        
741        Production serving:
742        ```
743        app.serve(host="0.0.0.0", port=8080, workers=4, log_level="warning")
744        ```
745        
746        Development serving:
747        ```
748        app.serve(reload=True, log_level="debug")
749        ```
750        """
751
752        try:
753            import uvicorn
754        except ImportError as e:
755            raise ImportError(
756                "uvicorn is required to serve the application. "
757                "Install it with: pip install uvicorn"
758            ) from e
759
760        uvicorn.run(
761            self._build_app(),
762            host=host,
763            port=port,
764            reload=reload,
765            workers=workers,
766            log_level=log_level,
767        )

FastAPI-based application for serving AI models and agents.

The Application class provides a complete web service wrapper around AI models and agents, offering REST API endpoints, WebSocket support, rate limiting, and user validation capabilities.

This class automatically creates FastAPI endpoints based on the configured models and agents, handling request validation, rate limiting, and response formatting.

Examples

Basic usage with a model:

from monoai.models import Model
from monoai.application import Application

model = Model(provider="openai", model="gpt-4o-mini")
app = Application(name="MyAIApp", model=model)
app.serve(port=8000)

With agents and rate limiting:

from monoai.models import Model
from monoai.agents import Agent
from monoai.application import Application, RateLimiter

model = Model(provider="openai", model="gpt-4o-mini")
agent = Agent(model=model, paradigm="react")
rate_limiter = RateLimiter(requests_per_minute=60)

app = Application(
    name="AgentApp",
    agents=[agent],
    rate_limiter=rate_limiter
)
app.serve(port=8000)

With user validation:

def validate_user(user_id: str):
    # Custom validation logic
    if user_id.startswith("user_"):
        return True
    elif user_id.isdigit():
        return f"user_{user_id}"  # Normalize
    return False

app = Application(
    name="SecureApp",
    model=model,
    user_validator=validate_user
)
Application( name: str, model: Optional[monoai.models.Model] = None, agents: Optional[List[monoai.agents.Agent]] = None, rate_limiter: Optional[RateLimiter] = None, user_validator: Optional[Callable[[str], Union[bool, str]]] = None)
 71    def __init__(self, name: str, model: Optional[Model] = None, agents: Optional[List[Agent]] = None, 
 72                 rate_limiter: Optional[RateLimiter] = None, user_validator: Optional[Callable[[str], Union[bool, str]]] = None):
 73        """
 74        Initialize the application.
 75        
 76        Parameters
 77        ----------
 78        name : str
 79            Application name. Used in API responses and logging.
 80        model : Optional[Model], default None
 81            AI model to use. If provided, creates /model endpoints.
 82            The model will be available at POST /model and POST /model/stream.
 83        agents : Optional[List[Agent]], default None
 84            List of available agents. Each agent must have a unique name.
 85            Creates /agent/{agent_name} endpoints for each agent.
 86        rate_limiter : Optional[RateLimiter], default None
 87            Rate limiter to control API usage. Applies to all endpoints.
 88            If not provided, no rate limiting is enforced.
 89        user_validator : Optional[Callable[[str], Union[bool, str]]], default None
 90            Function to validate user_id from requests. Must return:
 91            - True: user_id is valid and accepted as-is
 92            - False: user_id is invalid, will fallback to IP-based identification
 93            - str: user_id is valid but normalized (e.g. "user123" -> "user_123")
 94            
 95        Notes
 96        -----
 97        At least one of model or agents must be provided to create useful endpoints.
 98        If neither is provided, only meta endpoints (/, /health) will be available.
 99        
100        The user_validator function is called for every request that includes
101        a user_id in the request body. If validation fails, the application
102        falls back to using the client IP address for rate limiting.
103        """
104        self.name = name
105        self._model = model
106        self._agents: Optional[Dict[str, Agent]] = (
107            {a.name: a for a in agents} if agents else None
108        )
109        self._rate_limiter = rate_limiter
110        self._user_validator = user_validator
111        self._started_at = datetime.now(timezone.utc)

Initialize the application.

Parameters
  • name (str): Application name. Used in API responses and logging.
  • model (Optional[Model], default None): AI model to use. If provided, creates /model endpoints. The model will be available at POST /model and POST /model/stream.
  • agents (Optional[List[Agent]], default None): List of available agents. Each agent must have a unique name. Creates /agent/{agent_name} endpoints for each agent.
  • rate_limiter (Optional[RateLimiter], default None): Rate limiter to control API usage. Applies to all endpoints. If not provided, no rate limiting is enforced.
  • user_validator (Optional[Callable[[str], Union[bool, str]]], default None): Function to validate user_id from requests. Must return:
    • True: user_id is valid and accepted as-is
    • False: user_id is invalid; the application falls back to IP-based identification
    • str: user_id is valid but normalized (e.g. "user123" -> "user_123")
Notes

At least one of model or agents must be provided to create useful endpoints. If neither is provided, only meta endpoints (/, /health) will be available.

The user_validator function is called for every request that includes a user_id in the request body. If validation fails, the application falls back to using the client IP address for rate limiting.

name
def validate_user_id(self, user_id: str) -> Union[bool, str]:
223    def validate_user_id(self, user_id: str) -> Union[bool, str]:
224        """
225        Validate a user_id using the configured validator.
226        
227        This method provides a safe way to validate user IDs, handling
228        any exceptions that might occur during validation.
229        
230        Parameters
231        ----------
232        user_id : str
233            user_id to validate
234        
235        Returns
236        -------
237        Union[bool, str]
238            - True: user_id is valid and accepted as-is
239            - False: user_id is invalid or validation failed
240            - str: user_id is valid but normalized (use this value instead)
241            
242        Notes
243        -----
244        If no validator is configured, this method always returns True.
245        Any exceptions during validation are caught and result in False.
246        """
247        if not self._user_validator:
248            return True  # No validator, always consider valid
249        
250        try:
251            return self._user_validator(user_id)
252        except Exception:
253            return False  # Error during validation, consider invalid

Validate a user_id using the configured validator.

This method provides a safe way to validate user IDs, handling any exceptions that might occur during validation.

Parameters
  • user_id (str): user_id to validate
Returns
  • Union[bool, str]: One of:
    • True: user_id is valid and accepted as-is
    • False: user_id is invalid or validation failed
    • str: user_id is valid but normalized (use this value instead)
Notes

If no validator is configured, this method always returns True. Any exceptions during validation are caught and result in False.

def serve( self, host: str = '0.0.0.0', port: int = 8000, reload: bool = False, workers: Optional[int] = None, log_level: str = 'info'):
659    def serve(
660        self,
661        host: str = "0.0.0.0",
662        port: int = 8000,
663        reload: bool = False,
664        workers: Optional[int] = None,
665        log_level: str = "info",
666    ):
667
668        """
669        Serve the application creating endpoints for the model and agents.
670        
671        This method starts a uvicorn server with the configured FastAPI application.
672        The server provides REST API and WebSocket endpoints for interacting with
673        AI models and agents.
674        
675        API Endpoints
676        -------------
677        When the server is running, the following endpoints are available:
678        
679        **Model Endpoints** (if model is configured):
680            - POST /model                    Ask the model
681            - POST /model/stream             Ask the model with streaming
682            - WS  /model/ws                  Ask the model with WebSocket streaming
683        
684        **Agent Endpoints** (if agents are configured):
685            - POST /agent/{agent_name}       Execute an agent
686            - POST /agent/{agent_name}/stream Execute an agent with streaming
687            - WS  /agent/{agent_name}/ws     Execute an agent with WebSocket streaming
688        
689        **Authentication & Validation:**
690            - POST /validate-user            Validate a user_id
691        
692        **Rate Limiting & Monitoring:**
693            - GET  /rate-limit/stats         Rate limiter statistics
694            - GET  /rate-limit/stats/{user_id} Statistics for a specific user
695        
696        **Meta & Health:**
697            - GET  /                         Ping the application
698            - GET  /health                   Health check
699        
700        **Request Format:**
701        All POST endpoints expect JSON with a "prompt" field:
702        ```json
703        {
704            "prompt": "Your question here",
705            "user_id": "optional_user_id"
706        }
707        ```
708        
709        **Response Format:**
710        - Model responses: Standard model response format
711        - Agent responses: Agent execution result with iterations
712        - WebSocket: JSON messages with "type" and "content" fields
713        
714        Parameters
715        ----------
716        host : str, default "0.0.0.0"
717            Host to serve the application. Use "0.0.0.0" for external access.
718        port : int, default 8000
719            Port to serve the application on.
720        reload : bool, default False
721            Whether to reload the application when code changes are detected.
722            Useful for development, not recommended for production.
723        workers : Optional[int], default None
724            Number of worker processes to use. If None, uses uvicorn default.
725        log_level : str, default "info"
726            Log level for uvicorn. Options: "critical", "error", "warning", "info", "debug".
727            
728        Raises
729        ------
730        ImportError
731            If uvicorn is not installed
732            
733        Examples
734        --------
735        Basic serving:
736        ```
737        app = Application(name="MyApp", model=model)
738        app.serve()  # Serves on http://localhost:8000
739        ```
740        
741        Production serving:
742        ```
743        app.serve(host="0.0.0.0", port=8080, workers=4, log_level="warning")
744        ```
745        
746        Development serving:
747        ```
748        app.serve(reload=True, log_level="debug")
749        ```
750        """
751
752        try:
753            import uvicorn
754        except ImportError as e:
755            raise ImportError(
756                "uvicorn is required to serve the application. "
757                "Install it with: pip install uvicorn"
758            ) from e
759
760        uvicorn.run(
761            self._build_app(),
762            host=host,
763            port=port,
764            reload=reload,
765            workers=workers,
766            log_level=log_level,
767        )

Serve the application creating endpoints for the model and agents.

This method starts a uvicorn server with the configured FastAPI application. The server provides REST API and WebSocket endpoints for interacting with AI models and agents.

API Endpoints

When the server is running, the following endpoints are available:

Model Endpoints (if model is configured):

  • POST /model: Ask the model
  • POST /model/stream: Ask the model with streaming
  • WS /model/ws: Ask the model with WebSocket streaming

Agent Endpoints (if agents are configured):

  • POST /agent/{agent_name}: Execute an agent
  • POST /agent/{agent_name}/stream: Execute an agent with streaming
  • WS /agent/{agent_name}/ws: Execute an agent with WebSocket streaming

Authentication & Validation:

  • POST /validate-user: Validate a user_id

Rate Limiting & Monitoring:

  • GET /rate-limit/stats: Rate limiter statistics
  • GET /rate-limit/stats/{user_id}: Statistics for a specific user

Meta & Health:

  • GET /: Ping the application
  • GET /health: Health check

Request Format: All POST endpoints expect JSON with a "prompt" field:

{
    "prompt": "Your question here",
    "user_id": "optional_user_id"
}

Response Format:

  • Model responses: Standard model response format
  • Agent responses: Agent execution result with iterations
  • WebSocket: JSON messages with "type" and "content" fields
Parameters
  • host (str, default "0.0.0.0"): Host to serve the application. Use "0.0.0.0" for external access.
  • port (int, default 8000): Port to serve the application on.
  • reload (bool, default False): Whether to reload the application when code changes are detected. Useful for development, not recommended for production.
  • workers (Optional[int], default None): Number of worker processes to use. If None, uses uvicorn default.
  • log_level (str, default "info"): Log level for uvicorn. Options: "critical", "error", "warning", "info", "debug".
Raises
  • ImportError: If uvicorn is not installed
Examples

Basic serving:

app = Application(name="MyApp", model=model)
app.serve()  # Serves on http://localhost:8000

Production serving:

app.serve(host="0.0.0.0", port=8080, workers=4, log_level="warning")

Development serving:

app.serve(reload=True, log_level="debug")
class RateLimiter:
 66class RateLimiter:
 67    """
 68    Rate limiter che usa SQLite per persistenza dei dati.
 69    
 70    Supporta multiple limiti simultanei per un singolo utente.
 71    Ad esempio: 100 richieste al giorno E 10 richieste all'ora.
 72    
 73    Supporta diversi tipi di unità di misura:
 74    - 'token': limite basato sui token
 75    - 'request': limite basato sulle richieste
 76    
 77    E diverse unità di tempo per il reset con valori personalizzabili:
 78    - 'second': reset ogni N secondi (es. reset_value=5 = ogni 5 secondi)
 79    - 'minute': reset ogni N minuti (es. reset_value=2 = ogni 2 minuti)
 80    - 'hour': reset ogni N ore (es. reset_value=3 = ogni 3 ore)
 81    - 'day': reset ogni N giorni (es. reset_value=1 = ogni giorno)
 82    """
 83
 84    def __init__(self, limits: List[Limit], db_path: str = "rate_limiter.db"):
 85        """
 86        Inizializza il rate limiter con multiple limiti.
 87        
 88        Parameters
 89        ----------
 90        limits : List[Limit]
 91            Lista di limiti da applicare simultaneamente
 92        db_path : str
 93            Percorso del database SQLite
 94        """
 95        if not limits:
 96            raise ValueError("At least one limit must be provided")
 97        
 98        self._limits = limits
 99        self._db_path = db_path
100        
101        # Crea il database e le tabelle se non esistono
102        self._init_database()
103
104
105    def _init_database(self):
106        """Inizializza il database SQLite e crea le tabelle necessarie."""
107        with self._get_connection() as conn:
108            conn.execute("""
109                CREATE TABLE IF NOT EXISTS rate_limits (
110                    user_id TEXT NOT NULL,
111                    limit_name TEXT NOT NULL,
112                    window_start INTEGER NOT NULL,
113                    usage_count INTEGER NOT NULL DEFAULT 0,
114                    PRIMARY KEY (user_id, limit_name, window_start)
115                )
116            """)
117            conn.execute("""
118                CREATE INDEX IF NOT EXISTS idx_user_limit_window 
119                ON rate_limits(user_id, limit_name, window_start)
120            """)
121            conn.execute("""
122                CREATE INDEX IF NOT EXISTS idx_limit_window 
123                ON rate_limits(limit_name, window_start)
124            """)
125            conn.commit()
126
127    @contextmanager
128    def _get_connection(self):
129        """Context manager per gestire le connessioni SQLite."""
130        conn = sqlite3.connect(self._db_path)
131        conn.row_factory = sqlite3.Row  # Permette accesso per nome colonna
132        try:
133            yield conn
134        finally:
135            conn.close()
136
137    def _get_current_window_start(self, limit: Limit) -> int:
138        """Calcola l'inizio della finestra temporale corrente per un limite specifico."""
139        current_time = int(time.time())
140        time_window = limit.get_time_window_seconds()
141        return current_time - (current_time % time_window)
142
143    def _cleanup_old_windows(self, conn: sqlite3.Connection):
144        """Rimuove le finestre temporali vecchie dal database."""
145        current_time = int(time.time())
146        for limit in self._limits:
147            # Mantieni 2 finestre per ogni limite
148            time_window = limit.get_time_window_seconds()
149            cutoff_time = current_time - (time_window * 2)
150            conn.execute("DELETE FROM rate_limits WHERE limit_name = ? AND window_start < ?", 
151                        (limit.name, cutoff_time))
152
153    def update(self, user_id: str, usage: int = 1) -> bool:
154        """
155        Aggiorna l'uso per un utente specifico per tutti i limiti.
156        
157        Parameters
158        ----------
159        user_id : str
160            ID dell'utente
161        usage : int
162            Quantità di uso da aggiungere (default: 1)
163            
164        Returns
165        -------
166        bool
167            True se l'aggiornamento è andato a buon fine
168        """
169        with self._get_connection() as conn:
170            # Pulisci le finestre vecchie
171            self._cleanup_old_windows(conn)
172            
173            # Aggiorna l'uso per ogni limite
174            for limit in self._limits:
175                window_start = self._get_current_window_start(limit)
176                
177                # Inserisci o aggiorna l'uso per questa finestra e limite
178                conn.execute("""
179                    INSERT INTO rate_limits (user_id, limit_name, window_start, usage_count)
180                    VALUES (?, ?, ?, ?)
181                    ON CONFLICT(user_id, limit_name, window_start) 
182                    DO UPDATE SET usage_count = usage_count + ?
183                """, (user_id, limit.name, window_start, usage, usage))
184            
185            conn.commit()
186            return True
187
188    def check(self, user_id: str) -> bool:
189        """
190        Controlla se un utente ha superato uno qualsiasi dei limiti.
191        
192        Parameters
193        ----------
194        user_id : str
195            ID dell'utente
196            
197        Returns
198        -------
199        bool
200            True se l'utente può fare richieste, False se ha superato almeno un limite
201        """
202        for limit in self._limits:
203            current_usage = self.get_usage(user_id, limit)
204            if current_usage >= limit.value:
205                return False
206        return True
207
208    def get_usage(self, user_id: str, limit: Optional[Limit] = None) -> int:
209        """
210        Ottiene l'uso corrente per un utente nella finestra temporale corrente.
211        
212        Parameters
213        ----------
214        user_id : str
215            ID dell'utente
216        limit : Limit, optional
217            Limite specifico. Se None, restituisce l'uso totale per tutti i limiti.
218            
219        Returns
220        -------
221        int
222            Numero di richieste nella finestra temporale corrente
223        """
224        if limit is None:
225            # Restituisce l'uso totale per tutti i limiti
226            total_usage = 0
227            for limit_obj in self._limits:
228                total_usage += self.get_usage(user_id, limit_obj)
229            return total_usage
230        
231        window_start = self._get_current_window_start(limit)
232        
233        with self._get_connection() as conn:
234            cursor = conn.execute("""
235                SELECT usage_count FROM rate_limits 
236                WHERE user_id = ? AND limit_name = ? AND window_start = ?
237            """, (user_id, limit.name, window_start))
238            
239            result = cursor.fetchone()
240            return result['usage_count'] if result else 0
241
242    def get_remaining(self, user_id: str, limit: Optional[Limit] = None) -> int:
243        """
244        Ottiene il numero di richieste rimanenti per un utente.
245        
246        Parameters
247        ----------
248        user_id : str
249            ID dell'utente
250        limit : Limit, optional
251            Limite specifico. Se None, restituisce il minimo tra tutti i limiti.
252            
253        Returns
254        -------
255        int
256            Numero di richieste rimanenti
257        """
258        if limit is None:
259            # Restituisce il minimo rimanente tra tutti i limiti
260            min_remaining = float('inf')
261            for limit_obj in self._limits:
262                remaining = self.get_remaining(user_id, limit_obj)
263                min_remaining = min(min_remaining, remaining)
264            return int(min_remaining) if min_remaining != float('inf') else 0
265        
266        current_usage = self.get_usage(user_id, limit)
267        return max(0, limit.value - current_usage)
268
269    def reset_user(self, user_id: str, limit: Optional[Limit] = None) -> bool:
270        """
271        Resetta l'uso per un utente specifico.
272        
273        Parameters
274        ----------
275        user_id : str
276            ID dell'utente
277        limit : Limit, optional
278            Limite specifico. Se None, resetta tutti i limiti per l'utente.
279            
280        Returns
281        -------
282        bool
283            True se il reset è andato a buon fine
284        """
285        with self._get_connection() as conn:
286            if limit is None:
287                conn.execute("DELETE FROM rate_limits WHERE user_id = ?", (user_id,))
288            else:
289                conn.execute("DELETE FROM rate_limits WHERE user_id = ? AND limit_name = ?", 
290                           (user_id, limit.name))
291            conn.commit()
292            return True
293
294    def get_stats(self, user_id: Optional[str] = None, limit: Optional[Limit] = None) -> Dict[str, Any]:
295        """
296        Ottiene statistiche di utilizzo.
297        
298        Parameters
299        ----------
300        user_id : str, optional
301            ID dell'utente specifico. Se None, restituisce statistiche globali.
302        limit : Limit, optional
303            Limite specifico. Se None, restituisce statistiche per tutti i limiti.
304            
305        Returns
306        -------
307        Dict[str, Any]
308            Statistiche di utilizzo
309        """
310        with self._get_connection() as conn:
311            if user_id:
312                if limit:
313                    # Statistiche per utente e limite specifico
314                    cursor = conn.execute("""
315                        SELECT 
316                            COUNT(*) as total_windows,
317                            SUM(usage_count) as total_usage,
318                            AVG(usage_count) as avg_usage,
319                            MAX(usage_count) as max_usage
320                        FROM rate_limits 
321                        WHERE user_id = ? AND limit_name = ?
322                    """, (user_id, limit.name))
323                else:
324                    # Statistiche per utente specifico (tutti i limiti)
325                    cursor = conn.execute("""
326                        SELECT 
327                            COUNT(*) as total_windows,
328                            SUM(usage_count) as total_usage,
329                            AVG(usage_count) as avg_usage,
330                            MAX(usage_count) as max_usage
331                        FROM rate_limits 
332                        WHERE user_id = ?
333                    """, (user_id,))
334            else:
335                if limit:
336                    # Statistiche globali per limite specifico
337                    cursor = conn.execute("""
338                        SELECT 
339                            COUNT(DISTINCT user_id) as unique_users,
340                            COUNT(*) as total_windows,
341                            SUM(usage_count) as total_usage,
342                            AVG(usage_count) as avg_usage,
343                            MAX(usage_count) as max_usage
344                        FROM rate_limits
345                        WHERE limit_name = ?
346                    """, (limit.name,))
347                else:
348                    # Statistiche globali (tutti i limiti)
349                    cursor = conn.execute("""
350                        SELECT 
351                            COUNT(DISTINCT user_id) as unique_users,
352                            COUNT(*) as total_windows,
353                            SUM(usage_count) as total_usage,
354                            AVG(usage_count) as avg_usage,
355                            MAX(usage_count) as max_usage
356                        FROM rate_limits
357                    """)
358            
359            result = cursor.fetchone()
360            return dict(result) if result else {}
361
362    def cleanup(self) -> int:
363        """
364        Pulisce le finestre temporali vecchie dal database.
365        
366        Returns
367        -------
368        int
369            Numero di record rimossi
370        """
371        with self._get_connection() as conn:
372            self._cleanup_old_windows(conn)
373            conn.commit()
374            return 0  # Il conteggio è gestito in _cleanup_old_windows
375
376    def get_limits(self) -> List[Limit]:
377        """
378        Restituisce la lista dei limiti configurati.
379        
380        Returns
381        -------
382        List[Limit]
383            Lista dei limiti
384        """
385        return self._limits.copy()
386    
387    def get_limit_by_name(self, name: str) -> Optional[Limit]:
388        """
389        Restituisce un limite specifico per nome.
390        
391        Parameters
392        ----------
393        name : str
394            Nome del limite
395            
396        Returns
397        -------
398        Limit, optional
399            Il limite se trovato, None altrimenti
400        """
401        for limit in self._limits:
402            if limit.name == name:
403                return limit
404        return None
405    
406    def get_usage_by_limit(self, user_id: str) -> Dict[str, int]:
407        """
408        Restituisce l'uso per ogni limite per un utente specifico.
409        
410        Parameters
411        ----------
412        user_id : str
413            ID dell'utente
414            
415        Returns
416        -------
417        Dict[str, int]
418            Dizionario con nome limite -> uso corrente
419        """
420        usage_by_limit = {}
421        for limit in self._limits:
422            usage_by_limit[limit.name] = self.get_usage(user_id, limit)
423        return usage_by_limit
424    
425    def get_remaining_by_limit(self, user_id: str) -> Dict[str, int]:
426        """
427        Restituisce le richieste rimanenti per ogni limite per un utente specifico.
428        
429        Parameters
430        ----------
431        user_id : str
432            ID dell'utente
433            
434        Returns
435        -------
436        Dict[str, int]
437            Dizionario con nome limite -> richieste rimanenti
438        """
439        remaining_by_limit = {}
440        for limit in self._limits:
441            remaining_by_limit[limit.name] = self.get_remaining(user_id, limit)
442        return remaining_by_limit
443
444    def _extract_tokens_from_response(self, response: Union[str, Dict[str, Any]]) -> int:
445        """
446        Estrae il numero di token dalla risposta del modello/agente.
447        
448        Parameters
449        ----------
450        response : Union[str, Dict[str, Any]]
451            Risposta del modello/agente (JSON string o dict)
452            
453        Returns
454        -------
455        int
456            Numero di token estratti, 0 se non trovato
457        """
458        try:
459            # Se è una stringa, prova a parsarla come JSON
460            if isinstance(response, str):
461                try:
462                    response = json.loads(response)
463                except json.JSONDecodeError:
464                    return 0
465            
466            # Se è un dict, cerca il campo usage->total_tokens
467            if isinstance(response, dict):
468                usage = response.get("usage", {})
469                if isinstance(usage, dict):
470                    total_tokens = usage.get("total_tokens", 0)
471                    if isinstance(total_tokens, (int, float)):
472                        return int(total_tokens)
473            
474            return 0
475        except Exception:
476            return 0
477
478    def update_with_response(self, user_id: str, response: Union[str, Dict[str, Any]]) -> bool:
479        """
480        Aggiorna il rate limiter basandosi sulla risposta del modello/agente.
481        Estrae automaticamente i token se presenti nella risposta.
482        
483        Parameters
484        ----------
485        user_id : str
486            ID dell'utente
487        response : Union[str, Dict[str, Any]]
488            Risposta del modello/agente (JSON string o dict)
489            
490        Returns
491        -------
492        bool
493            True se l'aggiornamento è andato a buon fine
494        """
495        # Estrai i token dalla risposta
496        tokens = self._extract_tokens_from_response(response)
497        
498        # Aggiorna ogni limite basandosi sulla sua unità
499        with self._get_connection() as conn:
500            # Pulisci le finestre vecchie
501            self._cleanup_old_windows(conn)
502            
503            for limit in self._limits:
504                window_start = self._get_current_window_start(limit)
505                
506                # Determina l'uso basandosi sull'unità del limite
507                if limit.unit == "token":
508                    usage = tokens
509                else:  # request
510                    usage = 1
511                
512                # Inserisci o aggiorna l'uso per questa finestra e limite
513                conn.execute("""
514                    INSERT INTO rate_limits (user_id, limit_name, window_start, usage_count)
515                    VALUES (?, ?, ?, ?)
516                    ON CONFLICT(user_id, limit_name, window_start) 
517                    DO UPDATE SET usage_count = usage_count + ?
518                """, (user_id, limit.name, window_start, usage, usage))
519            
520            conn.commit()
521            return True
522
523    def check_with_response(self, user_id: str, response: Union[str, Dict[str, Any]]) -> bool:
524        """
525        Controlla se un utente può fare una richiesta basandosi sulla risposta del modello/agente.
526        Estrae automaticamente i token se presenti nella risposta.
527        
528        Parameters
529        ----------
530        user_id : str
531            ID dell'utente
532        response : Union[str, Dict[str, Any]]
533            Risposta del modello/agente (JSON string o dict)
534            
535        Returns
536        -------
537        bool
538            True se l'utente può fare richieste, False se ha superato almeno un limite
539        """
540        # Estrai i token dalla risposta
541        tokens = self._extract_tokens_from_response(response)
542        
543        # Verifica ogni limite
544        for limit in self._limits:
545            current_usage = self.get_usage(user_id, limit)
546            
547            # Determina l'uso aggiuntivo basandosi sull'unità del limite
548            if limit.unit == "token":
549                additional_usage = tokens
550            else:  # request
551                additional_usage = 1
552            
553            # Verifica se l'uso aggiuntivo supererebbe il limite
554            if current_usage + additional_usage > limit.value:
555                return False
556        
557        return True
558
559    def __repr__(self) -> str:
560        limits_str = ", ".join([str(limit) for limit in self._limits])
561        return f"RateLimiter(limits=[{limits_str}], db_path='{self._db_path}')"

Rate limiter che usa SQLite per la persistenza dei dati.

Supporta più limiti simultanei per un singolo utente. Ad esempio: 100 richieste al giorno E 10 richieste all'ora.

Supporta diversi tipi di unità di misura:

  • 'token': limite basato sui token
  • 'request': limite basato sulle richieste

E diverse unità di tempo per il reset con valori personalizzabili:

  • 'second': reset ogni N secondi (es. reset_value=5 = ogni 5 secondi)
  • 'minute': reset ogni N minuti (es. reset_value=2 = ogni 2 minuti)
  • 'hour': reset ogni N ore (es. reset_value=3 = ogni 3 ore)
  • 'day': reset ogni N giorni (es. reset_value=1 = ogni giorno)
RateLimiter( limits: List[Limit], db_path: str = 'rate_limiter.db')
 84    def __init__(self, limits: List[Limit], db_path: str = "rate_limiter.db"):
 85        """
 86        Inizializza il rate limiter con multiple limiti.
 87        
 88        Parameters
 89        ----------
 90        limits : List[Limit]
 91            Lista di limiti da applicare simultaneamente
 92        db_path : str
 93            Percorso del database SQLite
 94        """
 95        if not limits:
 96            raise ValueError("At least one limit must be provided")
 97        
 98        self._limits = limits
 99        self._db_path = db_path
100        
101        # Crea il database e le tabelle se non esistono
102        self._init_database()

Inizializza il rate limiter con più limiti.

Parameters
  • limits (List[Limit]): Lista di limiti da applicare simultaneamente
  • db_path (str): Percorso del database SQLite
def update(self, user_id: str, usage: int = 1) -> bool:
153    def update(self, user_id: str, usage: int = 1) -> bool:
154        """
155        Aggiorna l'uso per un utente specifico per tutti i limiti.
156        
157        Parameters
158        ----------
159        user_id : str
160            ID dell'utente
161        usage : int
162            Quantità di uso da aggiungere (default: 1)
163            
164        Returns
165        -------
166        bool
167            True se l'aggiornamento è andato a buon fine
168        """
169        with self._get_connection() as conn:
170            # Pulisci le finestre vecchie
171            self._cleanup_old_windows(conn)
172            
173            # Aggiorna l'uso per ogni limite
174            for limit in self._limits:
175                window_start = self._get_current_window_start(limit)
176                
177                # Inserisci o aggiorna l'uso per questa finestra e limite
178                conn.execute("""
179                    INSERT INTO rate_limits (user_id, limit_name, window_start, usage_count)
180                    VALUES (?, ?, ?, ?)
181                    ON CONFLICT(user_id, limit_name, window_start) 
182                    DO UPDATE SET usage_count = usage_count + ?
183                """, (user_id, limit.name, window_start, usage, usage))
184            
185            conn.commit()
186            return True

Aggiorna l'uso per un utente specifico per tutti i limiti.

Parameters
  • user_id (str): ID dell'utente
  • usage (int): Quantità di uso da aggiungere (default: 1)
Returns
  • bool: True se l'aggiornamento è andato a buon fine
def check(self, user_id: str) -> bool:
188    def check(self, user_id: str) -> bool:
189        """
190        Controlla se un utente ha superato uno qualsiasi dei limiti.
191        
192        Parameters
193        ----------
194        user_id : str
195            ID dell'utente
196            
197        Returns
198        -------
199        bool
200            True se l'utente può fare richieste, False se ha superato almeno un limite
201        """
202        for limit in self._limits:
203            current_usage = self.get_usage(user_id, limit)
204            if current_usage >= limit.value:
205                return False
206        return True

Controlla se un utente ha superato uno qualsiasi dei limiti.

Parameters
  • user_id (str): ID dell'utente
Returns
  • bool: True se l'utente può fare richieste, False se ha superato almeno un limite
def get_usage( self, user_id: str, limit: Optional[Limit] = None) -> int:
208    def get_usage(self, user_id: str, limit: Optional[Limit] = None) -> int:
209        """
210        Ottiene l'uso corrente per un utente nella finestra temporale corrente.
211        
212        Parameters
213        ----------
214        user_id : str
215            ID dell'utente
216        limit : Limit, optional
217            Limite specifico. Se None, restituisce l'uso totale per tutti i limiti.
218            
219        Returns
220        -------
221        int
222            Numero di richieste nella finestra temporale corrente
223        """
224        if limit is None:
225            # Restituisce l'uso totale per tutti i limiti
226            total_usage = 0
227            for limit_obj in self._limits:
228                total_usage += self.get_usage(user_id, limit_obj)
229            return total_usage
230        
231        window_start = self._get_current_window_start(limit)
232        
233        with self._get_connection() as conn:
234            cursor = conn.execute("""
235                SELECT usage_count FROM rate_limits 
236                WHERE user_id = ? AND limit_name = ? AND window_start = ?
237            """, (user_id, limit.name, window_start))
238            
239            result = cursor.fetchone()
240            return result['usage_count'] if result else 0

Ottiene l'uso corrente per un utente nella finestra temporale corrente.

Parameters
  • user_id (str): ID dell'utente
  • limit (Limit, optional): Limite specifico. Se None, restituisce l'uso totale per tutti i limiti.
Returns
  • int: Numero di richieste nella finestra temporale corrente
def get_remaining( self, user_id: str, limit: Optional[Limit] = None) -> int:
242    def get_remaining(self, user_id: str, limit: Optional[Limit] = None) -> int:
243        """
244        Ottiene il numero di richieste rimanenti per un utente.
245        
246        Parameters
247        ----------
248        user_id : str
249            ID dell'utente
250        limit : Limit, optional
251            Limite specifico. Se None, restituisce il minimo tra tutti i limiti.
252            
253        Returns
254        -------
255        int
256            Numero di richieste rimanenti
257        """
258        if limit is None:
259            # Restituisce il minimo rimanente tra tutti i limiti
260            min_remaining = float('inf')
261            for limit_obj in self._limits:
262                remaining = self.get_remaining(user_id, limit_obj)
263                min_remaining = min(min_remaining, remaining)
264            return int(min_remaining) if min_remaining != float('inf') else 0
265        
266        current_usage = self.get_usage(user_id, limit)
267        return max(0, limit.value - current_usage)

Ottiene il numero di richieste rimanenti per un utente.

Parameters
  • user_id (str): ID dell'utente
  • limit (Limit, optional): Limite specifico. Se None, restituisce il minimo tra tutti i limiti.
Returns
  • int: Numero di richieste rimanenti
def reset_user( self, user_id: str, limit: Optional[Limit] = None) -> bool:
269    def reset_user(self, user_id: str, limit: Optional[Limit] = None) -> bool:
270        """
271        Resetta l'uso per un utente specifico.
272        
273        Parameters
274        ----------
275        user_id : str
276            ID dell'utente
277        limit : Limit, optional
278            Limite specifico. Se None, resetta tutti i limiti per l'utente.
279            
280        Returns
281        -------
282        bool
283            True se il reset è andato a buon fine
284        """
285        with self._get_connection() as conn:
286            if limit is None:
287                conn.execute("DELETE FROM rate_limits WHERE user_id = ?", (user_id,))
288            else:
289                conn.execute("DELETE FROM rate_limits WHERE user_id = ? AND limit_name = ?", 
290                           (user_id, limit.name))
291            conn.commit()
292            return True

Resetta l'uso per un utente specifico.

Parameters
  • user_id (str): ID dell'utente
  • limit (Limit, optional): Limite specifico. Se None, resetta tutti i limiti per l'utente.
Returns
  • bool: True se il reset è andato a buon fine
def get_stats( self, user_id: Optional[str] = None, limit: Optional[Limit] = None) -> Dict[str, Any]:
294    def get_stats(self, user_id: Optional[str] = None, limit: Optional[Limit] = None) -> Dict[str, Any]:
295        """
296        Ottiene statistiche di utilizzo.
297        
298        Parameters
299        ----------
300        user_id : str, optional
301            ID dell'utente specifico. Se None, restituisce statistiche globali.
302        limit : Limit, optional
303            Limite specifico. Se None, restituisce statistiche per tutti i limiti.
304            
305        Returns
306        -------
307        Dict[str, Any]
308            Statistiche di utilizzo
309        """
310        with self._get_connection() as conn:
311            if user_id:
312                if limit:
313                    # Statistiche per utente e limite specifico
314                    cursor = conn.execute("""
315                        SELECT 
316                            COUNT(*) as total_windows,
317                            SUM(usage_count) as total_usage,
318                            AVG(usage_count) as avg_usage,
319                            MAX(usage_count) as max_usage
320                        FROM rate_limits 
321                        WHERE user_id = ? AND limit_name = ?
322                    """, (user_id, limit.name))
323                else:
324                    # Statistiche per utente specifico (tutti i limiti)
325                    cursor = conn.execute("""
326                        SELECT 
327                            COUNT(*) as total_windows,
328                            SUM(usage_count) as total_usage,
329                            AVG(usage_count) as avg_usage,
330                            MAX(usage_count) as max_usage
331                        FROM rate_limits 
332                        WHERE user_id = ?
333                    """, (user_id,))
334            else:
335                if limit:
336                    # Statistiche globali per limite specifico
337                    cursor = conn.execute("""
338                        SELECT 
339                            COUNT(DISTINCT user_id) as unique_users,
340                            COUNT(*) as total_windows,
341                            SUM(usage_count) as total_usage,
342                            AVG(usage_count) as avg_usage,
343                            MAX(usage_count) as max_usage
344                        FROM rate_limits
345                        WHERE limit_name = ?
346                    """, (limit.name,))
347                else:
348                    # Statistiche globali (tutti i limiti)
349                    cursor = conn.execute("""
350                        SELECT 
351                            COUNT(DISTINCT user_id) as unique_users,
352                            COUNT(*) as total_windows,
353                            SUM(usage_count) as total_usage,
354                            AVG(usage_count) as avg_usage,
355                            MAX(usage_count) as max_usage
356                        FROM rate_limits
357                    """)
358            
359            result = cursor.fetchone()
360            return dict(result) if result else {}

Ottiene statistiche di utilizzo.

Parameters
  • user_id (str, optional): ID dell'utente specifico. Se None, restituisce statistiche globali.
  • limit (Limit, optional): Limite specifico. Se None, restituisce statistiche per tutti i limiti.
Returns
  • Dict[str, Any]: Statistiche di utilizzo
def cleanup(self) -> int:
362    def cleanup(self) -> int:
363        """
364        Pulisce le finestre temporali vecchie dal database.
365        
366        Returns
367        -------
368        int
369            Numero di record rimossi
370        """
371        with self._get_connection() as conn:
372            self._cleanup_old_windows(conn)
373            conn.commit()
374            return 0  # Il conteggio è gestito in _cleanup_old_windows

Pulisce le finestre temporali vecchie dal database.

Returns
  • int: Numero di record rimossi
def get_limits(self) -> List[Limit]:
376    def get_limits(self) -> List[Limit]:
377        """
378        Restituisce la lista dei limiti configurati.
379        
380        Returns
381        -------
382        List[Limit]
383            Lista dei limiti
384        """
385        return self._limits.copy()

Restituisce la lista dei limiti configurati.

Returns
  • List[Limit]: Lista dei limiti
def get_limit_by_name(self, name: str) -> Optional[Limit]:
387    def get_limit_by_name(self, name: str) -> Optional[Limit]:
388        """
389        Restituisce un limite specifico per nome.
390        
391        Parameters
392        ----------
393        name : str
394            Nome del limite
395            
396        Returns
397        -------
398        Limit, optional
399            Il limite se trovato, None altrimenti
400        """
401        for limit in self._limits:
402            if limit.name == name:
403                return limit
404        return None

Restituisce un limite specifico per nome.

Parameters
  • name (str): Nome del limite
Returns
  • Limit, optional: Il limite se trovato, None altrimenti
def get_usage_by_limit(self, user_id: str) -> Dict[str, int]:
406    def get_usage_by_limit(self, user_id: str) -> Dict[str, int]:
407        """
408        Restituisce l'uso per ogni limite per un utente specifico.
409        
410        Parameters
411        ----------
412        user_id : str
413            ID dell'utente
414            
415        Returns
416        -------
417        Dict[str, int]
418            Dizionario con nome limite -> uso corrente
419        """
420        usage_by_limit = {}
421        for limit in self._limits:
422            usage_by_limit[limit.name] = self.get_usage(user_id, limit)
423        return usage_by_limit

Restituisce l'uso per ogni limite per un utente specifico.

Parameters
  • user_id (str): ID dell'utente
Returns
  • Dict[str, int]: Dizionario con nome limite -> uso corrente
def get_remaining_by_limit(self, user_id: str) -> Dict[str, int]:
425    def get_remaining_by_limit(self, user_id: str) -> Dict[str, int]:
426        """
427        Restituisce le richieste rimanenti per ogni limite per un utente specifico.
428        
429        Parameters
430        ----------
431        user_id : str
432            ID dell'utente
433            
434        Returns
435        -------
436        Dict[str, int]
437            Dizionario con nome limite -> richieste rimanenti
438        """
439        remaining_by_limit = {}
440        for limit in self._limits:
441            remaining_by_limit[limit.name] = self.get_remaining(user_id, limit)
442        return remaining_by_limit

Restituisce le richieste rimanenti per ogni limite per un utente specifico.

Parameters
  • user_id (str): ID dell'utente
Returns
  • Dict[str, int]: Dizionario con nome limite -> richieste rimanenti
def update_with_response(self, user_id: str, response: Union[str, Dict[str, Any]]) -> bool:
478    def update_with_response(self, user_id: str, response: Union[str, Dict[str, Any]]) -> bool:
479        """
480        Aggiorna il rate limiter basandosi sulla risposta del modello/agente.
481        Estrae automaticamente i token se presenti nella risposta.
482        
483        Parameters
484        ----------
485        user_id : str
486            ID dell'utente
487        response : Union[str, Dict[str, Any]]
488            Risposta del modello/agente (JSON string o dict)
489            
490        Returns
491        -------
492        bool
493            True se l'aggiornamento è andato a buon fine
494        """
495        # Estrai i token dalla risposta
496        tokens = self._extract_tokens_from_response(response)
497        
498        # Aggiorna ogni limite basandosi sulla sua unità
499        with self._get_connection() as conn:
500            # Pulisci le finestre vecchie
501            self._cleanup_old_windows(conn)
502            
503            for limit in self._limits:
504                window_start = self._get_current_window_start(limit)
505                
506                # Determina l'uso basandosi sull'unità del limite
507                if limit.unit == "token":
508                    usage = tokens
509                else:  # request
510                    usage = 1
511                
512                # Inserisci o aggiorna l'uso per questa finestra e limite
513                conn.execute("""
514                    INSERT INTO rate_limits (user_id, limit_name, window_start, usage_count)
515                    VALUES (?, ?, ?, ?)
516                    ON CONFLICT(user_id, limit_name, window_start) 
517                    DO UPDATE SET usage_count = usage_count + ?
518                """, (user_id, limit.name, window_start, usage, usage))
519            
520            conn.commit()
521            return True

Aggiorna il rate limiter basandosi sulla risposta del modello/agente. Estrae automaticamente i token se presenti nella risposta.

Parameters
  • user_id (str): ID dell'utente
  • response (Union[str, Dict[str, Any]]): Risposta del modello/agente (JSON string o dict)
Returns
  • bool: True se l'aggiornamento è andato a buon fine
def check_with_response(self, user_id: str, response: Union[str, Dict[str, Any]]) -> bool:
523    def check_with_response(self, user_id: str, response: Union[str, Dict[str, Any]]) -> bool:
524        """
525        Controlla se un utente può fare una richiesta basandosi sulla risposta del modello/agente.
526        Estrae automaticamente i token se presenti nella risposta.
527        
528        Parameters
529        ----------
530        user_id : str
531            ID dell'utente
532        response : Union[str, Dict[str, Any]]
533            Risposta del modello/agente (JSON string o dict)
534            
535        Returns
536        -------
537        bool
538            True se l'utente può fare richieste, False se ha superato almeno un limite
539        """
540        # Estrai i token dalla risposta
541        tokens = self._extract_tokens_from_response(response)
542        
543        # Verifica ogni limite
544        for limit in self._limits:
545            current_usage = self.get_usage(user_id, limit)
546            
547            # Determina l'uso aggiuntivo basandosi sull'unità del limite
548            if limit.unit == "token":
549                additional_usage = tokens
550            else:  # request
551                additional_usage = 1
552            
553            # Verifica se l'uso aggiuntivo supererebbe il limite
554            if current_usage + additional_usage > limit.value:
555                return False
556        
557        return True

Controlla se un utente può fare una richiesta basandosi sulla risposta del modello/agente. Estrae automaticamente i token se presenti nella risposta.

Parameters
  • user_id (str): ID dell'utente
  • response (Union[str, Dict[str, Any]]): Risposta del modello/agente (JSON string o dict)
Returns
  • bool: True se l'utente può fare richieste, False se ha superato almeno un limite
@dataclass
class Limit:
@dataclass
class Limit:
    """
    A single rate-limiting rule.

    Attributes
    ----------
    unit : str
        Unit being counted ('token', 'request').
    value : int
        Maximum number of units allowed within one reset period.
    reset_unit : str
        Time unit of the reset period ('second', 'minute', 'hour', 'day').
    reset_value : int
        How many `reset_unit`s make up one reset period (e.g. 1 for "1 day").
    name : str, optional
        Identifier of the limit (default: generated from the other fields).
    """
    unit: str
    value: int
    reset_unit: str
    reset_value: int
    name: Optional[str] = None

    def __post_init__(self):
        """Validate the fields and derive a name when none was supplied."""
        allowed_units = ('token', 'request')
        if self.unit not in allowed_units:
            raise ValueError("Unit must be one of: 'token', 'request'")

        allowed_reset_units = ('second', 'minute', 'hour', 'day')
        if self.reset_unit not in allowed_reset_units:
            raise ValueError("Reset_unit must be one of: 'second', 'minute', 'hour', 'day'")

        if self.reset_value <= 0:
            raise ValueError("Reset_value must be a positive integer")

        if self.value <= 0:
            raise ValueError("Value must be a positive integer")

        # Fall back to a name built from the rule's own parameters
        if self.name is None:
            parts = (self.value, self.unit, self.reset_value, self.reset_unit)
            self.name = "_".join(map(str, parts))

    def get_time_window_seconds(self) -> int:
        """Return the length of one reset period in seconds."""
        seconds_per_unit = dict(second=1, minute=60, hour=3600, day=86400)
        return seconds_per_unit[self.reset_unit] * self.reset_value

    def __repr__(self) -> str:
        return (
            f"Limit(name='{self.name}', unit='{self.unit}', value={self.value}, "
            f"reset_unit='{self.reset_unit}', reset_value={self.reset_value})"
        )

Rappresenta un singolo limite di rate limiting.

Attributes
  • unit (str): Unità di misura ('token', 'request')
  • value (int): Numero massimo di unità consentite nel periodo di reset
  • reset_unit (str): Unità di tempo per il reset ('second', 'minute', 'hour', 'day')
  • reset_value (int): Valore numerico per l'unità di tempo (es. 1 per "1 giorno")
  • name (str, optional): Nome identificativo per il limite (default: auto-generato)
Limit( unit: str, value: int, reset_unit: str, reset_value: int, name: Optional[str] = None)
unit: str
value: int
reset_unit: str
reset_value: int
name: Optional[str] = None
def get_time_window_seconds(self) -> int:
52    def get_time_window_seconds(self) -> int:
53        """Converte l'unità di tempo in secondi."""
54        time_windows = {
55            'second': 1,
56            'minute': 60,
57            'hour': 3600,
58            'day': 86400
59        }
60        return time_windows[self.reset_unit] * self.reset_value

Converte l'unità di tempo in secondi.