o
    i"                     @  s,  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlmZm	Z	m
Z
 d dlmZmZmZ d dlmZmZmZ d dlmZmZ d dlZd dlZd dlmZ d dlmZ d d	lmZ erbd d
lmZ ed3ddZ d4ddZ!eddddd5d%d&Z"e		'	(	d6d7d-d.Z#ed/d0 Z$G d1d2 d2eZ%dS )8    )annotationsN)AsyncGeneratorCallable	Generator)asynccontextmanagercontextmanagersuppress)TYPE_CHECKINGAnyLiteral)parse_qsurlparse)settings)OAuth)find_available_port)FastMCPkwargsr
   c                  k  sr    t t}z#|  D ]
\}}t|| qdV  W | D ]}t||| qdS | D ]}t||| q,w )a  
    Temporarily override FastMCP setting values.

    Args:
        **kwargs: The settings to override, including nested settings.

    Example:
        Temporarily override a setting:
        ```python
        import fastmcp
        from fastmcp.utilities.tests import temporary_settings

        with temporary_settings(log_level='DEBUG'):
            assert fastmcp.settings.log_level == 'DEBUG'
        assert fastmcp.settings.log_level == 'INFO'
        ```
    N)copydeepcopyr   itemsset_settingget_setting)r   old_settingsattrvalue r   ]/var/www/html/karishye-ai-python/venv/lib/python3.10/site-packages/fastmcp/utilities/tests.pytemporary_settings   s   
r   
mcp_serverr   	transportLiteral['sse']portintreturnNonec                 C  sL   |dkr| j dd}ntd| tjtj|d|dddd}|  d S )	Nsse)r   zInvalid transport: 	127.0.0.1errorzwebsockets-sansio)apphostr!   	log_levelws)config)http_app
ValueErroruvicornServerConfigrun)r   r   r!   r(   uvicorn_serverr   r   r   _run_server9   s   	r4   Tr&   )provide_host_and_portr)   r!   	server_fnCallable[..., None]argsr5   boolr)   str
int | NoneGenerator[str, None, None]c          
      o  sb   |du rt  }|r|||dO }tj| ||dd}|  d}d}||k r|| r|z#ttjtj}	|	||f 	 W d   W n@1 sGw   Y  W n& t	ys   |dk r^t
d n|d	k rht
d
 nt
d |d7 }Y nw ||k r|| s*td| dd| d| V  |  |jdd | r|  |jdd | rtddS dS )a  
    Context manager that runs a FastMCP server in a separate process and
    returns the server URL. When the context manager is exited, the server process is killed.

    Args:
        server_fn: The function that runs a FastMCP server. FastMCP servers are
            not pickleable, so we need a function that creates and runs one.
        *args: Arguments to pass to the server function.
        provide_host_and_port: Whether to provide the host and port to the server function as kwargs.
        host: Host to bind the server to (default: "127.0.0.1").
        port: Port to bind the server to (default: find available port).
        **kwargs: Keyword arguments to pass to the server function.

    Returns:
        The server URL.
    N)r)   r!   T)targetr8   r   daemon   r      g?   皙?g?   zServer failed to start after z	 attemptshttp://:)timeout   z2Server process failed to terminate even after kill)r   multiprocessingProcessstartis_alivesocketAF_INETSOCK_STREAMconnectConnectionRefusedErrortimesleepRuntimeError	terminatejoinkill)
r6   r5   r)   r!   r8   r   procmax_attemptsattemptsr   r   r   run_server_in_processK   sJ   $
r[   http/mcpserver)Literal['http', 'streamable-http', 'sse']pathAsyncGenerator[str, None]c                 C s   ddl }|du rt }|dI dH  || j||||dd}|dI dH  z-d| d| | V  W |  t|j |I dH  W d   dS 1 sPw   Y  dS |  t|j |I dH  W d   w 1 spw   Y  w )	a_  
    Start a FastMCP server as an asyncio task for in-process async testing.

    This is the recommended way to test FastMCP servers. It runs the server
    as an async task in the same process, eliminating subprocess coordination,
    sleeps, and cleanup issues.

    Args:
        server: FastMCP server instance
        port: Port to bind to (default: find available port)
        transport: Transport type ("http", "streamable-http", or "sse")
        path: URL path for the server (default: "/mcp")
        host: Host to bind to (default: "127.0.0.1")

    Yields:
        Server URL string

    Example:
        ```python
        import pytest
        from fastmcp import FastMCP, Client
        from fastmcp.client.transports import StreamableHttpTransport
        from fastmcp.utilities.tests import run_server_async

        @pytest.fixture
        async def server():
            mcp = FastMCP("test")

            @mcp.tool()
            def greet(name: str) -> str:
                return f"Hello, {name}!"

            async with run_server_async(mcp) as url:
                yield url

        async def test_greet(server: str):
            async with Client(StreamableHttpTransport(server)) as client:
                result = await client.call_tool("greet", {"name": "World"})
                assert result.content[0].text == "Hello, World!"
        ```
    r   Ng{Gz?F)r)   r!   r   r`   show_bannerrB   rD   rE   )asyncior   rR   create_taskrun_http_asynccancelr   CancelledError)r^   r!   r   r`   r)   rc   server_taskr   r   r   run_server_async   s2   1"ri   c              	   c  sH    |    td}|| j z| V  W || j dS || j w )zWContext manager to capture logs from FastMCP loggers even when propagation is disabled.fastmcpN)clearlogging	getLogger
addHandlerhandlerremoveHandler)caplogloggerr   r   r   caplog_for_fastmcp   s   
rs   c                      s6   e Zd ZdZd fddZdd	d
ZdddZ  ZS )HeadlessOAuthz
    OAuth provider that bypasses browser interaction for testing.

    This simulates the complete OAuth flow programmatically by making HTTP requests
    instead of opening a browser and running a callback server. Useful for automated testing.
    mcp_urlr:   c                   s   d| _ t j|fi | dS )z7Initialize HeadlessOAuth with stored response tracking.N)_stored_responsesuper__init__)selfru   r   	__class__r   r   rx      s   zHeadlessOAuth.__init__authorization_urlr#   r$   c              	     s^   t  4 I dH }|j|ddI dH }|| _W d  I dH  dS 1 I dH s(w   Y  dS )zOMake HTTP request to authorization URL and store response for callback handler.NF)follow_redirects)httpxAsyncClientgetrv   )ry   r|   clientresponser   r   r   redirect_handler   s
   .zHeadlessOAuth.redirect_handlertuple[str, str | None]c           	        s   | j std| j }|jdkrN|jd }t|}t|j}d|v r;|d d }|ddgd }td| d	| |d
 d }|ddgd }||fS td|j )z4Parse stored response and return (auth_code, state).zHNo authorization response stored. redirect_handler must be called first.i.  locationr'   r   error_descriptionzUnknown errorzOAuth authorization failed: z - codestateNzAuthorization failed: )rv   rS   status_codeheadersr   r   queryr   )	ry   r   redirect_urlparsedquery_paramsr'   
error_desc	auth_coder   r   r   r   callback_handler   s(   


zHeadlessOAuth.callback_handler)ru   r:   )r|   r:   r#   r$   )r#   r   )__name__
__module____qualname____doc__rx   r   r   __classcell__r   r   rz   r   rt      s
    
rt   )r   r
   )r   r   r   r    r!   r"   r#   r$   )r6   r7   r8   r
   r5   r9   r)   r:   r!   r;   r   r
   r#   r<   )Nr\   r]   r&   )r^   r   r!   r;   r   r_   r`   r:   r)   r:   r#   ra   )&
__future__r   r   rl   rH   rL   rQ   collections.abcr   r   r   
contextlibr   r   r   typingr	   r
   r   urllib.parser   r   r~   r/   rj   r   fastmcp.client.auth.oauthr   fastmcp.utilities.httpr   fastmcp.server.serverr   r   r4   r[   ri   rs   rt   r   r   r   r   <module>   sD    
 CO
