Source code for agentstr.nwc_relay

import asyncio
import json
import math
import time

from bolt11.decode import decode
from pynostr.encrypted_dm import EncryptedDirectMessage
from pynostr.event import Event
from pynostr.filters import Filters
from pynostr.key import PrivateKey

from agentstr.logger import get_logger
from agentstr.relay import EventRelay

logger = get_logger(__name__)


[docs] def encrypt(privkey: str, pubkey: str, plaintext: str) -> str: """Encrypt plaintext using ECDH shared secret. Args: privkey: Sender's private key. pubkey: Recipient's public key. plaintext: The message to encrypt. Returns: The encrypted message as a string. """ dm = EncryptedDirectMessage() dm.encrypt(privkey, cleartext_content=plaintext, recipient_pubkey=pubkey) return dm.encrypted_message
[docs] def decrypt(privkey: str, pubkey: str, ciphertext: str) -> str: """Decrypt ciphertext using ECDH shared secret. Args: privkey: Recipient's private key. pubkey: Sender's public key. ciphertext: The encrypted message to decrypt. Returns: The decrypted plaintext message. """ dm = EncryptedDirectMessage() dm.decrypt(privkey, encrypted_message=ciphertext, public_key_hex=pubkey) return dm.cleartext_content
[docs] def process_nwc_string(string: str) -> dict: """Parse Nostr Wallet Connect connection string into its components. Args: string: The NWC connection string to parse. Returns: Dictionary containing connection parameters. Raises: ValueError: If the connection string is invalid. """ if (string[0:22] != "nostr+walletconnect://"): logger.error("Your pairing string was invalid, try one that starts with this: nostr+walletconnect://") return string = string[22:] arr = string.split("&") item = arr[0].split("?") del arr[0] arr.insert(0, item[0]) arr.insert(1, item[1]) arr[0] = "wallet_pubkey=" + arr[0] arr2 = [] obj = {} for item in arr: item = item.split("=") arr2.append(item[0]) arr2.append(item[1]) for index, item in enumerate(arr2): if (item == "secret"): arr2[index] = "app_privkey" for index, item in enumerate(arr2): if (index % 2): obj[arr2[index - 1]] = item obj["app_pubkey"] = PrivateKey.from_hex(obj["app_privkey"]).public_key.hex() return obj
[docs] def get_signed_event(event: dict, private_key: str): """Create and sign a Nostr event with the given private key. Args: event: The event data as a dictionary. private_key: The private key to sign the event with. Returns: A signed Nostr event. """ logger.debug(f"Signing event in nwc_relay: {json.dumps(event)}") event = Event(**event) event.sign(private_key) return event
[docs] class NWCRelay: """Client for interacting with Nostr Wallet Connect (NWC) relays. Handles encrypted communication with wallet services over the Nostr network. """
[docs] def __init__(self, nwc_connection_string: str, relay: str | None = None): """Initialize NWC client with connection string and optional relay URL. Args: nwc_connection_string: NWC connection string (starts with 'nostr+walletconnect://') relay: Optional relay URL override """ logger.info(f"Initializing NWCRelay with connection string: {nwc_connection_string[:10]}...") try: self.nwc_info = process_nwc_string(nwc_connection_string) if relay is None: relay = self.nwc_info["relay"] logger.debug(f"Using relay: {relay}") self.event_relay = EventRelay(relay, private_key=PrivateKey.from_hex(self.nwc_info["app_privkey"])) logger.info("NWCRelay initialized successfully") except Exception as e: logger.critical(f"Failed to initialize NWCRelay: {e!s}", exc_info=True) raise
[docs] async def get_response(self, event_id: str) -> Event | None: """Get response for a specific event ID.""" filters = Filters( event_refs=[event_id], pubkey_refs=[self.nwc_info["app_pubkey"]], kinds=[23195], limit=1, ) for _ in range(10): event = await self.event_relay.get_event(filters=filters, timeout=5, close_on_eose=True) if event: return event await asyncio.sleep(0) return None
[docs] async def make_invoice(self, amount: int, description: str) -> Event | None: """Generate a new payment request. Returns: Dictionary containing invoice details """ msg = json.dumps({ "method": "make_invoice", "params": { "amount": amount * 1000, "description": description, } if amount else { "description": description, }, }) emsg = encrypt(self.nwc_info["app_privkey"], self.nwc_info["wallet_pubkey"], msg) obj = { "kind": 23194, "content": emsg, "tags": [["p", self.nwc_info["wallet_pubkey"]]], "created_at": math.floor(time.time()), "pubkey": self.nwc_info["app_pubkey"], } logger.debug(f"Sending invoice request: {json.dumps(obj)}") event = get_signed_event(obj, self.nwc_info["app_privkey"]) await self.event_relay.send_event(event) response = await self.get_response(event.id) if response is None: logger.error("Failed to receive invoice response") return None ersp = response.content drsp = decrypt(self.nwc_info["app_privkey"], self.nwc_info["wallet_pubkey"], ersp) dobj = json.loads(drsp) logger.debug(f"Received invoice response: {json.dumps(dobj)}") return dobj["result"]["invoice"]
[docs] async def check_invoice(self, invoice: str | None = None, payment_hash: str | None = None) -> dict | None: """Check the status of an invoice by its payment hash or invoice string.""" if invoice is None and payment_hash is None: raise ValueError("Either 'invoice' or 'payment_hash' must be provided") params = {} if invoice is not None: params["invoice"] = invoice if payment_hash is not None: params["payment_hash"] = payment_hash msg = json.dumps({ "method": "lookup_invoice", "params": params, }) emsg = encrypt(self.nwc_info["app_privkey"], self.nwc_info["wallet_pubkey"], msg) obj = { "kind": 23194, "content": emsg, "tags": [["p", self.nwc_info["wallet_pubkey"]]], "created_at": math.floor(time.time()), "pubkey": self.nwc_info["app_pubkey"], } event = get_signed_event(obj, self.nwc_info["app_privkey"]) eid = event.id await self.event_relay.send_event(event) response = await self.get_response(eid) if response is None: return None ersp = response.content drsp = decrypt(self.nwc_info["app_privkey"], self.nwc_info["wallet_pubkey"], ersp) dobj = json.loads(drsp) return dobj
[docs] async def did_payment_succeed(self, invoice: str) -> bool: """Check if a payment was successful. Returns: True if payment was successful, False otherwise """ invoice_info = await self.check_invoice(invoice=invoice) if (invoice_info and "error" not in invoice_info and ("result" in invoice_info) and ( "preimage" in invoice_info["result"])): return invoice_info.get("result", {}).get("settled_at") or 0 > 0 return False
[docs] async def try_pay_invoice(self, invoice: str, amount: int | None = None) -> dict | None: """Attempt to pay a BOLT11 invoice. Returns: Dictionary with payment status and details """ decoded = decode(invoice) if decoded.amount_msat and amount: if decoded.amount_msat != amount * 1000: # convert to msats raise RuntimeError(f"Amount in invoice [{decoded.amount_msat}] does not match amount provided [{amount}]") elif not decoded.amount_msat and not amount: raise RuntimeError("No amount provided in invoice and no amount provided to pay") msg = { "method": "pay_invoice", "params": { "invoice": invoice, }, } if amount: msg["params"]["amount"] = amount * 1000 msg = json.dumps(msg) emsg = encrypt(self.nwc_info["app_privkey"], self.nwc_info["wallet_pubkey"], msg) obj = { "kind": 23194, "content": emsg, "tags": [["p", self.nwc_info["wallet_pubkey"]]], "created_at": math.floor(time.time()), "pubkey": self.nwc_info["app_pubkey"], } event = get_signed_event(obj, self.nwc_info["app_privkey"]) await self.event_relay.send_event(event)
[docs] async def get_info(self) -> dict: """Get wallet service information and capabilities.""" msg = { "method": "get_info", } msg = json.dumps(msg) emsg = encrypt(self.nwc_info["app_privkey"], self.nwc_info["wallet_pubkey"], msg) obj = { "kind": 23194, "content": emsg, "tags": [["p", self.nwc_info["wallet_pubkey"]]], "created_at": math.floor(time.time()), "pubkey": self.nwc_info["app_pubkey"], } event = get_signed_event(obj, self.nwc_info["app_privkey"]) await self.event_relay.send_event(event) response = await self.get_response(event.id) if response is None: return None ersp = response.content drsp = decrypt(self.nwc_info["app_privkey"], self.nwc_info["wallet_pubkey"], ersp) dobj = json.loads(drsp) return dobj
[docs] async def list_transactions(self, params: dict | None = None) -> list[dict]: """List recent transactions matching the given parameters.""" if params is None: params = {} msg = { "method": "list_transactions", "params": params, } msg = json.dumps(msg) emsg = encrypt(self.nwc_info["app_privkey"], self.nwc_info["wallet_pubkey"], msg) obj = { "kind": 23194, "content": emsg, "tags": [["p", self.nwc_info["wallet_pubkey"]]], "created_at": math.floor(time.time()), "pubkey": self.nwc_info["app_pubkey"], } event = get_signed_event(obj, self.nwc_info["app_privkey"]) await self.event_relay.send_event(event) response = await self.get_response(event.id) if response is None: return None ersp = response.content drsp = decrypt(self.nwc_info["app_privkey"], self.nwc_info["wallet_pubkey"], ersp) dobj = json.loads(drsp) return dobj.get("result", {}).get("transactions", [])
[docs] async def get_balance(self) -> int | None: """Get current wallet balance.""" msg = { "method": "get_balance", } msg = json.dumps(msg) emsg = encrypt(self.nwc_info["app_privkey"], self.nwc_info["wallet_pubkey"], msg) obj = { "kind": 23194, "content": emsg, "tags": [["p", self.nwc_info["wallet_pubkey"]]], "created_at": math.floor(time.time()), "pubkey": self.nwc_info["app_pubkey"], } event = get_signed_event(obj, self.nwc_info["app_privkey"]) await self.event_relay.send_event(event) response = await self.get_response(event.id) if response is None: return None ersp = response.content drsp = decrypt(self.nwc_info["app_privkey"], self.nwc_info["wallet_pubkey"], ersp) dobj = json.loads(drsp) return dobj.get("result", {}).get("balance")
[docs] async def on_payment_success(self, invoice: str, callback=None, unsuccess_callback=None, timeout: int = 300, interval: int = 2): """Listen for payment success for a given invoice. This method continuously checks for payment success until either the payment is confirmed or the timeout is reached. Args: invoice (str): The BOLT11 invoice string to listen for. callback (callable, optional): A function to call when payment succeeds. unsuccess_callback (callable, optional): A function to call if payment fails. timeout (int, optional): Maximum time to wait in seconds (default: 300). interval (int, optional): Time between checks in seconds (default: 2). Raises: Exception: If the callback function raises an exception. """ start_time = time.time() success = False while True: if await self.did_payment_succeed(invoice): success = True if callback: try: await callback() except Exception as e: logger.error(f"Error in callback: {e}", exc_info=True) raise e break if time.time() - start_time > timeout: break await asyncio.sleep(interval) if not success: if unsuccess_callback: await unsuccess_callback()