router.workers
- class agentopera.router.workers.ChatAgent(name: str)
Bases: RoutedAgent
A specialized agent for conducting chat via an external API.
- async my_message_handler(message: TextMessage, ctx: MessageContext) → None
Handles messages, performing deep research and streaming responses.
- class agentopera.router.workers.VercelAIAgent(name: str, endpoint_url: str)
Bases: RoutedAgent
- async my_message_handler(message: TextMessage, ctx: MessageContext) → None
- class agentopera.router.workers.MessariAIAgent(name: str, endpoint_url: str = 'https://api.messari.io/ai/v1')
Bases: RoutedAgent
A RoutedAgent that utilizes Messari AI’s streaming API.
- async my_message_handler(message: TextMessage, ctx: MessageContext) → None
Handles messages and streams AI responses from Messari with chunk tracking:
  - First chunk → TextMessage (source="user")
  - Middle chunks → ModelClientStreamingChunkEvent
  - Last chunk → StopMessage
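The chunk-tracking rule above can be sketched in isolation. This is a minimal illustration, not the agent's actual implementation: the three dataclasses are hypothetical stand-ins for the agentopera message types, `tag_chunks` and the `agent_name` default are invented for the example, and emitting only a StopMessage for a single-chunk stream is an assumption.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Union


# Hypothetical stand-ins for the agentopera message types named above;
# the real classes may take different constructor arguments.
@dataclass
class TextMessage:
    content: str
    source: str


@dataclass
class ModelClientStreamingChunkEvent:
    content: str
    source: str


@dataclass
class StopMessage:
    content: str
    source: str


Event = Union[TextMessage, ModelClientStreamingChunkEvent, StopMessage]


def tag_chunks(chunks: Iterable[str], agent_name: str = "messari") -> Iterator[Event]:
    """Wrap each chunk according to its position in the stream."""
    iterator = iter(chunks)
    try:
        previous = next(iterator)
    except StopIteration:
        return  # empty stream: nothing to emit
    is_first = True
    for current in iterator:
        # `previous` cannot be the last chunk, because `current` follows it.
        if is_first:
            yield TextMessage(content=previous, source="user")
            is_first = False
        else:
            yield ModelClientStreamingChunkEvent(content=previous, source=agent_name)
        previous = current
    # `previous` now holds the final chunk (assumption: a one-chunk
    # stream is treated as "last" rather than "first").
    yield StopMessage(content=previous, source=agent_name)
```

Holding one chunk back is what lets the loop recognize the last chunk without knowing the stream length in advance.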
- class agentopera.router.workers.CryptoSentimentAgent(name: str)
Bases: RoutedAgent
A specialized agent for conducting crypto sentiment analysis via an external API.
- name
The name of the agent instance, set at initialization.
- async handle_message(message: TextMessage, ctx: MessageContext) → None
Handles messages, performing deep research and streaming responses.
- class agentopera.router.workers.FinanceAnalysisFlowAgent(name: str, custom_url: str)
Bases: BaseFlowAgent
A specific flow agent for finance analysis.
- class agentopera.router.workers.PaperFlowAgent(name: str, custom_url: str)
Bases: BaseFlowAgent
A specific flow agent for academic paper search functionality.
- class agentopera.router.workers.CryptoMarketAgent(name: str)
Bases: RoutedAgent
A specialized agent for retrieving historical market data via an external API.
- name
The name of the agent instance, set at initialization.
- async get_market_data(stock_ticket_symbol: str) → List[Dict[str, Any]]
Get market data for a given symbol.
- Parameters:
stock_ticket_symbol (str) – The stock ticker symbol to fetch data for.
- Returns:
A list containing a dict with the market cap and a dict of historical data, where each key is a range and each value is a list of {timestamp, price} entries.
- Return type:
List[Dict[str, Any]]
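To make the documented return shape concrete, here is a small sketch of how such a result might be consumed. The key names (`"market_cap"`, `"1d"`, `"7d"`), the numeric values, and the helper `percent_change_by_range` are all placeholders invented for illustration; the real keys and ranges returned by `get_market_data` may differ.

```python
from typing import Any, Dict, List

# Placeholder values for illustration only; this is not real market data,
# and the key names are assumptions about the documented shape.
sample_result: List[Dict[str, Any]] = [
    {"market_cap": 1_000_000.0},
    {
        "1d": [
            {"timestamp": 1700000000, "price": 1.00},
            {"timestamp": 1700003600, "price": 1.02},
        ],
        "7d": [
            {"timestamp": 1699400000, "price": 0.95},
            {"timestamp": 1700000000, "price": 1.00},
        ],
    },
]


def percent_change_by_range(
    history: Dict[str, List[Dict[str, float]]]
) -> Dict[str, float]:
    """Percent change between the earliest and latest price of each range."""
    changes: Dict[str, float] = {}
    for range_name, points in history.items():
        if len(points) < 2:
            continue  # need at least two points to compute a change
        ordered = sorted(points, key=lambda p: p["timestamp"])
        first, last = ordered[0]["price"], ordered[-1]["price"]
        if first == 0:
            continue  # avoid division by zero
        changes[range_name] = (last - first) / first * 100
    return changes


market_cap_info, history = sample_result
changes = percent_change_by_range(history)
```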
- async my_message_handler(message: TextMessage, ctx: MessageContext) → None
Handles messages, performing deep research and streaming responses.
- class agentopera.router.workers.CryptoTrendAgent(name: str)
Bases: RoutedAgent
A specialized agent for conducting crypto trend analysis via an external API.
- name
The name of the agent instance, set at initialization.
- async get_sorted_crypto_data(number: str = '10', sort_by: str = 'market_cap', sort_dir: str = 'desc', price_min: float | None = None, price_max: float | None = None, market_cap_min: float | None = None, market_cap_max: float | None = None, volume_24h_min: float | None = None, volume_24h_max: float | None = None, percent_change_24h_min: float | None = None, percent_change_24h_max: float | None = None) → List[Dict[str, float]]
Fetches and sorts cryptocurrency data from CoinMarketCap.
- Parameters:
number (str) – The number of cryptocurrencies to retrieve. Must be an integer passed as a string; defaults to '10'.
sort_by (str) – The field to sort by; defaults to 'market_cap'. Must be one of the supported CoinMarketCap sort parameters: "name", "symbol", "date_added", "market_cap", "market_cap_strict", "price", "circulating_supply", "total_supply", "max_supply", "num_market_pairs", "volume_24h", "percent_change_1h", "percent_change_24h", "percent_change_7d", "market_cap_by_total_supply_strict", "volume_7d", "volume_30d".
sort_dir (str) – The direction to sort. Must be either ‘asc’ or ‘desc’.
price_min (float) – The minimum price to filter by.
price_max (float) – The maximum price to filter by.
market_cap_min (float) – The minimum market cap to filter by.
market_cap_max (float) – The maximum market cap to filter by.
volume_24h_min (float) – The minimum 24-hour volume to filter by.
volume_24h_max (float) – The maximum 24-hour volume to filter by.
percent_change_24h_min (float) – The minimum 24-hour percent change to filter by.
percent_change_24h_max (float) – The maximum 24-hour percent change to filter by.
- Returns:
A list of dicts, each containing 'name' and the value of the field selected by sort_by.
- Return type:
List[Dict[str, Any]]
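The parameter constraints above can be checked before any request is made. The following is a sketch under stated assumptions, not the agent's own code: `build_listing_params` is a hypothetical helper, and the mapping of `number` to a `limit` key and `sort_by` to a `sort` key is an assumption about how the arguments might be forwarded to CoinMarketCap.

```python
from typing import Dict, Optional, Union

# Sort fields listed in the parameter documentation above.
VALID_SORT_FIELDS = {
    "name", "symbol", "date_added", "market_cap", "market_cap_strict",
    "price", "circulating_supply", "total_supply", "max_supply",
    "num_market_pairs", "volume_24h", "percent_change_1h",
    "percent_change_24h", "percent_change_7d",
    "market_cap_by_total_supply_strict", "volume_7d", "volume_30d",
}

# Optional range filters accepted by get_sorted_crypto_data.
VALID_FILTERS = {
    "price_min", "price_max", "market_cap_min", "market_cap_max",
    "volume_24h_min", "volume_24h_max",
    "percent_change_24h_min", "percent_change_24h_max",
}


def build_listing_params(
    number: str = "10",
    sort_by: str = "market_cap",
    sort_dir: str = "desc",
    **filters: Optional[float],
) -> Dict[str, Union[str, float]]:
    """Validate the arguments and drop filters that were left as None."""
    try:
        limit = int(number)
    except ValueError:
        raise ValueError(f"number must be an integer string, got {number!r}") from None
    if limit < 1:
        raise ValueError("number must be at least 1")
    if sort_by not in VALID_SORT_FIELDS:
        raise ValueError(f"unsupported sort_by: {sort_by!r}")
    if sort_dir not in {"asc", "desc"}:
        raise ValueError("sort_dir must be 'asc' or 'desc'")

    # Assumed key names for the outgoing query ("limit", "sort").
    params: Dict[str, Union[str, float]] = {
        "limit": number,
        "sort": sort_by,
        "sort_dir": sort_dir,
    }
    for key, value in filters.items():
        if key not in VALID_FILTERS:
            raise ValueError(f"unknown filter: {key!r}")
        if value is not None:
            params[key] = value
    return params
```

For instance, `build_listing_params(number="5", sort_by="volume_24h", price_min=1.0)` would keep only the `price_min` filter and reject a misspelled sort field early, before any network call.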
- async my_message_handler(message: TextMessage, ctx: MessageContext) → None
Handles messages, performing deep research and streaming responses.
- class agentopera.router.workers.CailaAgent(name: str)
Bases: DeveloperAgent
Caila agent for weather information.
- async message_handler(message: TextMessage, ctx: MessageContext) → None
Handles messages and streams response chunks.
- async fetch_and_transform_data(user_query: str, session_id: str, ctx: MessageContext) → AsyncGenerator[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage, None]
Fetch data from the Caila agent.
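Because `fetch_and_transform_data` returns an `AsyncGenerator`, callers iterate it with `async for`. The sketch below shows that consumption pattern only. The generator here is a hypothetical stub that yields placeholder chunks, the two dataclasses are stand-ins for the agentopera event types, and the `ctx` argument is omitted for brevity.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, List, Union


# Hypothetical stand-ins for two of the event types in the signature above.
@dataclass
class ModelClientStreamingChunkEvent:
    content: str


@dataclass
class StopMessage:
    content: str


async def fake_fetch_and_transform_data(
    user_query: str, session_id: str
) -> AsyncGenerator[Union[ModelClientStreamingChunkEvent, StopMessage], None]:
    """Stub generator yielding placeholder chunks, then a stop marker."""
    for piece in ("chunk-1 ", "chunk-2 ", "chunk-3"):
        await asyncio.sleep(0)  # yield control, as a real network read would
        yield ModelClientStreamingChunkEvent(content=piece)
    yield StopMessage(content="done")


async def collect_text(user_query: str, session_id: str) -> str:
    """Concatenate streamed chunk contents until a StopMessage arrives."""
    parts: List[str] = []
    async for event in fake_fetch_and_transform_data(user_query, session_id):
        if isinstance(event, StopMessage):
            break
        parts.append(event.content)
    return "".join(parts)


text = asyncio.run(collect_text("example query", "session-1"))
```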
- class agentopera.router.workers.MemeTrendAgent(name: str)
Bases: RoutedAgent
A specialized agent for conducting meme trend analysis via an external API.
- name
The name of the agent instance, set at initialization.
- async my_message_handler(message: TextMessage, ctx: MessageContext) → None
Handles messages, performing deep research and streaming responses.
- class agentopera.router.workers.OnchainTrendAgent(name: str)
Bases: RoutedAgent
A specialized agent for conducting onchain trend analysis via an external API.
- name
The name of the agent instance, set at initialization.
- async my_message_handler(message: TextMessage, ctx: MessageContext) → None
Handles messages, performing deep research and streaming responses.
- class agentopera.router.workers.MessariChatAgent
Bases: DeveloperAgent
- async yield_initial_message(session_id: str, is_first_message: bool) → tuple[list[TextMessage], bool]
- async handle_metadata(metadata: dict, session_id: str, is_first_message: bool) → tuple[list[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage], bool]
- async fetch_and_transform_data(user_query: str, session_id: str, ctx: MessageContext) → AsyncGenerator[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage, None]
Streams responses from an external API.
- async message_handler(message: TextMessage, ctx: MessageContext) → None
Handles messages and streams response chunks.