import asyncio
import json
from collections.abc import Callable
from typing import Any
from mcp.server.fastmcp.exceptions import ToolError
from mcp.server.fastmcp.tools.tool_manager import ToolManager
from pydantic import BaseModel
from pynostr.event import Event
from agentstr.logger import get_logger
from agentstr.nostr_client import NostrClient
logger = get_logger(__name__)
[docs]
def stringify_result(result: Any) -> str:
"""Convert a result to a string."""
logger.debug(f"Stringifying result: {result}")
if isinstance(result, dict) or isinstance(result, list):
logger.debug("Result is dict or list")
return json.dumps(result)
elif isinstance(result, BaseModel):
logger.debug("Result is BaseModel")
return result.model_dump_json()
else:
logger.debug(f"Result is other type ({type(result)}): {result}")
return str(result)
[docs]
class NostrMCPServer:
"""Model Context Protocol (MCP) server running on the Nostr protocol.
Registers and manages tools that can be called by clients via direct messages,
with optional payment requirements handled through NWC.
"""
[docs]
def __init__(self, display_name: str, nostr_client: NostrClient | None = None,
relays: list[str] | None = None, private_key: str | None = None, nwc_str: str | None = None,
tools: list[Callable[..., Any]] = []):
"""Initialize the MCP server.
Args:
display_name: Display name of the server.
nostr_client: Existing NostrClient instance (optional).
relays: List of Nostr relay URLs (if no client provided).
private_key: Nostr private key (if no client provided).
nwc_str: Nostr Wallet Connect string for payments (optional).
"""
self.client = nostr_client or NostrClient(relays=relays, private_key=private_key, nwc_str=nwc_str)
self.display_name = display_name
self.tool_to_sats_map = {}
self.tool_manager = ToolManager()
for tool in tools:
self.add_tool(tool)
async def _direct_message_callback(self, event: Event, message: str):
"""Handle incoming direct messages to process tool calls or list requests.
Args:
event: The Nostr event containing the message.
message: The message content.
"""
message = message.strip()
logger.debug(f"Request: {message}")
tasks = []
try:
request = json.loads(message)
if request["action"] == "list_tools":
response = await self.list_tools()
elif request["action"] == "call_tool":
tool_name = request["tool_name"]
arguments = request["arguments"]
satoshis = self.tool_to_sats_map.get(tool_name, 0)
if satoshis > 0:
invoice = await self.client.nwc_relay.make_invoice(amount=satoshis, description=f"Payment for {tool_name} tool")
response = invoice
async def on_success():
logger.info(f"Payment succeeded for {tool_name}")
result = await self.call_tool(tool_name, arguments)
response = {"content": [{"type": "text", "text": result}]}
logger.debug(f"On success response: {response}")
await self.client.send_direct_message(event.pubkey, json.dumps(response))
async def on_failure():
response = {"error": f"Payment failed for {tool_name}"}
logger.error(f"On failure response: {response}")
await self.client.send_direct_message(event.pubkey, json.dumps(response))
# Run in background
tasks.append(asyncio.create_task(
self.client.nwc_relay.on_payment_success(
invoice=invoice,
callback=on_success,
unsuccess_callback=on_failure,
timeout=300,
),
))
else:
result = await self.call_tool(tool_name, arguments)
response = {"content": [{"type": "text", "text": str(result)}]}
else:
response = {"error": f"Invalid action: {request['action']}"}
except Exception as e:
response = {"content": [{"type": "text", "text": str(e)}]}
if not isinstance(response, str):
response = json.dumps(response)
logger.debug(f"MCP Server response: {response}")
tasks.append(self.client.send_direct_message(event.pubkey, response))
await asyncio.gather(*tasks)
[docs]
async def start(self):
"""Start the MCP server, updating metadata and listening for direct messages."""
logger.info(f"Updating metadata for {self.client.public_key.bech32()}")
await self.client.update_metadata(
name="mcp_server",
display_name=self.display_name,
about=json.dumps(await self.list_tools()),
)
logger.info(f"Starting message listener for {self.client.public_key.bech32()}")
await self.client.direct_message_listener(callback=self._direct_message_callback)