SSE
#
This document explains the payload schema of event data and how we achieve SSE.
Payload Schema
#
All the payloads are JSON serialized with following schema.
| Key |
Type |
Description |
| total_count |
unsigned int |
Total number of events for the event. |
| events |
EntityEvent[] |
Array of event data. |
EntityEvent
#
| Key |
Type |
Description |
| event_id |
string |
ID of the event formatted as “-”. e.g. 1730515412345-0 |
| data |
EntityEventData |
Data of the event. |
EntityEventData
#
| Key |
Type |
Description |
| resource_id |
string |
ID of resource. e.g. invoice ID. |
| resource_type |
EntityEventResourceType (string) |
Resource type of the event. |
| event_type |
EntityEventType (string) |
Event type of the event. |
| timestamp |
unsigned int |
Timestamp of the event. millisecs since epoch. |
| entity_id |
string |
Entity ID. |
| organization_id |
string |
Organization ID. |
| extra |
any json serializable |
Extra data for the event. Only used for self-prompting currently. |
EntityEventResourceType
#
| Type |
Description |
| INVOICE |
Change in invoices. Invoice data, invoice pages, invoice lines, and invoice data activity are included in this type. |
| DOCUMENT |
Change in documents. |
| SELF_PROMPT |
Change in self prompting progress. |
EntityEventType
#
| Type |
Description |
| CREATE |
Create event. When a resource is created in the database. |
| UPDATE |
Update event. When a resource is updated in the database. |
| DELETE |
Delete event. When a resource is “physically” deleted in the database. If it is a logical deletion, the event type will be UPDATE. e.g. Changing status of an invoice to DELETED |
SSE Design
#
We are using Redis Stream to achieve real time event notifications. Clients can specify Last-Event-Id in the header for SSE endpoint. BonsAPI immediately returns all the events from the event ID specified and continue waiting for new real time events. The EventSource API in JS manages the Last-Event-Id internally for you.
Note that we only keep upto 500 events. Old events are removed from the stream if stream is full.
- Endpoint:
GET /api/v1/entity-updates?entity_id=<entity_id>
N = Max number of events in the stream = 500
M = Time to wait for the next event in millisec. Pretty much like polling interval.
┌────────────────────────────────┐
│ Our backend │
│ (Rust, Python workers, etc.) │
└─────────────┬──────────────────┘
│
│ ① Append Event
│ XADD <organization_id>:<entity_id>:entity-updates
│ MAXLEN ~ N * data <json>
│ (e.g., {"event_type":"update","resource_id":"123", ....})
│
▼
┌──────────────────────────────────────────────────────────┐
│ Redis Streams (durable log) │
│----------------------------------------------------------│
│ Stream key: <organization_id>:<entity_id>:entity-updates │
│----------------------------------------------------------│
│ 1730515412345-0 → {"data": "{...json...}"} │
│ 1730515412346-0 → {"data": "{...json...}"} │
│ 1730515412377-0 → {"data": "{...json...}"} │
│ ... (trimmed when MAXLEN ~ N) │
└──────────────────────────────────────────────────────────┘
▲
│
│ ② Fetch events from the event ID specified and continue for real time events.
│ XREAD BLOCK M <organization_id>:<entity_id>:entity-updates <event_id>
│
┌────────────────────┴────────────────────┐
│ Actix-Web SSE Endpoint │
│ (async task per connected client) │
└────────────────────┬────────────────────┘
│
│ ③ On connect:
│ - Check "Last-Event-ID" header (fallbacks to "$", which means head of the stream)
│
▼
┌────────────────────────────────────────┐
│ Client (Browser) │
└────────────────────────────────────────┘