Skip to content

Customized ChatAgentContainer not supported #6730

Closed
@ZenWayne

Description

@ZenWayne

What happened?

Describe the bug
When using a customized ChatAgentContainer, the factory function fails because its type must match ChatAgentContainer exactly.

To Reproduce
test python file

from typing import Callable, List, Optional
from autogen_core.model_context import ChatCompletionContext
from autogen_agentchat.teams._group_chat._selector_group_chat import SelectorGroupChatManager
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.teams._group_chat._events import ( 
    GroupChatAgentResponse, 
    GroupChatMessage,
    GroupChatTermination
)
from autogen_core import MessageContext, event
import asyncio
from autogen_agentchat.base import ChatAgent, TerminationCondition
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.messages import BaseAgentEvent, BaseTextChatMessage, BaseChatMessage
from autogen_agentchat.teams._group_chat._chat_agent_container import ChatAgentContainer
from autogen_agentchat.teams._group_chat._selector_group_chat import SelectorFuncType, CandidateFuncType
from autogen_agentchat.messages import (
    MessageFactory,  
    ModelClientStreamingChunkEvent, 
)
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_core.models import ChatCompletionClient
import asyncio
from autogen_agentchat.ui import Console
from autogen_core.models import ModelFamily
import os

class UISelectorGroupChatManager(SelectorGroupChatManager):
    """Selector group-chat manager that mirrors chat traffic to a UI front end.

    Every event is forwarded to the base implementation first; the subclass
    then exposes hook points where a UI framework (e.g. chainlit) can render
    the message. The constructor signature is kept positionally identical to
    ``SelectorGroupChatManager`` so the runtime's factory type check accepts it.
    """

    def __init__(
        self,
        name: str,
        group_topic_type: str,
        output_topic_type: str,
        participant_topic_types: List[str],
        participant_names: List[str],
        participant_descriptions: List[str],
        output_message_queue: asyncio.Queue[BaseAgentEvent | BaseChatMessage | GroupChatTermination],
        termination_condition: TerminationCondition | None,
        max_turns: int | None,
        message_factory: MessageFactory,
        model_client: ChatCompletionClient,
        selector_prompt: str,
        allow_repeated_speaker: bool,
        selector_func: Optional[SelectorFuncType],
        max_selector_attempts: int,
        candidate_func: Optional[CandidateFuncType],
        emit_team_events: bool,
        model_context: ChatCompletionContext | None,
        model_client_streaming: bool = False,
    ) -> None:
        # Pure pass-through to the parent constructor, argument for argument.
        super().__init__(
            name,
            group_topic_type,
            output_topic_type,
            participant_topic_types,
            participant_names,
            participant_descriptions,
            output_message_queue,
            termination_condition,
            max_turns,
            message_factory,
            model_client,
            selector_prompt,
            allow_repeated_speaker,
            selector_func,
            max_selector_attempts,
            candidate_func,
            emit_team_events,
            model_context,
            model_client_streaming,
        )

    # NOTE: the first parameter must be named `message` — the @event
    # decorator matches handlers by signature.
    @event
    async def handle_group_chat_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:
        """Delegate to the base handler, then dispatch the inner message to the UI."""
        await super().handle_group_chat_message(message, ctx)
        payload: BaseAgentEvent | BaseTextChatMessage = message.message
        # Branch order matters: ModelClientStreamingChunkEvent must be tested
        # before the broader BaseAgentEvent case.
        if isinstance(payload, BaseTextChatMessage):
            # UI hook: send a complete text message (e.g. via chainlit).
            pass
        elif isinstance(payload, ModelClientStreamingChunkEvent):
            # UI hook: stream a single token chunk.
            pass
        elif isinstance(payload, BaseAgentEvent):
            # UI hook: other agent events (tool calls, etc.) — currently ignored.
            pass

    @event
    async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None:
        """Delegate to the base handler, then let the UI render the response."""
        await super().handle_agent_response(message, ctx)
        # UI hook: send the agent response to the front end.
        pass


class UISelectorGroupChatAgentChatContainer(ChatAgentContainer):
    """A ChatAgentContainer with a UI hook on agent responses.

    The redundant ``__init__(*args, **kwargs)`` pass-through was removed: it
    only delegated to the inherited constructor and obscured the signature
    that the runtime's factory type check compares against.
    """

    @event
    async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None:
        """Forward the response event to the base container, then notify the UI.

        Args:
            message: The agent response event published to the group chat.
            ctx: Delivery context supplied by the runtime.
        """
        await super().handle_agent_response(message, ctx)
        # UI hook: render the agent response (intentionally a no-op here).

class UISelectorGroupChat(SelectorGroupChat):
    """SelectorGroupChat variant that plugs UI-aware components into the team.

    Overrides the two factory hooks so the team is driven by
    ``UISelectorGroupChatManager`` and each participant is wrapped in a
    ``UISelectorGroupChatAgentChatContainer``. The redundant
    ``__init__(*args, **kwargs)`` pass-through was removed; the inherited
    constructor behaves identically.
    """

    def _create_group_chat_manager_factory(
        self,
        name: str,
        group_topic_type: str,
        output_topic_type: str,
        participant_topic_types: List[str],
        participant_names: List[str],
        participant_descriptions: List[str],
        output_message_queue: asyncio.Queue[BaseAgentEvent | BaseChatMessage | GroupChatTermination],
        termination_condition: TerminationCondition | None,
        max_turns: int | None,
        message_factory: MessageFactory,
    ) -> Callable[[], SelectorGroupChatManager]:
        """Return a zero-argument factory that builds the UI-aware manager.

        The factory closes over both the explicit arguments and the private
        selector configuration stored on ``self``.
        """

        def factory() -> SelectorGroupChatManager:
            return UISelectorGroupChatManager(
                name,
                group_topic_type,
                output_topic_type,
                participant_topic_types,
                participant_names,
                participant_descriptions,
                output_message_queue,
                termination_condition,
                max_turns,
                message_factory,
                self._model_client,
                self._selector_prompt,
                self._allow_repeated_speaker,
                self._selector_func,
                self._max_selector_attempts,
                self._candidate_func,
                self._emit_team_events,
                self._model_context,
                self._model_client_streaming,
            )

        return factory

    def _create_participant_factory(
        self,
        parent_topic_type: str,
        output_topic_type: str,
        agent: ChatAgent,
        message_factory: MessageFactory,
    ) -> Callable[[], ChatAgent]:
        """Return a zero-argument factory wrapping *agent* in the UI container."""

        def factory() -> ChatAgent:
            return UISelectorGroupChatAgentChatContainer(
                parent_topic_type, output_topic_type, agent, message_factory
            )

        return factory
    

async def main():
    """Run a two-participant (human + assistant) UI selector group chat demo."""
    # Uncomment this to use an OpenAI model instead of Gemini:
    # model_client = OpenAIChatCompletionClient(
    #     model="gpt-4o-mini"
    #     )
    # Fixed: the original had a duplicated assignment
    # (`model_client = model_client = ...`).
    model_client = OpenAIChatCompletionClient(
        model="gemini-2.5-flash-preview-04-17",
        base_url="https://generativelanguage.googleapis.com/v1beta/",
        api_key=os.getenv("GEMINI_API_KEY"),
        model_info={
            "vision": False,
            "function_calling": True,
            "json_output": True,
            "family": ModelFamily.GEMINI_2_5_FLASH,
            "structured_output": True,
        },
    )
    human = UserProxyAgent(
        name="human"
    )
    agent = AssistantAgent(
        name="assistant",
        system_message="You are a helpful assistant",
        description="a helpful assistant",
        model_client=model_client,
        model_client_stream=True,
    )

    # NOTE(review): both clauses of this prompt say "switch to human"; the
    # second clause presumably should say "assistant". Kept verbatim from the
    # original reproduction — confirm intent before changing.
    groupchat = UISelectorGroupChat(
        participants=[human, agent],
        model_client=model_client,
        selector_prompt="""You must switch to human when the last input is assistant, 
and must switch to human when the last input is human""",
        allow_repeated_speaker=False,
        max_selector_attempts=3,
    )
    try:
        await Console(groupchat.run_stream())
    finally:
        # Fixed: close() is a coroutine — the original call was never awaited,
        # so the client was never actually closed.
        await model_client.close()

if __name__ == "__main__":
    # Script entry point: run the async demo under a fresh event loop.
    asyncio.run(main())

Expected behavior
No error raised

Screenshots
If applicable, add screenshots to help explain your problem.

Additional context
raised error

Traceback (most recent call last):
  File "e:\ZenWayne\AgentFusion\test\reproduce.py", line 200, in <module>
    asyncio.run(main())
  File "C:\Users\73448\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 190, in run     
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\73448\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 118, in run     
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\73448\AppData\Local\Programs\Python\Python311\Lib\asyncio\base_events.py", line 650, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "e:\ZenWayne\AgentFusion\test\reproduce.py", line 195, in main
    await Console(groupchat.run_stream())
  File "E:\microsoft\autogen\python\packages\autogen-agentchat\src\autogen_agentchat\ui\_console.py", line 117, in Console
    async for message in stream:
  File "E:\microsoft\autogen\python\packages\autogen-agentchat\src\autogen_agentchat\teams\_group_chat\_base_group_chat.py", line 499, in run_stream
    await self._runtime.send_message(
  File "E:\microsoft\autogen\python\packages\autogen-core\src\autogen_core\_single_threaded_agent_runtime.py", line 385, in send_message
    return await future
           ^^^^^^^^^^^^
  File "E:\microsoft\autogen\python\packages\autogen-core\src\autogen_core\_single_threaded_agent_runtime.py", line 487, in _process_send
    recipient_agent = await self._get_agent(recipient)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "E:\microsoft\autogen\python\packages\autogen-core\src\autogen_core\_single_threaded_agent_runtime.py", line 983, in _get_agent
    agent = await self._invoke_agent_factory(agent_factory, agent_id)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "E:\microsoft\autogen\python\packages\autogen-core\src\autogen_core\_single_threaded_agent_runtime.py", line 962, in _invoke_agent_factory
    agent = cast(T, await agent)
                    ^^^^^^^^^^^
  File "E:\microsoft\autogen\python\packages\autogen-core\src\autogen_core\_single_threaded_agent_runtime.py", line 907, in factory_wrapper
    raise ValueError("Factory registered using the wrong type.")
ValueError: Factory registered using the wrong type.

Which packages was the bug in?

Python Core (autogen-core)

AutoGen library version.

Python dev (main branch)

Other library version.

No response

Model used

No response

Model provider

None

Other model provider

No response

Python version

None

.NET version

None

Operating system

None

Metadata

Metadata

Assignees

No one assigned

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions