Relay

This module provides the core relay functionality for the Agentstr SDK, enabling communication between Nostr clients and agents. It handles message routing, event forwarding, and connection management to ensure reliable message delivery across the Nostr network.

pydantic model agentstr.relay.DecryptedMessage

Bases: BaseModel

A decrypted message from a Nostr relay.

JSON schema:
{
   "title": "DecryptedMessage",
   "description": "A decrypted message from a Nostr relay.",
   "type": "object",
   "properties": {
      "event": {
         "$ref": "#/$defs/Event"
      },
      "message": {
         "title": "Message",
         "type": "string"
      }
   },
   "$defs": {
      "Event": {
         "properties": {
            "content": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Content"
            },
            "pubkey": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Pubkey"
            },
            "created_at": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Created At"
            },
            "kind": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": 1,
               "title": "Kind"
            },
            "tags": {
               "default": null,
               "items": {
                  "items": {
                     "type": "string"
                  },
                  "type": "array"
               },
               "title": "Tags",
               "type": "array"
            },
            "id": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Id"
            },
            "sig": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Sig"
            }
         },
         "title": "Event",
         "type": "object"
      }
   },
   "required": [
      "event",
      "message"
   ]
}

Fields:
field event: Event [Required]

The Nostr event containing the message.

field message: str [Required]

The decrypted message content.
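The shape described by the schema can be mirrored with standard-library dataclasses, which makes the defaults easy to see: every Event property defaults to null except kind, which defaults to 1, while DecryptedMessage requires both event and message. This is only an illustrative sketch. EventSketch, DecryptedMessageSketch, and from_payload are hypothetical stand-ins and not part of agentstr; the real classes are pydantic models and validate more strictly.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class EventSketch:
    """Hypothetical stand-in for the Event definition in the schema."""
    content: Optional[str] = None
    pubkey: Optional[str] = None
    created_at: Optional[int] = None
    kind: Optional[int] = 1  # the only property with a non-null default
    tags: Optional[list] = None  # list of string lists in the schema
    id: Optional[str] = None
    sig: Optional[str] = None


@dataclass
class DecryptedMessageSketch:
    """Hypothetical stand-in for DecryptedMessage; both fields are required."""
    event: EventSketch
    message: str


def from_payload(payload: dict) -> DecryptedMessageSketch:
    """Build the stand-in from a dict shaped like the JSON schema."""
    missing = {"event", "message"} - payload.keys()
    if missing:
        raise ValueError(f"missing required fields: {sorted(missing)}")
    return DecryptedMessageSketch(
        event=EventSketch(**payload["event"]),
        message=payload["message"],
    )


dm = from_payload({"event": {"pubkey": "ab" * 32, "kind": 4}, "message": "hello"})
print(dm.message, dm.event.kind, dm.event.content)
```

Omitting either required key raises ValueError in this sketch; the pydantic model would raise its own validation error instead.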

agentstr.relay.create_subscription(filters: Filters) -> list[str]

Create a subscription for the given filters.

Parameters:

filters – The filters to apply to the subscription.

Returns:

A list containing the subscription request components.
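For orientation, a Nostr subscription request under NIP-01 is a JSON array of the form ["REQ", <subscription_id>, <filter>]. The sketch below builds such an array by hand. It assumes, without confirmation from this page, that the "subscription request components" follow that layout; build_req is a hypothetical helper, and the filter is passed as a plain dict rather than a Filters object.

```python
import json
import uuid
from typing import Optional


def build_req(filters: dict, subscription_id: Optional[str] = None) -> list:
    """Return the components of a NIP-01 REQ message (hypothetical helper)."""
    # NIP-01 allows any non-empty subscription ID of up to 64 characters.
    sub_id = subscription_id or uuid.uuid4().hex
    return ["REQ", sub_id, filters]


request = build_req({"kinds": [1], "authors": ["ab" * 32], "limit": 10}, "sub-1")
wire = json.dumps(request)  # the text frame that would be sent over the WebSocket
print(wire)
```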

class agentstr.relay.EventRelay(relay: str, private_key: PrivateKey | None = None, public_key: PublicKey | None = None)

Bases: object

Handles communication with a single Nostr relay.

Parameters:
  • relay – WebSocket URL of the Nostr relay.

  • private_key – Private key for signing events.

  • public_key – Optional public key (derived from private_key if not provided).

__init__(relay: str, private_key: PrivateKey | None = None, public_key: PublicKey | None = None)

async get_events(filters: Filters, limit: int = 10, timeout: int = 30, close_on_eose: bool = True) -> list[Event]

Fetch events matching the given filters from this relay.

Parameters:
  • filters – The filters to apply when fetching events.

  • limit – Maximum number of events to return. Defaults to 10.

  • timeout – Maximum time to wait for events in seconds. Defaults to 30.

  • close_on_eose – Whether to close the subscription after EOSE. Defaults to True.

Returns:

A list of at most limit events that match the filters, or an empty list if none are found.

Note

Times out after timeout seconds if no matching events are found.
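The interplay of limit, timeout, and close_on_eose can be sketched against a fake relay feed. The code below is one plausible reading of the parameters above, not the library's implementation: collect_events is a hypothetical function, an asyncio.Queue stands in for the WebSocket, and returning partial results on timeout is an assumption.

```python
import asyncio
import json


async def collect_events(incoming: asyncio.Queue, sub_id: str, limit: int = 10,
                         timeout: float = 30, close_on_eose: bool = True) -> list:
    """Collect up to `limit` EVENT payloads for `sub_id`, bounded by `timeout`."""
    events = []

    async def pump():
        while len(events) < limit:
            frame = json.loads(await incoming.get())
            if frame[0] == "EVENT" and frame[1] == sub_id:
                events.append(frame[2])
            elif frame[0] == "EOSE" and frame[1] == sub_id and close_on_eose:
                break  # the relay has sent all stored events

    try:
        await asyncio.wait_for(pump(), timeout)
    except asyncio.TimeoutError:
        pass  # assumption: keep whatever arrived before the deadline
    return events


async def demo():
    feed = asyncio.Queue()  # stands in for frames read from the relay
    for i in range(3):
        feed.put_nowait(json.dumps(["EVENT", "sub-1", {"id": f"e{i}", "kind": 1}]))
    feed.put_nowait(json.dumps(["EOSE", "sub-1"]))
    return await collect_events(feed, "sub-1", limit=2, timeout=1)


result = asyncio.run(demo())
print([e["id"] for e in result])
```

With limit=2 the loop stops after the second event and never reads the EOSE frame; with an empty feed it returns an empty list once the timeout expires.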

async get_event(filters: Filters, timeout: int = 30, close_on_eose: bool = True) -> Event | None

Return a single event matching the filters, or None if no matching event is found.

async send_event(event: Event)

Publish an event to this relay.

decrypt_message(event: Event) -> DecryptedMessage | None

async send_message(message: str | dict, recipient_pubkey: str, event_ref: str | None = None) -> Event

async receive_message(author_pubkey: str, timestamp: int | None = None, timeout: int = 30) -> DecryptedMessage | None

Wait for and return the next direct message from the specified author.

async send_receive_message(message: str | dict, recipient_pubkey: str, timeout: int = 3, event_ref: str | None = None) -> DecryptedMessage | None

async event_listener(filters: Filters, callback: Callable[[Event], None], event_cache: ExpiringDict, lock: Lock)

Continuously listen for events matching filters and call the callback for each one.

async direct_message_listener(filters: Filters, callback: Callable[[Event, str], None], event_cache: ExpiringDict, lock: Lock)

Listen for direct messages and call the callback with decrypted content.
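Both listeners take an event_cache and a lock. A likely purpose, though this page does not say so, is to make sure an event that arrives more than once (for example from several relays) reaches the callback only once. The sketch below illustrates that pattern under that assumption. TinyExpiringCache is a hypothetical stand-in for ExpiringDict, deliver_once is a hypothetical helper, and events are plain dicts.

```python
import asyncio
import time


class TinyExpiringCache:
    """Hypothetical stand-in for ExpiringDict: remembers keys for max_age seconds."""

    def __init__(self, max_age: float):
        self.max_age = max_age
        self._seen = {}

    def __contains__(self, key) -> bool:
        stamp = self._seen.get(key)
        if stamp is None:
            return False
        if time.monotonic() - stamp > self.max_age:
            del self._seen[key]  # entry expired, forget it
            return False
        return True

    def add(self, key) -> None:
        self._seen[key] = time.monotonic()


async def deliver_once(event: dict, callback, cache: TinyExpiringCache,
                       lock: asyncio.Lock) -> bool:
    """Call `callback` only the first time an event ID is seen."""
    async with lock:  # check and insert atomically across listeners
        if event["id"] in cache:
            return False
        cache.add(event["id"])
    callback(event)
    return True


async def demo():
    cache, lock, seen = TinyExpiringCache(max_age=60), asyncio.Lock(), []
    feed = [{"id": "e1"}, {"id": "e2"}, {"id": "e1"}]  # e1 arrives twice
    results = await asyncio.gather(
        *(deliver_once(e, seen.append, cache, lock) for e in feed)
    )
    return results, seen


results, seen = asyncio.run(demo())
print(results, [e["id"] for e in seen])
```

Holding the lock only around the cache check keeps the callback itself outside the critical section, so a slow callback does not block other listeners.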