Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions plugins/replication/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Replication Plugin

Pull-based data replication plugin for StarbaseDB. Replicates data from an external data source (e.g., PostgreSQL on Supabase) into the internal Durable Object SQLite, turning the StarbaseDB instance into a close-to-edge read replica.

## Features

- **Pull-based replication**: Periodically polls external data sources for new or updated rows
- **Configurable intervals**: Set custom sync intervals per table (in seconds)
- **Append-only tracking**: Uses a cursor column (`id`, `created_at`, etc.) to efficiently fetch only new data
- **Multi-table support**: Replicate specific tables, not necessarily your entire database
- **REST API**: Configure replication via HTTP endpoints
- **Event callbacks**: Programmatically subscribe to replication events

## Configuration

### Via REST API

**Add a table to replication:**

```bash
curl -X POST https://your-endpoint/replication/config \
-H "Authorization: Bearer YOUR_ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"source_table": "users",
"target_table": "users",
"cursor_column": "id",
"interval_seconds": 60
}'
```

**List replicated tables:**

```bash
curl https://your-endpoint/replication/config \
-H "Authorization: Bearer YOUR_ADMIN_TOKEN"
```

**Remove a table from replication:**

```bash
curl -X DELETE https://your-endpoint/replication/config/users \
-H "Authorization: Bearer YOUR_ADMIN_TOKEN"
```

**Trigger immediate sync:**

```bash
curl -X POST https://your-endpoint/replication/sync \
-H "Authorization: Bearer YOUR_ADMIN_TOKEN"
```

**Check replication status:**

```bash
curl https://your-endpoint/replication/status \
-H "Authorization: Bearer YOUR_ADMIN_TOKEN"
```

### Via Code (in `src/index.ts`)

```typescript
import { ReplicationPlugin } from '../plugins/replication'

const replicationPlugin = new ReplicationPlugin()

// Subscribe to replication events
replicationPlugin.onEvent(async ({ source_table, rows_synced }) => {
console.log(`Synced ${rows_synced} rows from ${source_table}`)
}, ctx)

// Add to plugins array
const plugins = [
// ...existing plugins,
replicationPlugin,
]
```

## How It Works

1. On each sync interval (driven via the Cron plugin or Durable Object alarms):
- For each configured table, the plugin queries the external data source for rows where `cursor_column > last_cursor_value`
- Fetched rows are inserted into the internal SQLite database using `INSERT OR REPLACE`
- The cursor value is updated to track the latest synced position
2. The plugin creates mirrored tables in SQLite automatically by inspecting the external schema
3. All replication state is persisted in `tmp_replication_config` and `tmp_replication_state` tables

## Requirements

- An external data source must be configured in `wrangler.toml`
- The external data source must be one of: PostgreSQL, MySQL, Turso, StarbaseDB, or Cloudflare D1
84 changes: 84 additions & 0 deletions plugins/replication/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'
import { ReplicationPlugin } from './index'

describe('ReplicationPlugin', () => {
let plugin: ReplicationPlugin

beforeEach(() => {
plugin = new ReplicationPlugin()
})

it('should have the correct plugin name', () => {
expect(plugin.name).toBe('starbasedb:replication')
})

it('should require authentication', () => {
expect(plugin.opts.requiresAuth).toBe(true)
})

it('should have the correct route prefix', () => {
expect(plugin.prefix).toBe('/replication')
})

it('should register event callbacks', () => {
const callback = vi.fn()
plugin.onEvent(callback)

// Verify the callback is registered by checking it doesn't throw
expect(() => plugin.onEvent(callback)).not.toThrow()
})

it('should throw error when syncTable is called without initialization', async () => {
await expect(plugin.syncTable('test_table')).rejects.toThrow(
'ReplicationPlugin not initialized'
)
})

it('should throw error when syncAll is called without initialization', async () => {
// syncAll calls getConfigs which requires dataSource
// Since dataSource is undefined, getConfigs returns []
// so syncAll should return empty array
const results = await plugin.syncAll()
expect(results).toEqual([])
})
})

describe('ReplicationPlugin SQL Queries', () => {
it('should define all required SQL queries', () => {
// Verify the plugin can be instantiated without errors
// indicating all internal SQL query constants are properly defined
const plugin = new ReplicationPlugin()
expect(plugin).toBeDefined()
expect(plugin.name).toBe('starbasedb:replication')
})
})

describe('ReplicationPlugin Event System', () => {
let plugin: ReplicationPlugin

beforeEach(() => {
plugin = new ReplicationPlugin()
})

it('should support multiple event callbacks', () => {
const callback1 = vi.fn()
const callback2 = vi.fn()

plugin.onEvent(callback1)
plugin.onEvent(callback2)

// Both callbacks should be registered without errors
expect(() => {
plugin.onEvent(callback1)
plugin.onEvent(callback2)
}).not.toThrow()
})

it('should support async event callbacks', () => {
const asyncCallback = async () => {
await new Promise((resolve) => setTimeout(resolve, 10))
}

expect(() => plugin.onEvent(asyncCallback)).not.toThrow()
})
})
Loading