router.workers

class agentopera.router.workers.ChatAgent(name: str)[source]

Bases: RoutedAgent

A specialized agent for conducting chat via an external API.

async my_message_handler(message: TextMessage, ctx: MessageContext) None[source]

Handles messages, performing deep research and streaming responses.

class agentopera.router.workers.VercelAIAgent(name: str, endpoint_url: str)[source]

Bases: RoutedAgent

async my_message_handler(message: TextMessage, ctx: MessageContext) None[source]
class agentopera.router.workers.MessariAIAgent(name: str, endpoint_url: str = 'https://api.messari.io/ai/v1')[source]

Bases: RoutedAgent

A RoutedAgent that utilizes Messari AI’s streaming API.

async my_message_handler(message: TextMessage, ctx: MessageContext) None[source]

Handles messages and streams AI responses from Messari with chunk tracking. - First chunk → TextMessage (source=”user”) - Middle chunks → ModelClientStreamingChunkEvent - Last chunk → StopMessage

class agentopera.router.workers.CryptoSentimentAgent(name: str)[source]

Bases: RoutedAgent

A specialized agent for conducting crypto sentiment analysis via an external API.

name

init the agent instance

async get_sentiment(days: str = '7') str[source]

Get the crypto sentiment of the crypto market

async handle_message(message: TextMessage, ctx: MessageContext) None[source]

Handles messages, performing deep research and streaming responses.

class agentopera.router.workers.FinanceAnalysisFlowAgent(name: str, custom_url: str)[source]

Bases: BaseFlowAgent

A specific flow agent for finance analysis.

class agentopera.router.workers.PaperFlowAgent(name: str, custom_url: str)[source]

Bases: BaseFlowAgent

A specific flow agent for academic paper search functionality.

class agentopera.router.workers.CryptoMarketAgent(name: str)[source]

Bases: RoutedAgent

A specialized agent for conducting historical market data via an external API.

name

init the agent instance

async get_market_data(stock_ticket_symbol: str) List[Dict[str, Any]][source]

Get market data for a given symbol.

Parameters:

stock_ticket_symbol (str) – The stock ticker symbol to fetch data for.

Returns:

  • Dict with market cap

  • Dict of historical data, where each key is a range and each value is a list of {timestamp, price}

Return type:

List[Dict[str, Any]]

async my_message_handler(message: TextMessage, ctx: MessageContext) None[source]

Handles messages, performing deep research and streaming responses.

class agentopera.router.workers.CryptoTrendAgent(name: str)[source]

Bases: RoutedAgent

A specialized agent for conducting crypto trend analysis via an external API.

name

init the agent instance

async get_sorted_crypto_data(number: str = '10', sort_by: str = 'market_cap', sort_dir: str = 'desc', price_min: float | None = None, price_max: float | None = None, market_cap_min: float | None = None, market_cap_max: float | None = None, volume_24h_min: float | None = None, volume_24h_max: float | None = None, percent_change_24h_min: float | None = None, percent_change_24h_max: float | None = None) List[Dict[str, float]][source]

Fetches and sorts cryptocurrency data from CoinMarketCap.

Parameters:
  • number (str) – The number of cryptocurrencies to retrieve. Must be an integer, default is 10.

  • sort_by (str) – The variable to sort by. Must be one of the supported CoinMarketCap sort parameters.

  • following (The valid input should be one of the) – “name” “symbol” “date_added” “market_cap” “market_cap_strict”

  • "percent_change_7d" ("price" "circulating_supply" "total_supply" "max_supply" "num_market_pairs" "volume_24h" "percent_change_1h" "percent_change_24h") –

  • "volume_30d" ("market_cap_by_total_supply_strict" "volume_7d") –

  • sort_dir (str) – The direction to sort. Must be either ‘asc’ or ‘desc’.

  • price_min (float) – The minimum price to filter by.

  • price_max (float) – The maximum price to filter by.

  • market_cap_min (float) – The minimum market cap to filter by.

  • market_cap_max (float) – The maximum market cap to filter by.

  • volume_24h_min (float) – The minimum volume to filter by.

  • volume_24h_max (float) – The maximum volume to filter by.

  • percent_change_24h_min (float) – The minimum percent change to filter by.

  • percent_change_24h_max (float) – The maximum percent change to filter by.

Returns:

A list of dicts with ‘name’ and the selected sorted variable.

Return type:

List[Dict[str, Any]]

async my_message_handler(message: TextMessage, ctx: MessageContext) None[source]

Handles messages, performing deep research and streaming responses.

class agentopera.router.workers.CailaAgent(name: str)[source]

Bases: DeveloperAgent

Caila agent for weather information.

async message_handler(message: TextMessage, ctx: MessageContext) None[source]

Handles messages and streams response chunks.

async fetch_and_transform_data(user_query: str, session_id: str, ctx: MessageContext) AsyncGenerator[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage, None][source]

Fetch data from the Caila agent.

class agentopera.router.workers.MemeTrendAgent(name: str)[source]

Bases: RoutedAgent

A specialized agent for conducting meme trend analysis via an external API.

name

init the agent instance

async my_message_handler(message: TextMessage, ctx: MessageContext) None[source]

Handles messages, performing deep research and streaming responses.

class agentopera.router.workers.OnchainTrendAgent(name: str)[source]

Bases: RoutedAgent

A specialized agent for conducting onchain trend analysis via an external API.

name

init the agent instance

async my_message_handler(message: TextMessage, ctx: MessageContext) None[source]

Handles messages, performing deep research and streaming responses.

class agentopera.router.workers.MessariChatAgent[source]

Bases: DeveloperAgent

async yield_initial_message(session_id: str, is_first_message: bool) tuple[list[TextMessage], bool][source]
async handle_metadata(metadata: dict, session_id: str, is_first_message: bool) tuple[list[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage], bool][source]
async fetch_and_transform_data(user_query: str, session_id: str, ctx: MessageContext) AsyncGenerator[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage, None][source]

Streams responses from an external API.

async message_handler(message: TextMessage, ctx: MessageContext) None[source]

Handles messages and streams response chunks.