o
    i>                  	   @  s  d Z ddlmZ ddlZddlmZmZ ddlmZm	Z	m
Z
 ddlmZmZ ddlmZ ddlmZmZmZmZmZmZmZ dd	lmZmZmZ dd
lmZmZ ddlm Z m!Z! ddl"m#Z#m$Z$m%Z% ddl&m'Z' ddl(m)Z)m*Z*m+Z+m,Z,m-Z- ddl.m/Z/m0Z0m1Z1 ddl2m3Z3 ddl4m5Z5m6Z6m7Z7 ddl8m9Z9m:Z:m;Z;m<Z< ddl=m>Z>m?Z? ddl@mAZAmBZBmCZCmDZDmEZEmFZFmGZG ddlHmIZImJZJmKZKmLZLmMZMmNZN ddlOmPZPmQZQmRZR ddlSmTZTmUZU ddlVmWZWmXZX edddZYedddZZedddZ[edddZ\edddZ]ed eWeeef dd!Z^ed"ddZ_ed#ddZ`ed$ddZaed%ddZbed&d'G d(d) d)eeYeZe`eaf ZcdOd0d1ZddPd4d5ZedPd6d7ZfdQd;d<ZgdRd?d@ZhdSdAdBZidTdDdEZjdUdIdJZkdVdMdNZldS )WzGraph builder for constructing executable graph definitions.

This module provides the GraphBuilder class and related utilities for
constructing typed, executable graph definitions with steps, joins,
decisions, and edge routing.
    )annotationsN)Counterdefaultdict)AsyncIterableCallableIterable)	dataclassreplace)NoneType)AnyGenericLiteralcast
get_originget_type_hintsoverload)NeverTypeAliasTypeTypeVar)_utils
exceptions)UNSETUnset)DecisionDecisionBranchDecisionBranchBuilder)Graph)ForkIDJoinIDNodeIDgenerate_placeholder_node_idreplace_placeholder_id)JoinJoinNodeReducerFunction)build_mermaid_graph)EndNodeFork	StartNode)AnyDestinationNodeAnyNodeDestinationNode
SourceNode)
ParentForkParentForkFinder)BroadcastMarkerDestinationMarkerEdgePathEdgePathBuilder	MapMarkerPathPathBuilder)NodeStepStepStepContextStepFunctionStepNodeStreamFunction)TypeOrTypeExpressionget_callable_nameunpack_type_expression)GraphBuildingErrorGraphValidationError)BaseNodeEndStateTT)infer_varianceDepsTInputTOutputTSourceTSourceNodeT)boundrD   SourceOutputTGraphInputTGraphOutputTTF)initc                   @  sR  e Zd ZU dZded< 	 ded< 	 ded< 	 ded	< 	 d
ed< 	 ded< 	 ded< 	 ded< 	 ded< 	 edeeee	f e	fdZ
edeeeef efdZdeeeedddddZedddZedd!d"Zeddd#dd'd(Zeddd#dd,d(Z	dddd#dd/d(Zeddd#dd1d2Zeddd#dd5d2Ze	dddd#dd8d2Z	dddd#dd9d2Zeddd:d;ddDdEZeddd:d;ddHdEZeeddd:dIddLdEZddPdQZddRddWdXZdddddYddbdcZddgdhZdddiddldmZddnddsdtZddnddwdxZdd{d|ZdddZ dddZ!ddddZ"dS )GraphBuildera  A builder for constructing executable graph definitions.

    GraphBuilder provides a fluent interface for defining nodes, edges, and
    routing in a graph workflow. It supports typed state, dependencies, and
    input/output validation.

    Type Parameters:
        StateT: The type of the graph state
        DepsT: The type of the dependencies
        GraphInputT: The type of the graph input data
        GraphOutputT: The type of the graph output data
    
str | NonenameTypeOrTypeExpression[StateT]
state_typeTypeOrTypeExpression[DepsT]	deps_type!TypeOrTypeExpression[GraphInputT]
input_type"TypeOrTypeExpression[GraphOutputT]output_typeboolauto_instrumentdict[NodeID, AnyNode]_nodesdict[NodeID, list[Path]]_edges_by_sourceint_decision_indexSource)type_paramsDestinationNT)rR   rT   rV   rX   rZ   r\   c                C  sV   || _ || _|| _|| _|| _|| _i | _tt| _	d| _
tt  | _tt  | _dS )a  Initialize a graph builder.

        Args:
            name: Optional name for the graph, if not provided the name will be inferred from the calling frame on the first call to a graph method.
            state_type: The type of the graph state
            deps_type: The type of the dependencies
            input_type: The type of the graph input data
            output_type: The type of the graph output data
            auto_instrument: Whether to automatically create instrumentation spans
           N)rR   rT   rV   rX   rZ   r\   r^   r   listr`   rb   r(   rL   _start_noder&   rM   	_end_node)selfrR   rT   rV   rX   rZ   r\    rk   g/var/www/html/karishye-ai-python/venv/lib/python3.10/site-packages/pydantic_graph/beta/graph_builder.py__init__m   s   
zGraphBuilder.__init__returnStartNode[GraphInputT]c                 C     | j S )z}Get the start node for the graph.

        Returns:
            The start node that receives the initial graph input
        )rh   rj   rk   rk   rl   
start_node      zGraphBuilder.start_nodeEndNode[GraphOutputT]c                 C  rp   )zxGet the end node for the graph.

        Returns:
            The end node that produces the final graph output
        )ri   rq   rk   rk   rl   end_node   rs   zGraphBuilder.end_node)node_idlabelrv   rw   ^Callable[[StepFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, OutputT]]c                C     d S Nrk   rj   rv   rw   rk   rk   rl   step   s   zGraphBuilder.stepcall,StepFunction[StateT, DepsT, InputT, OutputT]$Step[StateT, DepsT, InputT, OutputT]c                C  ry   rz   rk   rj   r}   rv   rw   rk   rk   rl   r|         3StepFunction[StateT, DepsT, InputT, OutputT] | NoneStep[StateT, DepsT, InputT, OutputT] | Callable[[StepFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, OutputT]]c                  sL   |du rd	 fdd}|S pt |tttttf t| d}|S )
a  Create a step from a step function.

        This method can be used as a decorator or called directly to create
        a step node from an async function.

        Args:
            call: The step function to wrap
            node_id: Optional ID for the node
            label: Optional human-readable label

        Returns:
            Either a Step instance or a decorator function
        Nfuncr~   rn   r   c                      j |  dS Nr}   rv   rw   r|   r   rw   rv   rj   rk   rl   	decorator      z$GraphBuilder.step.<locals>.decorator)idr}   rw   )r   r~   rn   r   )r=   r7   rC   rE   rF   rG   r   )rj   r}   rv   rw   r   r|   rk   r   rl   r|      s   oCallable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]]c                C  ry   rz   rk   r{   rk   rk   rl   stream   s   zGraphBuilder.stream.StreamFunction[StateT, DepsT, InputT, OutputT]3Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]c                C  ry   rz   rk   r   rk   rk   rl   r      r   5StreamFunction[StateT, DepsT, InputT, OutputT] | NoneStep[StateT, DepsT, InputT, AsyncIterable[OutputT]] | Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]]c                C  ry   rz   rk   r   rk   rk   rl   r      s   c                  s<    du rdfdd}|S d fd
d}j |dS )a  Create a step from an async iterator (which functions like a "stream").

        This method can be used as a decorator or called directly to create
        a step node from an async function.

        Args:
            call: The step function to wrap
            node_id: Optional ID for the node
            label: Optional human-readable label

        Returns:
            Either a Step instance or a decorator function
        Nr   r   rn   r   c                   r   r   )r   r   r   rk   rl   r     r   z&GraphBuilder.stream.<locals>.decoratorctx"StepContext[StateT, DepsT, InputT]c                   s
    | S rz   rk   )r   )r}   rk   rl   wrapper  s   z$GraphBuilder.stream.<locals>.wrapperr   )r   r   rn   r   )r   r   r   )rj   r}   rv   rw   r   r   rk   )r}   rw   rv   rj   rl   r      s
   farthest)rv   parent_fork_idpreferred_parent_forkreducer/ReducerFunction[StateT, DepsT, InputT, OutputT]initialrG   r   r   Literal['farthest', 'closest']$Join[StateT, DepsT, InputT, OutputT]c                C  ry   rz   rk   )rj   r   r   rv   r   r   rk   rk   rl   join!     	zGraphBuilder.joininitial_factoryCallable[[], OutputT]c                C  ry   rz   rk   )rj   r   r   rv   r   r   rk   rk   rl   r   +  r   )r   r   rv   r   r   OutputT | UnsetCallable[[], OutputT] | Unsetc                  sj   |t u r
 fdd}tttttf tt|ptt	||t
tg tf ||d ur0t||dS d |dS )Nc                     s    S rz   rk   rk   r   rk   rl   <lambda>A  s    z#GraphBuilder.join.<locals>.<lambda>)r   r   r   r   r   )r   r"   rC   rE   rF   rG   r   r   r    r=   r   r   r   )rj   r   r   r   rv   r   r   rk   r   rl   r   6  s   
edgesEdgePath[StateT, DepsT]Nonec           	   	     s  dfddd fdd t t  g |D ]&}|jD ]}| j|j |j q!|jD ]} | q6|j qD ];}t	|t
rQt	|trRqEtt }t|j|d	d
}z|d }W n	 typ   Y qEw ||}|dur| qEdS )zAdd one or more edge paths to the graph.

        This method processes edge paths and automatically creates any necessary
        fork nodes for broadcasts and maps.

        Args:
            *edges: The edge paths to add to the graph
        pr4   c                   s   | j D ]G}t|tr,tttf |jddd}| |jD ]} tg |j d qqt|t	rDtttf |jd|j
d}| qt|trJ	 qdS )zvProcess a path and create necessary fork nodes.

            Args:
                p: The path to process
            FNr   is_mapdownstream_join_iditemsT)r   
isinstancer/   r'   r   fork_id_insert_nodepathsr4   r3   r   r0   )r   itemnew_nodepath)_handle_pathrj   rk   rl   r   V  s   





z&GraphBuilder.add.<locals>._handle_pathdr)   c                   sn   t | v rd S t |  |  |  t| tr3| jD ]}|j |jD ]} | q+q!d S d S rz   )	r   addappendr   r   r   branchesr   destinations)r   branchd2_handle_destination_noder   destination_idsr   rj   rk   rl   r   h  s   






z2GraphBuilder.add.<locals>._handle_destination_nodeTlocalnsinclude_extrasrn   N)r   r4   )r   r)   )setra   sourcesr   r`   r   r   r   r   r   r7   r6   r   get_parent_namespaceinspectcurrentframer   r}   KeyError_edge_from_return_hintr   )	rj   r   edgesource_nodedestination_nodedestinationparent_namespace
type_hintsreturn_hintrk   r   rl   r   L  s4   






zGraphBuilder.add)rw   source	Source[T]r   Destination[T]c                C  s0   |  |}|dur||}| || dS )zAdd a simple edge between two nodes.

        Args:
            source: The source node
            destination: The destination node
            label: Optional label for the edge
        N)	edge_fromrw   r   to)rj   r   r   rw   builderrk   rk   rl   add_edge  s   

zGraphBuilder.add_edge)pre_map_labelpost_map_labelr   r   Source[Iterable[T]]map_tor   r   r   ForkID | Noner   JoinID | Nonec                C  sP   |  |}|dur||}|j||d}|dur||}| || dS )ax  Add an edge that maps iterable data across parallel paths.

        Args:
            source: The source node that produces iterable data
            map_to: The destination node that receives individual items
            pre_map_label: Optional label before the map operation
            post_map_label: Optional label after the map operation
            fork_id: Optional ID for the fork node produced for this map operation
            downstream_join_id: Optional ID of a join node that will always be downstream of this map.
                Specifying this ensures correct handling if you try to map an empty iterable.
        N)r   r   )r   rw   mapr   r   )rj   r   r   r   r   r   r   r   rk   rk   rl   add_mapping_edge  s   


zGraphBuilder.add_mapping_edger   Source[SourceOutputT]-EdgePathBuilder[StateT, DepsT, SourceOutputT]c                 G  s   t tttf |tg ddS )zCreate an edge path builder starting from the given source nodes.

        Args:
            *sources: The source nodes to start the edge path from

        Returns:
            An EdgePathBuilder for constructing the complete edge path
        working_items)r   path_builder)r2   rC   rE   rK   r5   )rj   r   rk   rk   rl   r     s   	
zGraphBuilder.edge_from)noterv   r   Decision[StateT, DepsT, Never]c                C  s   t t|ptdg |dS )a  Create a new decision node.

        Args:
            note: Optional note to describe the decision logic
            node_id: Optional ID for the node produced for this decision logic

        Returns:
            A new Decision node with no branches
        decisionr   r   r   )r   r   r    )rj   r   rv   rk   rk   rl   r     s   
zGraphBuilder.decision)matchesTypeOrTypeExpression[SourceT]r   Callable[[Any], bool] | None=DecisionBranchBuilder[StateT, DepsT, SourceT, SourceT, Never]c                C  sH   t td}ttttf |g dd}ttttf g d}t||||dS )a  Create a decision branch matcher.

        Args:
            source: The type or type expression to match against
            matches: Optional custom matching function

        Returns:
            A DecisionBranchBuilder for constructing the branch
        match_decisionNr   r   )r   r   r   r   )	r   r    r   rC   rE   r   r5   rH   r   )rj   r   r   rv   r   new_path_builderrk   rk   rl   match  s   zGraphBuilder.matchtype[SourceNodeT]DecisionBranch[SourceNodeT]c                C  s,   t |}tt|jgd}t||||gdS )a~  Create a decision branch for BaseNode subclasses.

        This is similar to match() but specifically designed for matching
        against BaseNode types from the v1 system.

        Args:
            source: The BaseNode subclass to match against
            matches: Optional custom matching function

        Returns:
            A DecisionBranch for the BaseNode type
        r   )r   r   r   r   )r6   r4   r0   r   r   )rj   r   r   noder   rk   rk   rl   
match_node  s   zGraphBuilder.match_node	node_type+type[BaseNode[StateT, DepsT, GraphOutputT]]c              
   C  s   t t }t|j|dd}z|d }W n ty+ } z
td| d|d}~ww t	|}| 
||}|sAtd| d|S )a  Create an edge path from a BaseNode class.

        This method integrates v1-style BaseNode classes into the v2 graph
        system by analyzing their type hints and creating appropriate edges.

        Args:
            node_type: The BaseNode subclass to integrate

        Returns:
            An EdgePath representing the node and its connections

        Raises:
            GraphSetupError: If the node type is missing required type hints
        Tr   rn   Node z2 is missing a return type hint on its `run` methodN)r   r   r   r   r   runr   r   GraphSetupErrorr6   r   )rj   r   r   r   r   er   r   rk   rk   rl   r     s"   
zGraphBuilder.noder   r*   c                 C  sr   | j |j}|du r|| j |j< dS t|tr%t|tr%|j|ju r%dS ||ur7td|jd| d| dS )zInsert a node into the graph, checking for ID conflicts.

        Args:
            node: The node to insert

        Raises:
            ValueError: If a different node with the same ID already exists
        Nz%All nodes must have unique node IDs. z was the ID for z and )r^   getr   r   r6   r   r?   )rj   r   existingrk   rk   rl   r   #  s   	 zGraphBuilder._insert_nodeSourceNode[StateT, DepsT, Any]r   TypeOrTypeExpression[Any]EdgePath[StateT, DepsT] | Nonec                 C  s  g }t |}|D ]}t |\}}t|p|}|tu r#|| j q	|tu r0t	d| d|t
u r\ttttttf dB tdd |D d}|du rVt	d| d|| q	|tu rttttttf dB tdd |D d}	|	du rt	d| d||	 q	t|rt|tr|t| q	t|t|k rdS | |}
t|d	kr|
|d
 S |  }|D ]}|| t|}q|
|S )a  Create edges from a return type hint.

        This method analyzes return type hints from step functions or node methods
        to automatically create appropriate edges in the graph.

        Args:
            node: The source node
            return_hint: The return type hint to analyze

        Returns:
            An EdgePath if edges can be inferred, None otherwise

        Raises:
            GraphSetupError: If the return type hint is invalid or incomplete
        r   z return type hint includes a plain `BaseNode`. Edge inference requires each possible returned `BaseNode` subclass to be listed explicitly.Nc                 s      | ]
}t |tr|V  qd S rz   )r   r7   .0ark   rk   rl   	<genexpr>W      z6GraphBuilder._edge_from_return_hint.<locals>.<genexpr>z return type hint includes a `StepNode` without a `Step` annotation. When returning `my_step.as_node()`, use `Annotated[StepNode[StateT, DepsT], my_step]` as the return type hint.c                 s  r  rz   r   r"   r  rk   rk   rl   r  b  r  z return type hint includes a `JoinNode` without a `Join` annotation. When returning `my_join.as_node()`, use `Annotated[JoinNode[StateT, DepsT], my_join]` as the return type hint.rf   r   )r   get_union_argsunpack_annotatedr   rB   r   ru   rA   r   r   r:   r   r7   rC   rE   r   nextr#   r"   r   isclass
issubclassr6   lenr   r   r   r   r   r
   )rj   r   r   r   
union_argsreturn_typer   return_type_originr|   r   r   r   r   rk   rk   rl   r   6  sV   





z#GraphBuilder._edge_from_return_hintvalidate_graph_structure/Graph[StateT, DepsT, GraphInputT, GraphOutputT]c                 C  s   | j }| j}t||\}}t||\}}t||\}}|r"t|| t||}t||}tt	t
ttf | jt| jt| jt| jt| j||||| jd
S )a^  Build the final executable graph from the accumulated nodes and edges.

        This method performs validation, normalization, and analysis of the graph
        structure to create a complete, executable graph instance.

        Args:
            validate_graph_structure: whether to perform validation of the graph structure
                See the docstring of _validate_graph_structure below for more details.

        Returns:
            A complete Graph instance ready for execution

        Raises:
            ValueError: If the graph structure is invalid (e.g., join without parent fork)
        )
rR   rT   rV   rX   rZ   nodesedges_by_sourceparent_forksintermediate_join_nodesr\   )r^   r`   _replace_placeholder_node_ids_flatten_paths_normalize_forks_validate_graph_structure_collect_dominating_forks _compute_intermediate_join_nodesr   rC   rE   rL   rM   rR   r>   rT   rV   rX   rZ   r\   )rj   r  r  r  r  r  rk   rk   rl   build|  s*   


zGraphBuilder.build)rR   rQ   rT   rS   rV   rU   rX   rW   rZ   rY   r\   r[   )rn   ro   )rn   rt   )rv   rQ   rw   rQ   rn   rx   )r}   r~   rv   rQ   rw   rQ   rn   r   rz   )r}   r   rv   rQ   rw   rQ   rn   r   )rv   rQ   rw   rQ   rn   r   )r}   r   rv   rQ   rw   rQ   rn   r   )r}   r   rv   rQ   rw   rQ   rn   r   )r   r   r   rG   rv   rQ   r   rQ   r   r   rn   r   )r   r   r   r   rv   rQ   r   rQ   r   r   rn   r   )r   r   r   r   r   r   rv   rQ   r   rQ   r   r   rn   r   )r   r   rn   r   )r   r   r   r   rw   rQ   rn   r   )r   r   r   r   r   rQ   r   rQ   r   r   r   r   rn   r   )r   r   rn   r   )r   rQ   rv   rQ   rn   r   )r   r   r   r   rn   r   )r   r   r   r   rn   r   )r   r   rn   r   )r   r*   rn   r   )r   r   r   r   rn   r   )T)r  r[   rn   r  )#__name__
__module____qualname____doc____annotations__r   r,   rC   rE   rG   rc   r+   rF   re   r
   rm   propertyrr   ru   r   r|   r   r   r   r   r   r   r   r   r   r   r   r   r   r  rk   rk   rk   rl   rP   @   s   
 %	&)	
B


$
FrP   r  r]   r  r_   rn   r   c                   s
  d}t   d fdd}| D ]}|D ]}|| qq|  D ]}t|tr2|jD ]}||j q*q |tjg }|sBt	d| t
j vrMt	d	| g }	|  D ]+\}
}t|t
r]qS|
|v oht||
 d
k}t|trw|pvt|jd
k}|s~|	|
 qS|	rt	d|	 d| tjhtjg}|r| }||g D ]}|jD ]}t|tr|jvr|j ||j qq| |}t|tr|jD ]}|jjD ]}t|tr|jvr|j ||j qq|sfdd| D }|rt	d| d| dS )a  Validate the graph structure for common issues.

    This function raises an error if any of the following criteria are not met:
    1. There are edges from the start node
    2. There are edges to the end node
    3. No non-End node is a dead end (no outgoing edges)
    4. The end node is reachable from the start node
    5. All nodes are reachable from the start node

    Note 1: Under some circumstances it may be reasonable to build a graph that violates one or more of
    the above conditions. We may eventually add support for more granular control over validation,
    but today, if you want to build a graph that violates any of these assumptions you need to pass
    `validate_graph_structure=False` to the call to `GraphBuilder.build`.

    Note 2: Some of the earlier items in the above list are redundant with the later items.
    I've included the earlier items in the list as a reminder to ourselves if/when we add more granular validation
    because you might want to check the earlier items but not the later items, as described in Note 1.

    Args:
        nodes: The nodes in the graph
        edges_by_source: The edges by source node

    Raises:
        GraphBuildingError: If any of the aforementioned structural issues are found.
    z If this is intentional, you can suppress this error by passing `validate_graph_structure=False` to the call to `GraphBuilder.build`.r   r4   rn   r   c                   s&   | j D ]}t|tr |j qd S rz   )r   r   r0   r   destination_id)r   r   )all_destinationsrk   rl   _collect_destinations_from_path  s
   

zB_validate_graph_structure.<locals>._collect_destinations_from_pathz+The graph has no edges from the start node.z'The graph has no edges to the end node.r   z,The following nodes have no outgoing edges: .c                   s   g | ]}| vr|qS rk   rk   )r  rv   )	reachablerk   rl   
<listcomp>  s    z-_validate_graph_structure.<locals>.<listcomp>z;The following nodes are not reachable from the start node: N)r   r4   rn   r   )r   valuesr   r   r   r   r   r(   r   r@   r&   r   r  r   popr0   r$  r   )r  r  how_to_suppressr&  r   r   r   r   start_edgesdead_end_nodesrv   	has_edgesto_visit
current_idr   current_nodeunreachable_nodesrk   )r%  r(  rl   r    st   














r  r   6tuple[dict[NodeID, AnyNode], dict[NodeID, list[Path]]]c                   s      }tt}g }d fdd}| D ]}t|tr1|jD ]}||j\}}	||_||	 qq|	 D ]\}
}|D ]	}|
|
|f q<q6|rc| \}
}||\}}	||
 
| ||	 |sI|t|fS )Nr   r4   rn   &tuple[Path, list[tuple[NodeID, Path]]]c                   s   t | jD ]e\} t tr= jv sJ dtt| jd | t jg }t| j|d d  }| j|fgf  S t trj jv sKJ dtt| jd | t jg }| fdd j	D f  S q| g fS )Nz?This should have been added to the node during GraphBuilder.addrf   c                   s   g | ]} j |fqS rk   )r   r  r   r   rk   rl   r)  &      z@_flatten_paths.<locals>._split_at_first_fork.<locals>.<listcomp>)
	enumerater   r   r3   r   r4   rg   r0   r/   r   )r   iupstream
downstreamr  r7  rl   _split_at_first_fork  s   
"
"z,_flatten_paths.<locals>._split_at_first_fork)r   r4   rn   r5  )copyr   rg   r*  r   r   r   r   extendr   r   r+  dict)r  r   	new_nodes	new_edgespaths_to_handler>  r   r   r;  downstreams	source_idedges_from_sourcer   rk   r=  rl   r    s,   


r  c           	      C  s   |   }i }g }| D ]N\}}|| | | }t|tr&|js&|||< qt|dkr1|||< qtttf tt	|j
 dddd}|||j
< tt|j
gdg||< |||j
< q||fS )a  Normalize the graph structure so only broadcast forks have multiple outgoing edges.

    This function ensures that any node with multiple outgoing edges is converted
    to use an explicit broadcast fork, simplifying the graph execution model.

    Args:
        nodes: The nodes in the graph
        edges: The edges by source node

    Returns:
        A tuple of normalized nodes and edges
    rf   _broadcast_forkFNr   r   )r?  r   r@  r   r'   r   r  r   r   r   r   r4   r0   )	r  r   rB  rC  rD  rF  rG  r   new_forkrk   rk   rl   r  =  s"   
&
r  graph_nodesgraph_edges_by_source dict[JoinID, ParentFork[NodeID]]c                   s"  t | }tjh}tt t |}|D ];}|}| |}t|tr&||j d fdd}t|t	r?|j
D ]}	||	j| q5q||g D ]}
||
| qEqt||| d}dd	 |  D }i }|D ]+}|j|j|j|jd
kd}|du rt| | }td| d|jd|||j< qc|S )a  Find the dominating fork for each join node in the graph.

    This function analyzes the graph structure to find the parent fork that
    dominates each join node, which is necessary for proper synchronization
    during graph execution.

    Args:
        graph_nodes: All nodes in the graph
        graph_edges_by_source: Edges organized by source node

    Returns:
        A mapping from join IDs to their parent fork information

    Raises:
        ValueError: If any join node lacks a dominating fork
    r   r4   last_source_idr   c                   s0   | j D ]}t|tr | |j  dS qdS )zProcess a path and collect edges and fork information.

            Args:
                path: The path to process
                last_source_id: The current source node ID
            N)r   r   r0   r   r$  )r   rM  r   r   rk   rl   r     s   

z/_collect_dominating_forks.<locals>._handle_path)r  	start_idsfork_idsr   c                 S  s   g | ]	}t |tr|qS rk   r  )r  r   rk   rk   rl   r)    s    z-_collect_dominating_forks.<locals>.<listcomp>closest)r   prefer_closestNa  A node in the graph is missing a dominating fork.

For every Join J in the graph, there must be a Fork F between the StartNode and J satisfying:
* Every path from the StartNode to J passes through F
* There are no cycles in the graph including J that don't pass through F.
In this case, F is called a "dominating fork" for J.

This is used to determine when all tasks upstream of this Join are complete and we can proceed with execution.

Mermaid diagram:
z

Join z4 in this graph has no dominating fork in this graph.)r   r4   rM  r   )r   r(   r   r   rg   r   r   r'   r   r   r   r   r.   r*  find_parent_forkr   r   r%   renderr?   )rJ  rK  r  rO  rP  rF  working_source_idr   r   r   r   finderjoinsdominating_forksr   dominating_forkrendered_mermaid_graphrk   rN  rl   r  c  sL   





r  r  dict[JoinID, set[JoinID]]c                 C  sZ   i }|  D ]$\}}tt  }|jD ]}| |}t|tr%|t| q|||< q|S )ag  Compute which joins have other joins as intermediate nodes.

    A join J1 is an intermediate node of join J2 if J1 appears in J2's intermediate_nodes
    (as computed relative to J2's parent fork).

    This information is used to determine:
    1. Which joins are "final" (have no other joins in their intermediate_nodes)
    2. When selecting which reducer to proceed with when there are no active tasks

    Args:
        nodes: All nodes in the graph
        parent_forks: Parent fork information for each join

    Returns:
        A mapping from each join to the set of joins that are intermediate to it
    )r   r   r   intermediate_nodesr   r   r"   r   )r  r  r  join_idparent_forkintermediate_joinsintermediate_node_idintermediate_noderk   rk   rl   r    s   




r  c                   s<   t |   fdd|  D } fdd| D }||fS )Nc                   s$   i | ]\}}  ||t| qS rk   )r   _update_node_with_id_remapping)r  rR   r   node_id_remappingrk   rl   
<dictcomp>  s    z1_replace_placeholder_node_ids.<locals>.<dictcomp>c                   s,   i | ]\}}  || fd d|D qS )c                      g | ]}t | qS rk   _update_path_with_id_remappingr6  rc  rk   rl   r)    r8  z<_replace_placeholder_node_ids.<locals>.<dictcomp>.<listcomp>)r   )r  r   r   rc  rk   rl   re    s    )$_build_placeholder_node_id_remappingr   )r  r  replaced_nodesreplaced_edges_by_sourcerk   rc  rl   r    s   

r  dict[NodeID, NodeID]c                 C  sh   t t  }i }|  D ]&}t|}||krq|| d  ||< }t|dkr,| d| n|||< q|S )ac  The determinism of the generated remapping here is dependent on the determinism of the ordering of the `nodes` dict.

    Note: If we want to generate more interesting names, we could try to make use of information about the edges
    into/out of the relevant nodes. I'm not sure if there's a good use case for that though so I didn't bother for now.
    rf   _)r   strkeysr!   r   )r  counter	remappingrv   replaced_node_idcountrk   rk   rl   ri    s   
$ri  r   r*   rd  c                   s   t | tr | j| j| _| S t | tr"t | j| j| _| S t | trDt | j| j| _| jd urBt | j| j| _| S t | t	r] | j| j| _ fdd| j
D | _
| S )Nc                   s    g | ]}t |t|j d qS ))r   )r	   rh  r   )r  r   rc  rk   rl   r)    s    z2_update_node_with_id_remapping.<locals>.<listcomp>)r   r7   r   r   r"   r   r'   r   r   r   r   )r   rd  rk   rc  rl   rb    s"   





rb  r   r4   c                   s   | j D ]M}t|tr&|j}|d urt |||_t |j|j|_qt|trBt |j|j|_ fdd|j	D |_	qt|t
rP |j|j|_q| S )Nc                   rf  rk   rg  r6  rc  rk   rl   r)    r8  z2_update_path_with_id_remapping.<locals>.<listcomp>)r   r   r3   r   r   r   r   r   r/   r   r0   r$  )r   rd  r   r   rk   rc  rl   rh    s   



rh  )r  r]   r  r_   rn   r   )r  r]   r   r_   rn   r4  )rJ  r]   rK  r_   rn   rL  )r  r]   r  rL  rn   r[  )r  r]   r  r_   )r  r]   rn   rl  )r   r*   rd  rl  rn   r*   )r   r4   rd  rl  rn   r4   )mr!  
__future__r   r   collectionsr   r   collections.abcr   r   r   dataclassesr   r	   typesr
   typingr   r   r   r   r   r   r   typing_extensionsr   r   r   pydantic_graphr   r   pydantic_graph._utilsr   r   pydantic_graph.beta.decisionr   r   r   pydantic_graph.beta.graphr   pydantic_graph.beta.id_typesr   r   r   r    r!   pydantic_graph.beta.joinr"   r#   r$   pydantic_graph.beta.mermaidr%   pydantic_graph.beta.noder&   r'   r(   pydantic_graph.beta.node_typesr)   r*   r+   r,    pydantic_graph.beta.parent_forksr-   r.   pydantic_graph.beta.pathsr/   r0   r1   r2   r3   r4   r5   pydantic_graph.beta.stepr6   r7   r8   r9   r:   r;   pydantic_graph.beta.utilr<   r=   r>   pydantic_graph.exceptionsr?   r@   pydantic_graph.nodesrA   rB   rC   rE   rF   rG   rH   rI   rK   rL   rM   rN   rP   r  r  r  r  r  r  ri  rb  rh  rk   rk   rk   rl   <module>   sb    $$ 	    
h
n
*
&
V
"

