Source code for agentstr.nostr_agent_server

import asyncio
from collections.abc import Callable
from typing import Any

from pydantic import BaseModel
from pynostr.event import Event

from agentstr.a2a import AgentCard, ChatInput, PriceHandlerResponse, PriceHandler
from agentstr.logger import get_logger
from agentstr.nostr_client import NostrClient
from agentstr.nostr_mcp_client import NostrMCPClient

logger = get_logger(__name__)


[docs] class NoteFilters(BaseModel): """Filters for filtering Nostr notes/events.""" nostr_pubkeys: list[str] | None = None #: Filter by specific public keys nostr_tags: list[str] | None = None #: Filter by specific tags following_only: bool = False #: Only show notes from followed users (not implemented)
[docs] class NostrAgentServer: """Server that integrates an external agent with the Nostr network. Handles direct messages and optional payments, routing them to an external agent. """
[docs] def __init__(self, nostr_client: NostrClient | None = None, nostr_mcp_client: NostrMCPClient | None = None, relays: list[str] | None = None, private_key: str | None = None, nwc_str: str | None = None, agent_info: AgentCard | None = None, agent_callable: Callable[[ChatInput], str] | None = None, note_filters: NoteFilters | None = None, price_handler: PriceHandler | None = None): """Initialize the agent server. Args: nostr_client: Existing NostrClient instance (optional). nostr_mcp_client: Existing NostrMCPClient instance (optional). relays: List of Nostr relay URLs (if no client provided). private_key: Nostr private key (if no client provided). nwc_str: Nostr Wallet Connect string for payments (optional). agent_info: Agent information (optional). agent_callable: Callable to handle agent responses. note_filters: Filters for listening to Nostr notes (optional). price_handler: PriceHandler to use for determining if an agent can handle a request and calculate the cost (optional). """ self.client = nostr_client or (nostr_mcp_client.client if nostr_mcp_client else NostrClient(relays=relays, private_key=private_key, nwc_str=nwc_str)) self.agent_info = agent_info self.agent_callable = agent_callable self.note_filters = note_filters self.price_handler = price_handler
[docs] async def chat(self, message: str, thread_id: str | None = None) -> Any: """Send a message to the agent and retrieve the response. Args: message: The message to send to the agent. thread_id: Optional thread ID for conversation context. Returns: Response from the agent, or an error message. """ return await self.agent_callable(ChatInput(messages=[message], thread_id=thread_id))
async def _handle_paid_invoice(self, event: Event, message: str, invoice: str, price_handler_response: PriceHandlerResponse = None): """Handle a paid invoice.""" if price_handler_response: skills_used = ", ".join(price_handler_response.skills_used) message = f"""I'd like to follow up on our previous exchange: Your Request: {message} Your Response: {price_handler_response.user_message} Could you please proceed with the next steps or provide an update on this matter? Only use the following tools: [{skills_used}] """ logger.info("Handling paid invoice") async def on_success(): logger.info(f"Payment succeeded for {self.agent_info.name}") result = await self.chat(message, thread_id=event.pubkey) response = str(result) logger.debug(f"On success response: {response}") await self.client.send_direct_message(event.pubkey, response) async def on_failure(): response = "Payment failed. Please try again." logger.error(f"On failure response: {response}") await self.client.send_direct_message(event.pubkey, response) await self.client.nwc_relay.on_payment_success( invoice=invoice, callback=on_success, timeout=900, unsuccess_callback=on_failure, ) async def _direct_message_callback(self, event: Event, message: str): """Handle incoming direct messages for agent interaction. Args: event: The Nostr event containing the message. message: The message content. """ if message.strip().startswith("{") or message.strip().startswith("["): logger.debug("Ignoring JSON messages") return elif message.strip().startswith("lnbc") and " " not in message.strip(): logger.debug("Ignoring lightning invoices") return message = message.strip() invoice = None price_handler_response = None logger.debug(f"Agent request: {message}") try: response = None cost_sats = None if self.price_handler: price_handler_response = await self.price_handler.handle(message, self.agent_info, thread_id=event.pubkey) response = price_handler_response.user_message if price_handler_response.can_handle: cost_sats = price_handler_response.cost_sats else: await self.client.send_direct_message(event.pubkey, response) return cost_sats = cost_sats or (self.agent_info.satoshis if self.agent_info else 0) if cost_sats > 0: invoice = await self.client.nwc_relay.make_invoice(amount=cost_sats, description=f"Payment for {self.agent_info.name}") if response is not None: response = f"{response}\n\nPlease pay {cost_sats} sats: {invoice}" else: response = invoice else: result = await self.chat(message, thread_id=event.pubkey) response = str(result) except Exception as e: response = f"Error in direct message callback: {e}" logger.debug(f"Agent response: {response}") tasks = [] tasks.append(self.client.send_direct_message(event.pubkey, response)) if invoice: tasks.append(self._handle_paid_invoice(event, message, invoice, price_handler_response)) await asyncio.gather(*tasks) async def _note_callback(self, event: Event): """Handle incoming notes that match the filters. Args: event: The Nostr event containing the note. """ if not self.price_handler: logger.warning("No price handler provided. Skipping note callback.") return try: content = event.content logger.info(f"Received note from {event.pubkey}: {content}") price_handler_response = await self.price_handler.handle(content, self.agent_info, thread_id=event.pubkey) logger.info(f"Price handler response: {price_handler_response.model_dump()}") if price_handler_response.can_handle: # Formulate and send direct message to the user response = price_handler_response.user_message tasks = [] if price_handler_response.cost_sats > 0: invoice = await self.client.nwc_relay.make_invoice(amount=price_handler_response.cost_sats, description=f"Payment to {self.agent_info.name}") response = f"{response}\n\nPlease pay {price_handler_response.cost_sats} sats: {invoice}" tasks.append(self._handle_paid_invoice(event, content, invoice, price_handler_response)) tasks.append(self.client.send_direct_message(event.pubkey, response, event_ref=event.id)) await asyncio.gather(*tasks) except Exception as e: logger.error(f"Error processing note: {e}", exc_info=True)
[docs] async def start(self): """Start the agent server, updating metadata and listening for direct messages and notes.""" logger.info(f"Updating metadata for {self.client.public_key.bech32()}") if self.agent_info: await self.client.update_metadata( name="agent_server", display_name=self.agent_info.name, about=self.agent_info.model_dump_json(), ) tasks = [] # Start note listener if filters are provided (in new thread) if self.note_filters is not None: logger.info(f"Starting note listener with filters: {self.note_filters.model_dump()}") tasks.append( self.client.note_listener( callback=self._note_callback, pubkeys=self.note_filters.nostr_pubkeys, tags=self.note_filters.nostr_tags, following_only=self.note_filters.following_only, ), ) # Start direct message listener logger.info(f"Starting message listener for {self.client.public_key.bech32()}") tasks.append(self.client.direct_message_listener(callback=self._direct_message_callback)) await asyncio.gather(*tasks)