o
    i>Q                     @  s`  d Z ddlmZ ddlZddlmZ ddlmZmZ ddl	Z	ddl
mZmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZmZ ddlmZmZ ddlmZ ddlmZmZ ddlm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z&m'Z' e$e(Z)G dd deddZ*G dd deZ+eddddG dd dZ,G dd deZ-G dd deZ.G d d! d!eZ/dS )"z*TokenVerifier implementations for FastMCP.    )annotationsN)	dataclass)Anycast)
JsonWebKeyJsonWebToken)	JoseError)serialization)rsa)
AnyHttpUrl	SecretStrfield_validator)BaseSettingsSettingsConfigDict)	TypedDict)AccessTokenTokenVerifier)ENV_FILEparse_scopes)
get_logger)NotSetNotSetTc                   @  sR   e Zd ZU dZded< ded< ded< ded< ded< ded< d	ed
< ded< dS )JWKDatazJSON Web Key data structure.strktykidusealgne	list[str]x5cx5tN__name__
__module____qualname____doc____annotations__ r*   r*   g/var/www/html/karishye-ai-python/venv/lib/python3.10/site-packages/fastmcp/server/auth/providers/jwt.pyr      s   
 r   F)totalc                   @  s   e Zd ZU dZded< dS )JWKSDataz JSON Web Key Set data structure.zlist[JWKData]keysNr$   r*   r*   r*   r+   r-   (   s   
 r-   T)frozenkw_onlyreprc                   @  sJ   e Zd ZU dZded< ded< edddZ			
					ddddZdS )
RSAKeyPairzRSA key pair for JWT testing.r   private_keyr   
public_keyreturnc                 C  s`   t jddd}|jtjjtjjt d	d}|
 jtjjtjjd	d}| t||dS )zt
        Generate an RSA key pair for testing.

        Returns:
            RSAKeyPair: Generated key pair
        i  i   )public_exponentkey_size)encodingformatencryption_algorithmutf-8)r8   r9   )r3   r4   )r
   generate_private_keyprivate_bytesr	   EncodingPEMPrivateFormatPKCS8NoEncryptiondecoder4   public_bytesPublicFormatSubjectPublicKeyInfor   )clsr3   private_pem
public_pemr*   r*   r+   generate5   s,   		zRSAKeyPair.generatefastmcp-userhttps://fastmcp.example.comN  subjectissueraudiencestr | list[str] | Nonescopeslist[str] | Noneexpires_in_secondsintadditional_claimsdict[str, Any] | Noner   
str | Nonec                 C  s   ddi}|r
||d< ||t t t t | d}	|r!||	d< |r*d||	d< |r1|	| tdg}
|
||	| j }|dS )	a  
        Generate a test JWT token for testing purposes.

        Args:
            subject: Subject claim (usually user ID)
            issuer: Issuer claim
            audience: Audience claim - can be a string or list of strings (optional)
            scopes: List of scopes to include
            expires_in_seconds: Token expiration time in seconds
            additional_claims: Any additional claims to include
            kid: Key ID to include in header
        r   RS256r   )subissiatexpaud scoper;   )	rU   timejoinupdater   encoder3   get_secret_valuerC   )selfrN   rO   rP   rR   rT   rV   r   headerpayloadjwt_libtoken_bytesr*   r*   r+   create_tokenY   s&   



zRSAKeyPair.create_token)r5   r2   )rK   rL   NNrM   NN)rN   r   rO   r   rP   rQ   rR   rS   rT   rU   rV   rW   r   rX   r5   r   )r%   r&   r'   r(   r)   classmethodrJ   rk   r*   r*   r*   r+   r2   .   s   
 %r2   c                   @  s   e Zd ZU dZededdZdZded< dZ	ded< dZ
d	ed
< dZded< dZd	ed< dZded< dZded< edddedd ZdS )JWTVerifierSettingsz$Settings for JWT token verification.FASTMCP_SERVER_AUTH_JWT_ignore)
env_prefixenv_fileextraNrX   r4   jwks_urirQ   rO   	algorithmrP   rS   required_scopeszAnyHttpUrl | str | Nonebase_urlbefore)modec                 C  s   t |S Nr   )rG   vr*   r*   r+   _parse_scopes   s   z!JWTVerifierSettings._parse_scopes)r%   r&   r'   r(   r   r   model_configr4   r)   rs   rO   rt   rP   ru   rv   r   rl   r{   r*   r*   r*   r+   rm      s"   
 
rm   c                      sf   e Zd ZdZeeeeeeedd# fddZd$ddZd%ddZd&ddZd'dd Z	d'd!d"Z
  ZS )(JWTVerifiera  
    JWT token verifier supporting both asymmetric (RSA/ECDSA) and symmetric (HMAC) algorithms.

    This verifier validates JWT tokens using various signing algorithms:
    - **Asymmetric algorithms** (RS256/384/512, ES256/384/512, PS256/384/512):
      Uses public/private key pairs. Ideal for external clients and services where
      only the authorization server has the private key.
    - **Symmetric algorithms** (HS256/384/512): Uses a shared secret for both
      signing and verification. Perfect for internal microservices and trusted
      environments where the secret can be securely shared.

    Use this when:
    - You have JWT tokens issued by an external service (asymmetric)
    - You need JWKS support for automatic key rotation (asymmetric)
    - You have internal microservices sharing a secret key (symmetric)
    - Your tokens contain standard OAuth scopes and claims
    r4   rs   rO   rP   rt   ru   rv   r4   str | NotSetT | Noners   rO    str | list[str] | NotSetT | NonerP   rt   ru   list[str] | NotSetT | Nonerv   !AnyHttpUrl | str | NotSetT | Nonec          	        s   t dd |||||||d D }|js|jstd|jr(|jr(td|jp,d}|dvr9td| d	t j|j	|j
d
 || _|j| _|j| _|j| _|j| _t| jg| _tt| _i | _d| _d| _dS )a  
        Initialize a JWTVerifier configured to validate JWTs using either a static key or a JWKS endpoint.

        Parameters:
            public_key (str | NotSetT | None): PEM-encoded public key for asymmetric algorithms or shared secret for symmetric algorithms.
            jwks_uri (str | NotSetT | None): URI to fetch a JSON Web Key Set; used when verifying tokens with remote JWKS.
            issuer (str | list[str] | NotSetT | None): Expected issuer claim value or list of allowed issuer values.
            audience (str | list[str] | NotSetT | None): Expected audience claim value or list of allowed audience values.
            algorithm (str | NotSetT | None): JWT signing algorithm to accept (default: "RS256"). Supported: HS256/384/512, RS256/384/512, ES256/384/512, PS256/384/512.
            required_scopes (list[str] | NotSetT | None): Scopes that must be present in validated tokens.
            base_url (AnyHttpUrl | str | NotSetT | None): Base URL passed to the parent TokenVerifier.

        Raises:
            ValueError: If neither or both of `public_key` and `jwks_uri` are provided, or if `algorithm` is unsupported.
        c                 S  s   i | ]\}}|t ur||qS r*   )r   ).0krz   r*   r*   r+   
<dictcomp>   s
    	
z(JWTVerifier.__init__.<locals>.<dictcomp>r~   z.Either public_key or jwks_uri must be providedz/Provide either public_key or jwks_uri, not bothrY   >   ES256ES384ES512HS256HS384HS512PS256PS384PS512rY   RS384RS512zUnsupported algorithm: .)rv   ru   r   rM   N)rm   model_validateitemsr4   rs   
ValueErrorrt   super__init__rv   ru   rO   rP   r   jwtr   r%   logger_jwks_cache_jwks_cache_time
_cache_ttl)	rf   r4   rs   rO   rP   rt   ru   rv   settings	__class__r*   r+   r      sD   


zJWTVerifier.__init__tokenr   r5   c              
     s   | j r| j S z1ddl}ddl}|dd }|ddt|d   7 }|||}|d}| |I dH W S  t	yL } zt
d| |d}~ww )z'Get the verification key for the token.r   Nr   =   r   z%Failed to extract key ID from token: )r4   base64jsonsplitlenloadsurlsafe_b64decodeget_get_jwks_key	Exceptionr   )rf   r   r   r   
header_b64rg   r   r    r*   r*   r+   _get_verification_key  s   
z!JWTVerifier._get_verification_keyr   rX   c              
     s  | j stdt }|| j | jk r2|r || jv r | j| S |s2t| jdkr2tt| j	 S zt
 4 I dH }|| j I dH }|  | }W d  I dH  n1 I dH s]w   Y  i | _|dg D ]}|d}t|}| }	|r|	| j|< qk|	| jd< qk|| _|r|| jvr| jd| td| d	| j| W S t| jdkrtt| j	 W S t| jdkrtd
td t
jy }
 ztd|
 |
d}
~
w ty }
 z| jd|
  td|
 |
d}
~
ww )z(Fetch key from JWKS with simple caching.zJWKS URI not configured   Nr.   r   _defaultz-JWKS key lookup failed: key ID '%s' not foundzKey ID 'z' not found in JWKSz2Multiple keys in JWKS but no key ID (kid) in tokenzNo keys found in JWKSzFailed to fetch JWKS: zJWKS fetch failed: )rs   r   ra   r   r   r   r   nextitervalueshttpxAsyncClientr   raise_for_statusr   r   
import_keyget_public_keyr   debug	HTTPErrorr   )rf   r   current_timeclientresponse	jwks_datakey_datakey_kidjwkr4   r    r*   r*   r+   r      s\   

(


zJWTVerifier._get_jwks_keyclaimsdict[str, Any]r!   c                 C  sN   dD ]"}||v r$t || tr||    S t || tr$||   S qg S )z
        Extract scopes from JWT claims. Supports both 'scope' and 'scp'
        claims.

        Checks the `scope` claim first (standard OAuth2 claim), then the `scp`
        claim (used by some Identity Providers).
        )r`   scp)
isinstancer   r   list)rf   r   claimr*   r*   r+   _extract_scopes^  s   zJWTVerifier._extract_scopesAccessToken | Nonec              
     sF  z|  |I dH }| j||}|dp!|dp!|dp!d}|d}|r@|t k r@| jd| | jd| W dS | jrn|d	}d
}t	| jt
rV|| jv }n|| jk}|sn| jd| | jd| W dS | jr|d d
}t	| jt
rt	 t
rt fdd| jD }n tt
| jv }nt	 t
r| j v }n | jk}|s| jd| | jd| W dS | |}	| jrt|	}
t| j}||
s| jd|
| | jd| W dS t|t||	|rt|nd|dW S  ty   | jd Y dS  ty" } z| jdt| W Y d}~dS d}~ww )a  
        Validate a JWT bearer token and return an AccessToken when the token is valid.

        Parameters:
            token (str): The JWT bearer token string to validate.

        Returns:
            AccessToken | None: An AccessToken populated from token claims if the token is valid; `None` if the token is expired, has an invalid signature or format, fails issuer/audience/scope validation, or any other validation error occurs.
        N	client_idazprZ   unknownr]   z4Token validation failed: expired token for client %sz#Bearer token rejected for client %sr[   Fz6Token validation failed: issuer mismatch for client %sr^   c                 3  s    | ]}| v V  qd S ry   r*   )r   expectedr^   r*   r+   	<genexpr>  s    
z0JWTVerifier.load_access_token.<locals>.<genexpr>z8Token validation failed: audience mismatch for client %sz4Token missing required scopes. Has: %s, Required: %sr   r   rR   
expires_atr   z5Token validation failed: JWT signature/format invalidzToken validation failed: %s)r   r   rC   r   ra   r   r   inforO   r   r   rP   anyr   r   ru   setissubsetr   r   rU   r   r   )rf   r   verification_keyr   r   r]   r[   issuer_validaudience_validrR   token_scopesru   r    r*   r   r+   load_access_tokeno  s   












zJWTVerifier.load_access_tokenc                   s   |  |I dH S )a\  
        Verify a bearer token and return access info if valid.

        This method implements the TokenVerifier protocol by delegating
        to our existing load_access_token method.

        Args:
            token: The JWT token string to validate

        Returns:
            AccessToken object if valid, None if invalid or expired
        N)r   )rf   r   r*   r*   r+   verify_token  s   zJWTVerifier.verify_token)r4   r   rs   r   rO   r   rP   r   rt   r   ru   r   rv   r   )r   r   r5   r   )r   rX   r5   r   )r   r   r5   r!   r   r   r5   r   )r%   r&   r'   r(   r   r   r   r   r   r   r   __classcell__r*   r*   r   r+   r}      s    
T

>
vr}   c                      s0   e Zd ZdZ	dd fddZdddZ  ZS )StaticTokenVerifiera  
    Simple static token verifier for testing and development.

    This verifier validates tokens against a predefined dictionary of valid token
    strings and their associated claims. When a token string matches a key in the
    dictionary, the verifier returns the corresponding claims as if the token was
    validated by a real authorization server.

    Use this when:
    - You're developing or testing locally without a real OAuth server
    - You need predictable tokens for automated testing
    - You want to simulate different users/scopes without complex setup
    - You're prototyping and need simple API key-style authentication

    WARNING: Never use this in production - tokens are stored in plain text!
    Ntokensdict[str, dict[str, Any]]ru   rS   c                   s   t  j|d || _dS )a  
        Initialize the static token verifier.

        Args:
            tokens: Dict mapping token strings to token metadata
                   Each token should have: client_id, scopes, expires_at (optional)
            required_scopes: Required scopes for all tokens
        )ru   N)r   r   r   )rf   r   ru   r   r*   r+   r     s   
zStaticTokenVerifier.__init__r   r   r5   r   c                   s   | j |}|sdS |d}|dur|t k rdS |dg }| jr@t|}t| j}||s@td| d|  dS t||d |||dS )z-Verify token against static token dictionary.Nr   rR   z$Token missing required scopes. Has: z, Required: r   r   )	r   r   ra   ru   r   r   r   r   r   )rf   r   
token_datar   rR   r   ru   r*   r*   r+   r     s.   


z StaticTokenVerifier.verify_tokenry   )r   r   ru   rS   r   )r%   r&   r'   r(   r   r   r   r*   r*   r   r+   r     s
    r   )0r(   
__future__r   ra   dataclassesr   typingr   r   r   authlib.joser   r   authlib.jose.errorsr   cryptography.hazmat.primitivesr	   )cryptography.hazmat.primitives.asymmetricr
   pydanticr   r   r   pydantic_settingsr   r   typing_extensionsr   fastmcp.server.authr   r   fastmcp.settingsr   fastmcp.utilities.authr   fastmcp.utilities.loggingr   fastmcp.utilities.typesr   r   r%   r   r   r-   r2   rm   r}   r   r*   r*   r*   r+   <module>   s8    _  R