Description
What happened?
Describe the bug
When using a customized subclass of ChatAgentContainer, the agent factory fails because the created agent's type must match ChatAgentContainer exactly.
To Reproduce
Test Python file:
from typing import Callable, List, Optional
from autogen_core.model_context import ChatCompletionContext
from autogen_agentchat.teams._group_chat._selector_group_chat import SelectorGroupChatManager
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.teams._group_chat._events import (
GroupChatAgentResponse,
GroupChatMessage,
GroupChatTermination
)
from autogen_core import MessageContext, event
import asyncio
from autogen_agentchat.base import ChatAgent, TerminationCondition
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.messages import BaseAgentEvent, BaseTextChatMessage, BaseChatMessage
from autogen_agentchat.teams._group_chat._chat_agent_container import ChatAgentContainer
from autogen_agentchat.teams._group_chat._selector_group_chat import SelectorFuncType, CandidateFuncType
from autogen_agentchat.messages import (
MessageFactory,
ModelClientStreamingChunkEvent,
)
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_core.models import ChatCompletionClient
import asyncio
from autogen_agentchat.ui import Console
from autogen_core.models import ModelFamily
import os
class UISelectorGroupChatManager(SelectorGroupChatManager):
    """Selector group chat manager exposing hook points for a UI layer.

    Each overridden handler first delegates to the parent implementation.
    The branches that follow are placeholders marking where a UI
    (Chainlit, per the original notes) would be notified.
    """

    def __init__(
        self,
        name: str,
        group_topic_type: str,
        output_topic_type: str,
        participant_topic_types: List[str],
        participant_names: List[str],
        participant_descriptions: List[str],
        output_message_queue: asyncio.Queue[BaseAgentEvent | BaseChatMessage | GroupChatTermination],
        termination_condition: TerminationCondition | None,
        max_turns: int | None,
        message_factory: MessageFactory,
        model_client: ChatCompletionClient,
        selector_prompt: str,
        allow_repeated_speaker: bool,
        selector_func: Optional[SelectorFuncType],
        max_selector_attempts: int,
        candidate_func: Optional[CandidateFuncType],
        emit_team_events: bool,
        model_context: ChatCompletionContext | None,
        model_client_streaming: bool = False,
    ) -> None:
        """Pass every argument, unchanged and in the same order, to the parent."""
        super().__init__(
            name, group_topic_type, output_topic_type,
            participant_topic_types, participant_names, participant_descriptions,
            output_message_queue, termination_condition, max_turns,
            message_factory, model_client, selector_prompt,
            allow_repeated_speaker, selector_func, max_selector_attempts,
            candidate_func, emit_team_events, model_context,
            model_client_streaming,
        )

    # NOTE: the first parameter of an event handler must be named `message`.
    @event
    async def handle_group_chat_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:
        """Run the parent handler, then dispatch on the wrapped message type."""
        await super().handle_group_chat_message(message, ctx)
        payload: BaseAgentEvent | BaseTextChatMessage = message.message
        if isinstance(payload, BaseTextChatMessage):
            # Placeholder: call Chainlit to send the message.
            return
        if isinstance(payload, ModelClientStreamingChunkEvent):
            # Placeholder: call Chainlit to stream the token.
            return
        if isinstance(payload, BaseAgentEvent):
            # Placeholder: other agent events are currently ignored.
            return

    @event
    async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None:
        """Run the parent handler for an agent response event."""
        await super().handle_agent_response(message, ctx)
        # Placeholder: call Chainlit to send the message.
class UISelectorGroupChatAgentChatContainer(ChatAgentContainer):
    """Customized participant container used by ``UISelectorGroupChat``.

    It adds no behavior of its own: construction and the agent-response
    handler both defer to ``ChatAgentContainer``. The subclass exists only
    so that the team's participant factory produces a type other than
    ``ChatAgentContainer`` itself.
    """

    def __init__(self, *args, **kwargs):
        """Forward all positional and keyword arguments to the parent container."""
        super().__init__(*args, **kwargs)

    @event
    async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None:
        """Delegate the agent response event to the parent container's handler."""
        await super().handle_agent_response(message, ctx)
class UISelectorGroupChat(SelectorGroupChat):
    """Selector group chat wired to the UI-aware manager and container classes."""

    def __init__(self, *args, **kwargs):
        """Forward all positional and keyword arguments to ``SelectorGroupChat``."""
        super().__init__(*args, **kwargs)

    def _create_group_chat_manager_factory(
        self,
        name: str,
        group_topic_type: str,
        output_topic_type: str,
        participant_topic_types: List[str],
        participant_names: List[str],
        participant_descriptions: List[str],
        output_message_queue: asyncio.Queue[BaseAgentEvent | BaseChatMessage | GroupChatTermination],
        termination_condition: TerminationCondition | None,
        max_turns: int | None,
        message_factory: MessageFactory,
    ) -> Callable[[], SelectorGroupChatManager]:
        """Return a zero-argument factory that builds a ``UISelectorGroupChatManager``.

        The selector-related settings are read from ``self`` when the factory
        is invoked, not when it is created.
        """

        def build_manager() -> SelectorGroupChatManager:
            return UISelectorGroupChatManager(
                name,
                group_topic_type,
                output_topic_type,
                participant_topic_types,
                participant_names,
                participant_descriptions,
                output_message_queue,
                termination_condition,
                max_turns,
                message_factory,
                self._model_client,
                self._selector_prompt,
                self._allow_repeated_speaker,
                self._selector_func,
                self._max_selector_attempts,
                self._candidate_func,
                self._emit_team_events,
                self._model_context,
                self._model_client_streaming,
            )

        return build_manager

    def _create_participant_factory(
        self,
        parent_topic_type: str,
        output_topic_type: str,
        agent: ChatAgent,
        message_factory: MessageFactory
    ) -> Callable[[], ChatAgent]:
        """Return a zero-argument factory that wraps ``agent`` in the customized container."""

        def build_container():
            return UISelectorGroupChatAgentChatContainer(
                parent_topic_type,
                output_topic_type,
                agent,
                message_factory,
            )

        return build_container
async def main() -> None:
    """Build a two-participant ``UISelectorGroupChat`` and stream it to the console.

    This is the reproduction entry point. Per the traceback in this report,
    ``groupchat.run_stream()`` raises
    ``ValueError: Factory registered using the wrong type.``

    The model client is closed in a ``finally`` block so it is released even
    when the run raises.
    """
    # Uncomment this to use an OpenAI model instead:
    # model_client = OpenAIChatCompletionClient(
    #     model="gpt-4o-mini"
    # )
    model_client = OpenAIChatCompletionClient(
        model="gemini-2.5-flash-preview-04-17",
        base_url="https://generativelanguage.googleapis.com/v1beta/",
        api_key=os.getenv("GEMINI_API_KEY"),
        model_info={
            "vision": False,
            "function_calling": True,
            "json_output": True,
            "family": ModelFamily.GEMINI_2_5_FLASH,
            "structured_output": True,
        },
    )
    try:
        human = UserProxyAgent(
            name="human",
        )
        agent = AssistantAgent(
            name="assistant",
            system_message="You are a helpful assistant",
            description="a helpful assistant",
            model_client=model_client,
            model_client_stream=True,
        )
        groupchat = UISelectorGroupChat(
            participants=[human, agent],
            model_client=model_client,
            selector_prompt="""You must switch to human when the last input is assistant,
and must switch to human when the last input is human""",
            allow_repeated_speaker=False,
            max_selector_attempts=3,
        )
        await Console(groupchat.run_stream())
    finally:
        # close() is a coroutine on the model client; calling it without
        # `await` only creates a coroutine object and never closes the client.
        await model_client.close()
# Start the reproduction only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Expected behavior
No error raised
Screenshots
If applicable, add screenshots to help explain your problem.
Additional context
raised error
Traceback (most recent call last):
File "e:\ZenWayne\AgentFusion\test\reproduce.py", line 200, in <module>
asyncio.run(main())
File "C:\Users\73448\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "C:\Users\73448\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\73448\AppData\Local\Programs\Python\Python311\Lib\asyncio\base_events.py", line 650, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "e:\ZenWayne\AgentFusion\test\reproduce.py", line 195, in main
await Console(groupchat.run_stream())
File "E:\microsoft\autogen\python\packages\autogen-agentchat\src\autogen_agentchat\ui\_console.py", line 117, in Console
async for message in stream:
File "E:\microsoft\autogen\python\packages\autogen-agentchat\src\autogen_agentchat\teams\_group_chat\_base_group_chat.py", line 499, in run_stream
await self._runtime.send_message(
File "E:\microsoft\autogen\python\packages\autogen-core\src\autogen_core\_single_threaded_agent_runtime.py", line 385, in send_message
return await future
^^^^^^^^^^^^
File "E:\microsoft\autogen\python\packages\autogen-core\src\autogen_core\_single_threaded_agent_runtime.py", line 487, in _process_send
recipient_agent = await self._get_agent(recipient)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\microsoft\autogen\python\packages\autogen-core\src\autogen_core\_single_threaded_agent_runtime.py", line 983, in _get_agent
agent = await self._invoke_agent_factory(agent_factory, agent_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\microsoft\autogen\python\packages\autogen-core\src\autogen_core\_single_threaded_agent_runtime.py", line 962, in _invoke_agent_factory
agent = cast(T, await agent)
^^^^^^^^^^^
File "E:\microsoft\autogen\python\packages\autogen-core\src\autogen_core\_single_threaded_agent_runtime.py", line 907, in factory_wrapper
raise ValueError("Factory registered using the wrong type.")
ValueError: Factory registered using the wrong type.
Which packages was the bug in?
Python Core (autogen-core)
AutoGen library version.
Python dev (main branch)
Other library version.
No response
Model used
No response
Model provider
None
Other model provider
No response
Python version
None
.NET version
None
Operating system
None