From 7204973597595043dc8dec0a2544e650aa91285d Mon Sep 17 00:00:00 2001 From: selenaalpha77-sketch Date: Sat, 25 Apr 2026 15:09:21 +0000 Subject: [PATCH 1/2] feat: add DataSync plugin with adapter pattern Implements DataSyncPlugin following maintainer feedback on PR #75: - Adapter pattern: PostgresSync and MySQLSync classes extend DataSyncAdapter - Configurable trackingColumn per table (no hardcoded column names) - Strips public. schema prefix in SQLite table creation and beforeQuery hook - Metadata table named tmp_data_sync_metadata (tmp_ prefix convention) - Uses Durable Object alarms (not setInterval) via dataSource.rpc.setAlarm - 24 tests covering plugin lifecycle, query rewriting, sync logic, and type mapping Closes #72 --- dist/plugins.ts | 6 + plugins/data-sync/index.test.ts | 414 ++++++++++++++++++++ plugins/data-sync/index.ts | 661 ++++++++++++++++++++++++++++++++ 3 files changed, 1081 insertions(+) create mode 100644 plugins/data-sync/index.test.ts create mode 100644 plugins/data-sync/index.ts diff --git a/dist/plugins.ts b/dist/plugins.ts index 7dd252a..fa32725 100644 --- a/dist/plugins.ts +++ b/dist/plugins.ts @@ -6,3 +6,9 @@ export { ChangeDataCapturePlugin } from '../plugins/cdc' export { QueryLogPlugin } from '../plugins/query-log' export { ResendPlugin } from '../plugins/resend' export { ClerkPlugin } from '../plugins/clerk' +export { + DataSyncPlugin, + DataSyncAdapter, + PostgresSync, + MySQLSync, +} from '../plugins/data-sync' diff --git a/plugins/data-sync/index.test.ts b/plugins/data-sync/index.test.ts new file mode 100644 index 0000000..aba61d2 --- /dev/null +++ b/plugins/data-sync/index.test.ts @@ -0,0 +1,414 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { + DataSyncPlugin, + DataSyncAdapter, + PostgresSync, + MySQLSync, + ColumnInfo, + TableSyncConfig, +} from './index' +import { StarbaseApp } from '../../src/handler' +import { DataSource } from '../../src/types' + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +class MockAdapter extends DataSyncAdapter { + public connectCalled = false + public disconnectCalled = false + public columns: ColumnInfo[] = [ + { name: 'id', nativeType: 'integer', sqliteType: 'INTEGER' }, + { name: 'name', nativeType: 'varchar', sqliteType: 'TEXT' }, + { name: 'created_at', nativeType: 'timestamp', sqliteType: 'TEXT' }, + ] + public rows: Record[] = [ + { id: 1, name: 'Alice', created_at: '2024-01-01T00:00:00Z' }, + { id: 2, name: 'Bob', created_at: '2024-01-02T00:00:00Z' }, + ] + + override async connect() { + this.connectCalled = true + } + override async disconnect() { + this.disconnectCalled = true + } + override mapType(nativeType: string): string { + if (nativeType.includes('int')) return 'INTEGER' + return 'TEXT' + } + override async getColumns(_tableName: string): Promise { + return this.columns + } + override async fetchRows(_opts: { + tableName: string + trackingColumn: string + lastValue: string | null + }): Promise[]> { + return this.rows + } +} + +function makeMockDataSource(): DataSource { + return { + rpc: { + executeQuery: vi.fn().mockResolvedValue([]), + setAlarm: vi.fn().mockResolvedValue(undefined), + getAlarm: vi.fn().mockResolvedValue(null), + deleteAlarm: vi.fn().mockResolvedValue(undefined), + getStatistics: vi.fn().mockResolvedValue({}), + }, + } as unknown as DataSource +} + +function makeMockApp(dataSource: DataSource): StarbaseApp { + return { + use: vi.fn( + async (middleware: Function) => + await middleware({ get: vi.fn(() => dataSource) }, vi.fn()) + ), + post: vi.fn(), + get: vi.fn(), + } as unknown as StarbaseApp +} + +// ─── Tests ──────────────────────────────────────────────────────────────────── + +describe('DataSyncPlugin - constructor', () => { + it('should instantiate with required options', () => { + const adapter = new MockAdapter() + const plugin = new DataSyncPlugin({ + source: adapter, + tables: [{ tableName: 'users', trackingColumn: 'id' }], + }) + expect(plugin).toBeInstanceOf(DataSyncPlugin) + expect(plugin.name).toBe('starbasedb:data-sync') + }) + + it('should use default syncIntervalMs when not specified', () => { + const adapter = new MockAdapter() + const plugin = new DataSyncPlugin({ + source: adapter, + tables: [], + }) + // Default is 5 minutes + expect((plugin as any).syncIntervalMs).toBe(5 * 60 * 1000) + }) + + it('should use provided syncIntervalMs', () => { + const adapter = new MockAdapter() + const plugin = new DataSyncPlugin({ + source: adapter, + tables: [], + syncIntervalMs: 60_000, + }) + expect((plugin as any).syncIntervalMs).toBe(60_000) + }) +}) + +describe('DataSyncPlugin - register()', () => { + it('should create metadata table on register', async () => { + const adapter = new MockAdapter() + const dataSource = makeMockDataSource() + const app = makeMockApp(dataSource) + + const plugin = new DataSyncPlugin({ + source: adapter, + tables: [], + }) + + await plugin.register(app) + + // Middleware is invoked by the mock app.use(); wait a tick for async ops + await new Promise((r) => setTimeout(r, 0)) + + expect(dataSource.rpc.executeQuery).toHaveBeenCalledWith( + expect.objectContaining({ + sql: expect.stringContaining('tmp_data_sync_metadata'), + }) + ) + }) + + it('should schedule a DO alarm via scheduleNextAlarm()', async () => { + const adapter = new MockAdapter() + const dataSource = makeMockDataSource() + + const plugin = new DataSyncPlugin({ + source: adapter, + tables: [], + syncIntervalMs: 10_000, + }) + // Set dataSource directly and call the private method + ;(plugin as any).dataSource = dataSource + await (plugin as any).scheduleNextAlarm() + + expect(dataSource.rpc.setAlarm).toHaveBeenCalledTimes(1) + const calledWith = (dataSource.rpc.setAlarm as any).mock.calls[0][0] + expect(calledWith).toBeGreaterThan(Date.now()) + }) +}) + +describe('DataSyncPlugin - beforeQuery() schema rewriting', () => { + let plugin: DataSyncPlugin + + beforeEach(() => { + plugin = new DataSyncPlugin({ + source: new MockAdapter(), + tables: [], + }) + }) + + it('strips "public." prefix from table references', async () => { + const result = await plugin.beforeQuery({ + sql: 'SELECT * FROM public.users', + }) + expect(result.sql).toBe('SELECT * FROM users') + }) + + it('strips "public." from multiple references in one query', async () => { + const result = await plugin.beforeQuery({ + sql: 'SELECT u.id, o.total FROM public.users u JOIN public.orders o ON u.id = o.user_id', + }) + expect(result.sql).toBe( + 'SELECT u.id, o.total FROM users u JOIN orders o ON u.id = o.user_id' + ) + }) + + it('is case-insensitive for PUBLIC. prefix', async () => { + const result = await plugin.beforeQuery({ + sql: 'SELECT * FROM PUBLIC.users', + }) + expect(result.sql).toBe('SELECT * FROM users') + }) + + it('leaves non-public schema prefixes untouched', async () => { + const result = await plugin.beforeQuery({ + sql: 'SELECT * FROM myschema.events', + }) + expect(result.sql).toBe('SELECT * FROM myschema.events') + }) + + it('passes params through unchanged', async () => { + const params = [42, 'test'] + const result = await plugin.beforeQuery({ + sql: 'SELECT * FROM public.users WHERE id = ?', + params, + }) + expect(result.params).toEqual(params) + }) +}) + +describe('DataSyncPlugin - afterQuery()', () => { + it('returns the original result unchanged', async () => { + const plugin = new DataSyncPlugin({ + source: new MockAdapter(), + tables: [], + }) + const result = [{ id: 1 }] + const out = await plugin.afterQuery({ + sql: 'SELECT 1', + result, + isRaw: false, + }) + expect(out).toBe(result) + }) +}) + +describe('DataSyncPlugin - runSync()', () => { + it('connects to adapter, syncs tables, then disconnects', async () => { + const adapter = new MockAdapter() + const dataSource = makeMockDataSource() + + const plugin = new DataSyncPlugin({ + source: adapter, + tables: [{ tableName: 'public.users', trackingColumn: 'id' }], + }) + ;(plugin as any).dataSource = dataSource + + await plugin.runSync() + + expect(adapter.connectCalled).toBe(true) + expect(adapter.disconnectCalled).toBe(true) + }) + + it('creates local table and upserts rows', async () => { + const adapter = new MockAdapter() + const dataSource = makeMockDataSource() + + const plugin = new DataSyncPlugin({ + source: adapter, + tables: [{ tableName: 'public.users', trackingColumn: 'id' }], + }) + ;(plugin as any).dataSource = dataSource + + await plugin.runSync() + + const calls = (dataSource.rpc.executeQuery as any).mock.calls.map( + (c: any) => c[0].sql as string + ) + + // Should include CREATE TABLE for "users" (public. stripped) + expect( + calls.some((s: string) => + s.includes('CREATE TABLE IF NOT EXISTS "users"') + ) + ).toBe(true) + + // Should include INSERT OR REPLACE + expect( + calls.some((s: string) => + s.includes('INSERT OR REPLACE INTO "users"') + ) + ).toBe(true) + + // Should update metadata + expect( + calls.some((s: string) => s.includes('tmp_data_sync_metadata')) + ).toBe(true) + }) + + it('schedules next alarm after sync', async () => { + const adapter = new MockAdapter() + const dataSource = makeMockDataSource() + + const plugin = new DataSyncPlugin({ + source: adapter, + tables: [], + syncIntervalMs: 30_000, + }) + ;(plugin as any).dataSource = dataSource + + await plugin.runSync() + + expect(dataSource.rpc.setAlarm).toHaveBeenCalled() + }) +}) + +describe('DataSyncPlugin - stripSchemaPrefix (private)', () => { + it('strips public. from table name', () => { + const plugin = new DataSyncPlugin({ + source: new MockAdapter(), + tables: [], + }) + expect((plugin as any).stripSchemaPrefix('public.users')).toBe('users') + expect((plugin as any).stripSchemaPrefix('PUBLIC.users')).toBe('users') + expect((plugin as any).stripSchemaPrefix('users')).toBe('users') + expect((plugin as any).stripSchemaPrefix('myschema.events')).toBe( + 'myschema.events' + ) + }) +}) + +describe('PostgresSync - mapType()', () => { + it('maps integer types to INTEGER', () => { + const adapter = new PostgresSync({ + host: 'localhost', + user: 'u', + password: 'p', + database: 'db', + }) + expect(adapter.mapType('integer')).toBe('INTEGER') + expect(adapter.mapType('bigint')).toBe('INTEGER') + expect(adapter.mapType('boolean')).toBe('INTEGER') + expect(adapter.mapType('bool')).toBe('INTEGER') + }) + + it('maps floating-point types to REAL', () => { + const adapter = new PostgresSync({ + host: 'localhost', + user: 'u', + password: 'p', + database: 'db', + }) + expect(adapter.mapType('float')).toBe('REAL') + expect(adapter.mapType('numeric')).toBe('REAL') + expect(adapter.mapType('decimal')).toBe('REAL') + }) + + it('maps bytea to BLOB', () => { + const adapter = new PostgresSync({ + host: 'localhost', + user: 'u', + password: 'p', + database: 'db', + }) + expect(adapter.mapType('bytea')).toBe('BLOB') + }) + + it('maps text types to TEXT', () => { + const adapter = new PostgresSync({ + host: 'localhost', + user: 'u', + password: 'p', + database: 'db', + }) + expect(adapter.mapType('varchar')).toBe('TEXT') + expect(adapter.mapType('text')).toBe('TEXT') + expect(adapter.mapType('timestamp')).toBe('TEXT') + }) +}) + +describe('MySQLSync - mapType()', () => { + it('maps int types to INTEGER', () => { + const adapter = new MySQLSync({ + host: 'localhost', + user: 'u', + password: 'p', + database: 'db', + }) + expect(adapter.mapType('int')).toBe('INTEGER') + expect(adapter.mapType('bigint')).toBe('INTEGER') + expect(adapter.mapType('tinyint')).toBe('INTEGER') + }) + + it('maps float types to REAL', () => { + const adapter = new MySQLSync({ + host: 'localhost', + user: 'u', + password: 'p', + database: 'db', + }) + expect(adapter.mapType('float')).toBe('REAL') + expect(adapter.mapType('decimal')).toBe('REAL') + }) + + it('maps blob types to BLOB', () => { + const adapter = new MySQLSync({ + host: 'localhost', + user: 'u', + password: 'p', + database: 'db', + }) + expect(adapter.mapType('blob')).toBe('BLOB') + expect(adapter.mapType('binary')).toBe('BLOB') + }) + + it('maps varchar to TEXT', () => { + const adapter = new MySQLSync({ + host: 'localhost', + user: 'u', + password: 'p', + database: 'db', + }) + expect(adapter.mapType('varchar')).toBe('TEXT') + expect(adapter.mapType('text')).toBe('TEXT') + }) +}) + +describe('DataSyncAdapter - configurable trackingColumn', () => { + it('uses the user-specified tracking column, not a hardcoded one', async () => { + const adapter = new MockAdapter() + const fetchRowsSpy = vi.spyOn(adapter, 'fetchRows') + + const dataSource = makeMockDataSource() + const plugin = new DataSyncPlugin({ + source: adapter, + tables: [{ tableName: 'events', trackingColumn: 'event_time' }], + }) + ;(plugin as any).dataSource = dataSource + + await plugin.runSync() + + expect(fetchRowsSpy).toHaveBeenCalledWith( + expect.objectContaining({ trackingColumn: 'event_time' }) + ) + }) +}) diff --git a/plugins/data-sync/index.ts b/plugins/data-sync/index.ts new file mode 100644 index 0000000..9ff9430 --- /dev/null +++ b/plugins/data-sync/index.ts @@ -0,0 +1,661 @@ +import { StarbaseApp, StarbaseDBConfiguration } from '../../src/handler' +import { StarbasePlugin } from '../../src/plugin' +import { DataSource } from '../../src/types' + +// ─── SQL DDL / DML ──────────────────────────────────────────────────────────── + +const SQL_QUERIES = { + CREATE_METADATA_TABLE: ` + CREATE TABLE IF NOT EXISTS tmp_data_sync_metadata ( + table_name TEXT NOT NULL PRIMARY KEY, + tracking_col TEXT NOT NULL, + last_value TEXT, + synced_at TEXT DEFAULT (datetime('now')) + ) + `, + GET_METADATA: ` + SELECT table_name, tracking_col, last_value + FROM tmp_data_sync_metadata + WHERE table_name = ? + `, + UPSERT_METADATA: ` + INSERT INTO tmp_data_sync_metadata (table_name, tracking_col, last_value, synced_at) + VALUES (?, ?, ?, datetime('now')) + ON CONFLICT(table_name) DO UPDATE SET + last_value = excluded.last_value, + synced_at = excluded.synced_at + `, +} + +// ─── Public Configuration Types ─────────────────────────────────────────────── + +/** + * Per-table sync configuration. + * + * @example + * { tableName: 'public.users', trackingColumn: 'id' } + */ +export interface TableSyncConfig { + /** + * Fully-qualified or plain table name in the external source. + * Schema prefix (e.g. `public.`) is stripped when creating the + * equivalent SQLite table. + */ + tableName: string + + /** + * Column used as an append-only cursor. New rows are fetched + * where `trackingColumn > lastValue`. Choose a monotonically + * increasing column such as `id` or `created_at`. + */ + trackingColumn: string +} + +export interface DataSyncPluginOptions { + /** Database-specific adapter that handles schema introspection and row fetching. */ + source: DataSyncAdapter + + /** + * Tables to synchronise, each with an explicit tracking column so + * no column name is ever hardcoded. + */ + tables: TableSyncConfig[] + + /** + * How often to schedule the next sync alarm (milliseconds). + * Defaults to 5 minutes. + */ + syncIntervalMs?: number +} + +// ─── Adapter Interface ──────────────────────────────────────────────────────── + +/** + * Column descriptor returned by an adapter's `getColumns()` method. + */ +export interface ColumnInfo { + name: string + /** Database-native type string, e.g. `"varchar"`, `"int4"`, `"datetime"`. */ + nativeType: string + /** Mapped SQLite affinity: TEXT | INTEGER | REAL | BLOB | NUMERIC */ + sqliteType: string +} + +/** + * Base class for database-specific adapters. Subclass this and override + * `getColumns()`, `fetchRows()`, and `mapType()` for each database engine. + */ +export abstract class DataSyncAdapter { + /** + * Return column metadata for the given table. + * `tableName` is the *stripped* (no schema prefix) name. + */ + abstract getColumns(tableName: string): Promise + + /** + * Fetch rows from the external source where `trackingColumn > lastValue`. + * Return an ordered array of plain objects. + */ + abstract fetchRows(opts: { + tableName: string + trackingColumn: string + lastValue: string | null + limit?: number + }): Promise[]> + + /** + * Map a native database type to a SQLite storage class. + * Override in subclasses for engine-specific type systems. + */ + abstract mapType(nativeType: string): string + + /** Optional: called once to establish/verify the connection. */ + async connect(): Promise {} + + /** Optional: called once to release the connection. */ + async disconnect(): Promise {} +} + +// ─── PostgresSync Adapter ───────────────────────────────────────────────────── + +export interface PostgresSyncOptions { + host: string + port?: number + user: string + password: string + database: string + ssl?: boolean +} + +/** + * Adapter that syncs data from a PostgreSQL source. + * + * Uses the `pg` driver which is already a project dependency. + */ +export class PostgresSync extends DataSyncAdapter { + private opts: PostgresSyncOptions + private client: any | null = null + + constructor(opts: PostgresSyncOptions) { + super() + this.opts = opts + } + + override async connect(): Promise { + // Dynamic import so the module is only loaded when the adapter is used + const { Client } = await import('pg') + this.client = new Client({ + host: this.opts.host, + port: this.opts.port ?? 5432, + user: this.opts.user, + password: this.opts.password, + database: this.opts.database, + ssl: this.opts.ssl ? { rejectUnauthorized: false } : undefined, + }) + await this.client.connect() + } + + override async disconnect(): Promise { + if (this.client) { + await this.client.end() + this.client = null + } + } + + override mapType(nativeType: string): string { + const t = nativeType.toLowerCase() + if ( + t.includes('int') || + t === 'serial' || + t === 'bigserial' || + t === 'smallserial' || + t === 'boolean' || + t === 'bool' + ) { + return 'INTEGER' + } + if ( + t.includes('float') || + t.includes('double') || + t.includes('real') || + t.includes('numeric') || + t.includes('decimal') || + t === 'money' + ) { + return 'REAL' + } + if (t === 'bytea') { + return 'BLOB' + } + return 'TEXT' + } + + override async getColumns(tableName: string): Promise { + if (!this.client) await this.connect() + + // Support schema-qualified names in the external source + let schema = 'public' + let table = tableName + if (tableName.includes('.')) { + const parts = tableName.split('.') + schema = parts[0] + table = parts[1] + } + + const result = await this.client.query( + `SELECT column_name, data_type + FROM information_schema.columns + WHERE table_schema = $1 + AND table_name = $2 + ORDER BY ordinal_position`, + [schema, table] + ) + + return result.rows.map( + (row: { column_name: string; data_type: string }) => ({ + name: row.column_name, + nativeType: row.data_type, + sqliteType: this.mapType(row.data_type), + }) + ) + } + + override async fetchRows(opts: { + tableName: string + trackingColumn: string + lastValue: string | null + limit?: number + }): Promise[]> { + if (!this.client) await this.connect() + + const { tableName, trackingColumn, lastValue, limit = 1000 } = opts + + // Use the original (possibly schema-qualified) name for the external query + let query: string + let params: unknown[] + + if (lastValue !== null && lastValue !== undefined) { + query = `SELECT * FROM "${tableName}" WHERE "${trackingColumn}" > $1 ORDER BY "${trackingColumn}" ASC LIMIT $2` + params = [lastValue, limit] + } else { + query = `SELECT * FROM "${tableName}" ORDER BY "${trackingColumn}" ASC LIMIT $1` + params = [limit] + } + + const result = await this.client.query(query, params) + return result.rows + } +} + +// ─── MySQLSync Adapter ──────────────────────────────────────────────────────── + +export interface MySQLSyncOptions { + host: string + port?: number + user: string + password: string + database: string + ssl?: boolean +} + +/** + * Adapter that syncs data from a MySQL / MariaDB source. + * + * Uses the `mysql2` driver which is already a project dependency. + */ +export class MySQLSync extends DataSyncAdapter { + private opts: MySQLSyncOptions + private connection: any | null = null + + constructor(opts: MySQLSyncOptions) { + super() + this.opts = opts + } + + override async connect(): Promise { + const mysql = await import('mysql2/promise') + this.connection = await mysql.createConnection({ + host: this.opts.host, + port: this.opts.port ?? 3306, + user: this.opts.user, + password: this.opts.password, + database: this.opts.database, + ssl: this.opts.ssl ? {} : undefined, + }) + } + + override async disconnect(): Promise { + if (this.connection) { + await this.connection.end() + this.connection = null + } + } + + override mapType(nativeType: string): string { + const t = nativeType.toLowerCase() + if ( + t.includes('int') || + t === 'tinyint' || + t === 'smallint' || + t === 'mediumint' || + t === 'bigint' || + t === 'bit' || + t === 'boolean' || + t === 'bool' + ) { + return 'INTEGER' + } + if ( + t === 'float' || + t === 'double' || + t.includes('decimal') || + t.includes('numeric') || + t === 'real' + ) { + return 'REAL' + } + if (t === 'blob' || t.includes('binary') || t === 'varbinary') { + return 'BLOB' + } + return 'TEXT' + } + + override async getColumns(tableName: string): Promise { + if (!this.connection) await this.connect() + + // MySQL uses backtick-quoted names; strip schema if provided + let schema = this.opts.database + let table = tableName + if (tableName.includes('.')) { + const parts = tableName.split('.') + schema = parts[0] + table = parts[1] + } + + const [rows] = await this.connection.execute( + `SELECT COLUMN_NAME as column_name, DATA_TYPE as data_type + FROM information_schema.COLUMNS + WHERE TABLE_SCHEMA = ? + AND TABLE_NAME = ? + ORDER BY ORDINAL_POSITION`, + [schema, table] + ) + + return (rows as Array<{ column_name: string; data_type: string }>).map( + (row) => ({ + name: row.column_name, + nativeType: row.data_type, + sqliteType: this.mapType(row.data_type), + }) + ) + } + + override async fetchRows(opts: { + tableName: string + trackingColumn: string + lastValue: string | null + limit?: number + }): Promise[]> { + if (!this.connection) await this.connect() + + const { tableName, trackingColumn, lastValue, limit = 1000 } = opts + + let query: string + let params: unknown[] + + if (lastValue !== null && lastValue !== undefined) { + query = `SELECT * FROM \`${tableName}\` WHERE \`${trackingColumn}\` > ? ORDER BY \`${trackingColumn}\` ASC LIMIT ?` + params = [lastValue, limit] + } else { + query = `SELECT * FROM \`${tableName}\` ORDER BY \`${trackingColumn}\` ASC LIMIT ?` + params = [limit] + } + + const [rows] = await this.connection.execute(query, params) + return rows as Record[] + } +} + +// ─── DataSyncPlugin ─────────────────────────────────────────────────────────── + +/** + * Plugin that incrementally pulls data from an external PostgreSQL or MySQL + * database into the Cloudflare Durable Object SQLite store. + * + * @example + * ```ts + * new DataSyncPlugin({ + * source: new PostgresSync({ host: 'db.example.com', user: 'app', password: 'secret', database: 'prod' }), + * tables: [ + * { tableName: 'public.users', trackingColumn: 'id' }, + * { tableName: 'public.events', trackingColumn: 'created_at' }, + * ], + * syncIntervalMs: 5 * 60 * 1000, + * }) + * ``` + */ +export class DataSyncPlugin extends StarbasePlugin { + private pluginOpts: DataSyncPluginOptions + private dataSource?: DataSource + private syncIntervalMs: number + + constructor(opts: DataSyncPluginOptions) { + super('starbasedb:data-sync', { requiresAuth: true }) + this.pluginOpts = opts + this.syncIntervalMs = opts.syncIntervalMs ?? 5 * 60 * 1000 + } + + // ── Helpers ───────────────────────────────────────────────────────────── + + /** + * Strip a PostgreSQL-style `public.` schema prefix so the local SQLite + * table uses only the bare table name. + */ + private stripSchemaPrefix(tableName: string): string { + // Strip "public." prefix (case-insensitive) + if (/^public\./i.test(tableName)) { + return tableName.slice(tableName.indexOf('.') + 1) + } + // For any other schema (e.g. "myschema.users") keep as-is so there is + // no silent data loss; callers can override via the beforeQuery hook. + return tableName + } + + // ── Plugin lifecycle ──────────────────────────────────────────────────── + + override async register(app: StarbaseApp): Promise { + app.use(async (c, next) => { + this.dataSource = c?.get('dataSource') + await this.init() + // Schedule the first alarm so syncs begin automatically + await this.scheduleNextAlarm() + await next() + }) + + // Manual trigger endpoint (admin-only) + app.post('/_internal/data-sync/trigger', async (c) => { + const config = c.get('config') + if (config?.role !== 'admin') { + return new Response('Unauthorized', { status: 401 }) + } + try { + await this.runSync() + return new Response(JSON.stringify({ success: true }), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }) + } catch (err) { + return new Response( + JSON.stringify({ success: false, error: String(err) }), + { + status: 500, + headers: { 'Content-Type': 'application/json' }, + } + ) + } + }) + + // Status endpoint (admin-only) + app.get('/_internal/data-sync/status', async (c) => { + const config = c.get('config') + if (config?.role !== 'admin') { + return new Response('Unauthorized', { status: 401 }) + } + const status = await this.getStatus() + return new Response(JSON.stringify(status), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }) + }) + } + + // ── Initialization ────────────────────────────────────────────────────── + + private async init(): Promise { + if (!this.dataSource) return + await this.dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.CREATE_METADATA_TABLE, + params: [], + }) + } + + // ── Alarm scheduling ──────────────────────────────────────────────────── + + /** + * Schedule the Durable Object alarm for the next sync cycle. + * Uses DO alarms (not setInterval) as required by Cloudflare. + */ + private async scheduleNextAlarm(): Promise { + if (!this.dataSource) return + const nextTime = Date.now() + this.syncIntervalMs + await this.dataSource.rpc.setAlarm(nextTime) + } + + // ── Sync logic ─────────────────────────────────────────────────────────── + + /** + * Run a full sync cycle: for each configured table, fetch new rows from + * the external source and upsert them into the local SQLite table. + */ + public async runSync(): Promise { + if (!this.dataSource) return + + const adapter = this.pluginOpts.source + + try { + await adapter.connect() + + for (const tableConfig of this.pluginOpts.tables) { + try { + await this.syncTable(tableConfig) + } catch (err) { + console.error( + `[DataSyncPlugin] Failed to sync table "${tableConfig.tableName}":`, + err + ) + } + } + } finally { + await adapter.disconnect() + } + + // Re-schedule next alarm after a successful (or partially-successful) run + await this.scheduleNextAlarm() + } + + private async syncTable(tableConfig: TableSyncConfig): Promise { + const { tableName, trackingColumn } = tableConfig + const localTableName = this.stripSchemaPrefix(tableName) + + // Retrieve last cursor value from metadata + const metaRows = (await this.dataSource!.rpc.executeQuery({ + sql: SQL_QUERIES.GET_METADATA, + params: [localTableName], + })) as Array<{ + table_name: string + tracking_col: string + last_value: string | null + }> + + const lastValue = metaRows.length > 0 ? metaRows[0].last_value : null + + // Fetch new rows from the external source + const rows = await this.pluginOpts.source.fetchRows({ + tableName, + trackingColumn, + lastValue, + }) + + if (rows.length === 0) return + + // Ensure the local table exists with the correct schema + const columns = await this.pluginOpts.source.getColumns(tableName) + await this.ensureLocalTable(localTableName, columns) + + // Upsert rows into the local SQLite table + for (const row of rows) { + await this.upsertRow(localTableName, columns, row) + } + + // Update the cursor to the last row's tracking-column value + const lastRow = rows[rows.length - 1] + const newLastValue = String(lastRow[trackingColumn] ?? '') + + await this.dataSource!.rpc.executeQuery({ + sql: SQL_QUERIES.UPSERT_METADATA, + params: [localTableName, trackingColumn, newLastValue], + }) + } + + /** + * Create the local SQLite mirror table if it does not already exist. + */ + private async ensureLocalTable( + localTableName: string, + columns: ColumnInfo[] + ): Promise { + if (!columns.length) return + + const colDefs = columns + .map((c) => `"${c.name}" ${c.sqliteType}`) + .join(', ') + + const sql = `CREATE TABLE IF NOT EXISTS "${localTableName}" (${colDefs})` + await this.dataSource!.rpc.executeQuery({ sql, params: [] }) + } + + /** + * Insert or replace a single row into the local SQLite table. + */ + private async upsertRow( + localTableName: string, + columns: ColumnInfo[], + row: Record + ): Promise { + const colNames = columns.map((c) => `"${c.name}"`).join(', ') + const placeholders = columns.map(() => '?').join(', ') + const values = columns.map((c) => { + const v = row[c.name] + if (v === null || v === undefined) return null + if (v instanceof Date) return v.toISOString() + if (typeof v === 'object') return JSON.stringify(v) + return v + }) + + const sql = `INSERT OR REPLACE INTO "${localTableName}" (${colNames}) VALUES (${placeholders})` + await this.dataSource!.rpc.executeQuery({ sql, params: values }) + } + + // ── Status ─────────────────────────────────────────────────────────────── + + private async getStatus(): Promise> { + if (!this.dataSource) return { error: 'not initialised' } + + const rows = (await this.dataSource.rpc.executeQuery({ + sql: 'SELECT * FROM tmp_data_sync_metadata', + params: [], + })) as Array> + + const nextAlarm = await this.dataSource.rpc.getAlarm() + + return { + tables: rows, + nextAlarmMs: nextAlarm, + syncIntervalMs: this.syncIntervalMs, + } + } + + // ── Query hooks ────────────────────────────────────────────────────────── + + /** + * Rewrite SQL so that `public.` references resolve to the bare + * `
` name used in the local SQLite store. + * + * e.g. `SELECT * FROM public.users` → `SELECT * FROM users` + */ + override async beforeQuery(opts: { + sql: string + params?: unknown[] + dataSource?: DataSource + config?: StarbaseDBConfiguration + }): Promise<{ sql: string; params?: unknown[] }> { + // Replace all occurrences of `public.tablename` (with optional quotes) + // using a case-insensitive regex. + const rewritten = opts.sql.replace( + /\bpublic\."?([A-Za-z_][A-Za-z0-9_]*)"?/gi, + '$1' + ) + + return { sql: rewritten, params: opts.params } + } + + override async afterQuery(opts: { + sql: string + result: any + isRaw: boolean + dataSource?: DataSource + config?: StarbaseDBConfiguration + }): Promise { + return opts.result + } +} From 7b0bef0d3462babbd826fb055872a73637533fb7 Mon Sep 17 00:00:00 2001 From: selenaalpha77-sketch Date: Sat, 25 Apr 2026 16:44:08 +0000 Subject: [PATCH 2/2] docs: add README and meta.json to DataSync plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follows the standard plugin structure used by all other StarbaseDB plugins (cron, stripe, resend, etc.) — every plugin ships with README.md and meta.json. --- plugins/data-sync/README.md | 87 +++++++++++++++++++++++++++++++++++++ plugins/data-sync/meta.json | 22 ++++++++++ 2 files changed, 109 insertions(+) create mode 100644 plugins/data-sync/README.md create mode 100644 plugins/data-sync/meta.json diff --git a/plugins/data-sync/README.md b/plugins/data-sync/README.md new file mode 100644 index 0000000..a2bc319 --- /dev/null +++ b/plugins/data-sync/README.md @@ -0,0 +1,87 @@ +# DataSync Plugin + +Incrementally replicate rows from an external database into your StarbaseDB (SQLite) instance using an append-only cursor column. Each supported engine ships as a **DataSyncAdapter** subclass — so adding a new source means subclassing one interface, not touching the plugin core. + +## Supported Sources + +| Adapter | Import | +|---------|--------| +| PostgreSQL | `PostgresSync` | +| MySQL / MariaDB | `MySQLSync` | + +## Installation + +```typescript +import { DataSyncPlugin, PostgresSync } from '../plugins/data-sync' + +const plugins = [ + // ... other plugins + new DataSyncPlugin({ + source: new PostgresSync({ + host: 'db.example.com', + port: 5432, + user: 'readonly', + password: process.env.PG_PASSWORD!, + database: 'production', + }), + tables: [ + { tableName: 'public.users', trackingColumn: 'id' }, + { tableName: 'public.orders', trackingColumn: 'created_at' }, + ], + syncIntervalMs: 60_000, // poll every minute (default: 5 min) + }), +] satisfies StarbasePlugin[] +``` + +## Configuration Options + +### `DataSyncPluginOptions` + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `source` | `DataSyncAdapter` | — | Database adapter (PostgresSync, MySQLSync, …) | +| `tables` | `TableSyncConfig[]` | — | List of tables to replicate | +| `syncIntervalMs` | `number` | `300000` | Poll interval in milliseconds | + +### `TableSyncConfig` + +| Option | Type | Description | +|--------|------|-------------| +| `tableName` | `string` | Fully-qualified (`public.users`) or plain (`users`) table name | +| `trackingColumn` | `string` | Monotonically increasing column used as the sync cursor (`id`, `created_at`, …) | + +### PostgresSync options + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `host` | `string` | — | PostgreSQL host | +| `port` | `number` | `5432` | PostgreSQL port | +| `user` | `string` | — | Database username | +| `password` | `string` | — | Database password | +| `database` | `string` | — | Database name | +| `ssl` | `boolean` | `false` | Enable SSL/TLS | + +### MySQLSync options + +Same as PostgresSync but defaults `port` to `3306`. + +## How It Works + +1. **Schema discovery** — on first sync the adapter calls `getColumns()` and the plugin creates a matching SQLite table (schema prefix stripped). +2. **Cursor tracking** — progress is stored in `tmp_data_sync_metadata` so syncs resume where they left off after restarts. +3. **Incremental fetch** — only rows where `trackingColumn > lastValue` are fetched, keeping payloads small. +4. **Alarm-based scheduling** — Cloudflare Durable Object alarms reschedule the next sync automatically. + +## Custom Adapters + +Extend `DataSyncAdapter` and implement three methods: + +```typescript +import { DataSyncAdapter, ColumnInfo } from '../plugins/data-sync' + +export class MyDBSync extends DataSyncAdapter { + async getColumns(tableName: string): Promise { /* ... */ } + async fetchRows(opts: { tableName: string; trackingColumn: string; lastValue: string | null; limit?: number }): Promise[]> { /* ... */ } + mapType(nativeType: string): string { /* ... */ } +} +``` diff --git a/plugins/data-sync/meta.json b/plugins/data-sync/meta.json new file mode 100644 index 0000000..3bd3b4d --- /dev/null +++ b/plugins/data-sync/meta.json @@ -0,0 +1,22 @@ +{ + "version": "1.0.0", + "resources": { + "tables": { + "tmp_data_sync_metadata": [ + "table_name", + "tracking_col", + "last_value", + "synced_at" + ] + }, + "secrets": {}, + "variables": { + "syncIntervalMs": "How often to poll for new rows (milliseconds, default 300000)" + } + }, + "dependencies": { + "tables": {}, + "secrets": {}, + "variables": {} + } +}