diff --git a/include/spock_exception_handler.h b/include/spock_exception_handler.h index 7997880d..01e85d06 100644 --- a/include/spock_exception_handler.h +++ b/include/spock_exception_handler.h @@ -60,6 +60,7 @@ typedef struct SpockExceptionLog HeapTuple local_tuple; char initial_error_message[1024]; char initial_operation[16]; + XLogRecPtr failed_lsn; } SpockExceptionLog; typedef enum SpockExceptionBehaviour @@ -99,4 +100,11 @@ extern void spock_disable_subscription(SpockSubscription *sub, XLogRecPtr lsn, TimestampTz ts); +extern bool discardfile_write(const char *node_name, SpockRelation *rel, + Oid remote_origin, Oid local_origin, + const char *operation, SpockTupleData *oldtup, + SpockTupleData *newtup, + TransactionId remote_xid, + const char *ddl_sql, const char *ddl_user); + #endif /* SPOCK_EXCEPTION_HANDLER_H */ diff --git a/include/spock_worker.h b/include/spock_worker.h index f3a17d8d..36365a08 100644 --- a/include/spock_worker.h +++ b/include/spock_worker.h @@ -106,6 +106,9 @@ typedef struct SpockContext /* Manages access to SpockGroupHash */ LWLock *apply_group_master_lock; + /* DISCARDFILE: lock for concurrent append by apply workers */ + LWLock *discard_file_lock; + /* Background workers. 
*/ int total_workers; SpockWorker workers[FLEXIBLE_ARRAY_MEMBER]; diff --git a/sql/spock--6.0.0-devel.sql b/sql/spock--6.0.0-devel.sql index 9cf0088d..ef4c891b 100644 --- a/sql/spock--6.0.0-devel.sql +++ b/sql/spock--6.0.0-devel.sql @@ -98,6 +98,9 @@ CREATE TABLE spock.exception_status_detail ( REFERENCES spock.exception_status ) WITH (user_catalog_table=true); +CREATE OR REPLACE FUNCTION spock.discard_read() +RETURNS SETOF jsonb VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_discard_read'; + CREATE FUNCTION spock.apply_group_progress ( OUT dbid oid, OUT node_id oid, diff --git a/src/spock_apply.c b/src/spock_apply.c index 46d7d96d..33a2ad96 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -90,6 +90,7 @@ PGDLLEXPORT void spock_apply_main(Datum main_arg); static bool in_remote_transaction = false; static bool first_begin_at_startup = true; static XLogRecPtr remote_origin_lsn = InvalidXLogRecPtr; +static XLogRecPtr current_record_lsn = InvalidXLogRecPtr; static RepOriginId remote_origin_id = InvalidRepOriginId; static char *remote_origin_name = NULL; static TimeOffset apply_delay = 0; @@ -435,6 +436,23 @@ begin_replication_step(void) { StartTransactionCommand(); spock_apply_heap_begin(); + + /* + * In TRANSDISCARD/SUB_DISABLE mode, set the transaction + * read-only to prevent any actual DML from being applied. + * Direct catalog writes (exception_log entries) are still + * allowed. + */ + + if (MyApplyWorker->use_try_block && + (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE)) + { + set_config_option("transaction_read_only", "on", + PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + } + result = true; } @@ -769,8 +787,15 @@ handle_commit(StringInfo s) * be skipped and made it unavailable when re-enabling the * subscription. Skipping such transactions should be an explicit user * action via spock.sub_alter_skiplsn. 
+ * + * For SUB_DISABLE mode during a retry (use_try_block), do not advance + * the LSN even if the replay succeeded. This allows the transaction + * to be re-applied after the user fixes the root cause and re-enables + * the subscription. */ - if (!xact_had_exception || + if ((!xact_had_exception && + !(MyApplyWorker->use_try_block && + exception_behaviour == SUB_DISABLE)) || exception_behaviour == DISCARD || exception_behaviour == TRANSDISCARD) { @@ -792,48 +817,25 @@ handle_commit(StringInfo s) exception_behaviour == SUB_DISABLE)) { SpockExceptionLog *exception_log; - char errmsg[512]; exception_log = &exception_log_ptr[my_exception_log_index]; /* - * All operations were already rolled back in subtransactions (by - * RollbackAndReleaseCurrentSubTransaction in handle_insert/ - * update/delete). Abort the parent transaction to discard it - * entirely. - */ - AbortCurrentTransaction(); - - /* - * Start a new transaction to log the discard and update progress. - */ - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - - /* - * Log this transaction as discarded to the exception_log so - * there's an audit trail. Include the original error message if - * we have it. + * In TRANSDISCARD/SUB_DISABLE mode, DML operations were never + * attempted — they were skipped and logged to the discardfile. + * The only writes in the current transaction are exception_log + * entries from log_insert_exception (with full relation/tuple + * data). Let the transaction commit normally so those entries + * are preserved. */ - snprintf(errmsg, sizeof(errmsg), - "%s at LSN %X/%X%s%s", - (exception_behaviour == TRANSDISCARD) - ? "Transaction discarded in TRANSDISCARD mode" - : "Transaction failed, subscription will be disabled", - LSN_FORMAT_ARGS(end_lsn), - exception_log->initial_error_message[0] != '\0' ? ". Initial error: " : "", - exception_log->initial_error_message[0] != '\0' ? 
exception_log->initial_error_message : ""); - - add_entry_to_exception_log(remote_origin_id, - commit_time, - remote_xid, - 0, 0, - NULL, NULL, NULL, NULL, - NULL, NULL, - exception_log->initial_operation, - errmsg); - - elog(LOG, "SPOCK %s: %s", MySubscription->name, errmsg); + elog(LOG, "SPOCK %s: %s at LSN %X/%X%s%s", + MySubscription->name, + (exception_behaviour == TRANSDISCARD) + ? "Transaction discarded in TRANSDISCARD mode" + : "Transaction failed, subscription will be disabled", + LSN_FORMAT_ARGS(end_lsn), + exception_log->initial_error_message[0] != '\0' ? ". Initial error: " : "", + exception_log->initial_error_message[0] != '\0' ? exception_log->initial_error_message : ""); /* * Clear the exception state so we don't enter exception handling @@ -842,31 +844,9 @@ handle_commit(StringInfo s) exception_log->commit_lsn = InvalidXLogRecPtr; exception_log->initial_error_message[0] = '\0'; MySpockWorker->restart_delay = 0; - PopActiveSnapshot(); - CommitTransactionCommand(); - - /* - * For SUB_DISABLE mode, throw an error to trigger subscription - * disable in the parent PG_CATCH block. The transaction failure - * is already logged above. - */ - if (exception_behaviour == SUB_DISABLE) - { - elog(ERROR, "SPOCK %s: disabling subscription due to exception in SUB_DISABLE mode", - MySubscription->name); - } - /* - * Switch to MessageContext before continuing. The progress - * tracking code at transdiscard_skip_commit expects - * MessageContext. - */ - MemoryContextSwitchTo(MessageContext); - - /* - * Skip the normal commit path - jump to progress tracking. 
- */ - goto transdiscard_skip_commit; + /* Defensive check */ + Assert(XactReadOnly); } /* Have the commit code adjust our logical clock if needed */ @@ -881,24 +861,24 @@ handle_commit(StringInfo s) MemoryContextSwitchTo(TopMemoryContext); - if (xact_had_exception) + if (exception_behaviour == SUB_DISABLE && + (xact_had_exception || MyApplyWorker->use_try_block)) { /* - * If we had exception(s) and are in SUB_DISABLE mode then the - * subscription got disabled earlier in the code path. We need to - * exit here to disconnect. + * SUB_DISABLE: after committing exception_log entries, throw + * an ERROR to trigger subscription disable in the PG_CATCH + * block. This covers both the case where DML actually failed + * (xact_had_exception) and the retry path where all DML was + * skipped but the original error was logged (use_try_block). */ - if (exception_behaviour == SUB_DISABLE) - { - SpockExceptionLog *exception_log; + SpockExceptionLog *exception_log; - exception_log = &exception_log_ptr[my_exception_log_index]; - exception_log->commit_lsn = InvalidXLogRecPtr; - MySpockWorker->restart_delay = 0; + exception_log = &exception_log_ptr[my_exception_log_index]; + exception_log->commit_lsn = InvalidXLogRecPtr; + MySpockWorker->restart_delay = 0; - elog(ERROR, "SPOCK %s: exiting because subscription disabled", - MySubscription->name); - } + elog(ERROR, "SPOCK %s: disabling subscription due to exception in SUB_DISABLE mode", + MySubscription->name); } else if (MyApplyWorker->use_try_block && exception_log_ptr[my_exception_log_index].initial_error_message[0] != '\0') @@ -945,7 +925,6 @@ handle_commit(StringInfo s) */ maybe_advance_forwarded_origin(end_lsn, xact_had_exception); -transdiscard_skip_commit: /* Update the entry in the progress table. 
*/ elog(DEBUG1, "SPOCK %s: updating progress table for node_id %d" \ " and remote node id %d with remote commit ts" \ @@ -1308,35 +1287,51 @@ handle_insert(StringInfo s) /* TODO: Handle multiple inserts */ if (MyApplyWorker->use_try_block) { - PG_TRY(); - { - exception_command_counter++; - BeginInternalSubTransaction(NULL); - spock_apply_heap_insert(rel, &newtup); - } - PG_CATCH(); + if (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE) { - failed = true; - RollbackAndReleaseCurrentSubTransaction(); - edata = CopyErrorData(); - xact_had_exception = true; - } - PG_END_TRY(); + SpockLocalNode *local_node = get_local_node(false, false); - if (!failed) + /* + * TRANSDISCARD and SUB_DISABLE needs only registering current + * operation. If an ERROR happens during the logging process - it is + * a FATAL error: apply worker should follow the exception behaviour + * logic related to such kind of problem. + * + * TODO: process returning value and react correspondingly. + */ + discardfile_write(local_node->node->name, rel, remote_origin_id, + local_node->node->id, "INSERT", NULL, + &newtup, remote_xid, NULL, NULL); + + /* No DML was attempted, so clear any stale local_tuple pointer. */ + exception_log_ptr[my_exception_log_index].local_tuple = NULL; + } + else { - if (exception_behaviour == TRANSDISCARD || - exception_behaviour == SUB_DISABLE) + /* DISCARD MODE needs hard way - try block and subtransactions */ + PG_TRY(); + { + exception_command_counter++; + BeginInternalSubTransaction(NULL); + spock_apply_heap_insert(rel, &newtup); + } + PG_CATCH(); + { + failed = true; RollbackAndReleaseCurrentSubTransaction(); - else + edata = CopyErrorData(); + xact_had_exception = true; + } + PG_END_TRY(); + + if (!failed) ReleaseCurrentSubTransaction(); } - /* - * Log the exception. If this operation succeeded but we have an - * initial error message (from a previous attempt), use that instead - * of NULL to provide context for why we're logging this. 
- */ + if (failed || + current_record_lsn == + exception_log_ptr[my_exception_log_index].failed_lsn) { char *error_msg = edata ? edata->message : (exception_log_ptr[my_exception_log_index].initial_error_message[0] ? @@ -1477,35 +1472,52 @@ handle_update(StringInfo s) if (MyApplyWorker->use_try_block == true) { - PG_TRY(); + if (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE) { - exception_command_counter++; - BeginInternalSubTransaction(NULL); - spock_apply_heap_update(rel, hasoldtup ? &oldtup : &newtup, &newtup); - } - PG_CATCH(); - { - failed = true; - RollbackAndReleaseCurrentSubTransaction(); - edata = CopyErrorData(); - xact_had_exception = true; - } - PG_END_TRY(); + SpockLocalNode *local_node = get_local_node(false, false); - if (!failed) + /* + * TRANSDISCARD and SUB_DISABLE needs only registering current + * operation. If an ERROR happens during the logging process - it is + * a FATAL error: apply worker should follow the exception behaviour + * logic related to such kind of problem. + * + * TODO: process returning value and react correspondingly. + */ + discardfile_write(local_node->node->name, rel, remote_origin_id, + local_node->node->id, "UPDATE", + hasoldtup ? &oldtup : NULL, &newtup, + remote_xid, NULL, NULL); + + /* No DML was attempted, so clear any stale local_tuple pointer. */ + exception_log_ptr[my_exception_log_index].local_tuple = NULL; + } + else { - if (exception_behaviour == TRANSDISCARD || - exception_behaviour == SUB_DISABLE) + /* DISCARD MODE needs hard way - try block and subtransactions */ + PG_TRY(); + { + exception_command_counter++; + BeginInternalSubTransaction(NULL); + spock_apply_heap_update(rel, hasoldtup ? &oldtup : &newtup, &newtup); + } + PG_CATCH(); + { + failed = true; RollbackAndReleaseCurrentSubTransaction(); - else + edata = CopyErrorData(); + xact_had_exception = true; + } + PG_END_TRY(); + + if (!failed) ReleaseCurrentSubTransaction(); } - /* - * Log the exception. 
If this operation succeeded but we have an - * initial error message (from a previous attempt), use that instead - * of NULL to provide context for why we're logging this. - */ + if (failed || + current_record_lsn == + exception_log_ptr[my_exception_log_index].failed_lsn) { char *error_msg = edata ? edata->message : (exception_log_ptr[my_exception_log_index].initial_error_message[0] ? @@ -1584,35 +1596,51 @@ handle_delete(StringInfo s) if (MyApplyWorker->use_try_block) { - PG_TRY(); + if (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE) { - exception_command_counter++; - BeginInternalSubTransaction(NULL); - spock_apply_heap_delete(rel, &oldtup); - } - PG_CATCH(); - { - failed = true; - RollbackAndReleaseCurrentSubTransaction(); - edata = CopyErrorData(); - xact_had_exception = true; - } - PG_END_TRY(); + SpockLocalNode *local_node = get_local_node(false, false); + + /* + * TRANSDISCARD and SUB_DISABLE needs only registering current + * operation. If an ERROR happens during the logging process - it is + * a FATAL error: apply worker should follow the exception behaviour + * logic related to such kind of problem. + * + * TODO: process returning value and react correspondingly. + */ + discardfile_write(local_node->node->name, rel, remote_origin_id, + local_node->node->id, "DELETE", &oldtup, + NULL, remote_xid, NULL, NULL); - if (!failed) + /* No DML was attempted, so clear any stale local_tuple pointer. 
*/ + exception_log_ptr[my_exception_log_index].local_tuple = NULL; + } + else { - if (exception_behaviour == TRANSDISCARD || - exception_behaviour == SUB_DISABLE) + /* DISCARD MODE needs hard way - try block and subtransactions */ + PG_TRY(); + { + exception_command_counter++; + BeginInternalSubTransaction(NULL); + spock_apply_heap_delete(rel, &oldtup); + } + PG_CATCH(); + { + failed = true; RollbackAndReleaseCurrentSubTransaction(); - else + edata = CopyErrorData(); + xact_had_exception = true; + } + PG_END_TRY(); + + if (!failed) ReleaseCurrentSubTransaction(); } - /* - * Log the exception. If this operation succeeded but we have an - * initial error message (from a previous attempt), use that instead - * of NULL to provide context for why we're logging this. - */ + if (failed || + current_record_lsn == + exception_log_ptr[my_exception_log_index].failed_lsn) { char *error_msg = edata ? edata->message : (exception_log_ptr[my_exception_log_index].initial_error_message[0] ? @@ -2251,31 +2279,46 @@ handle_sql_or_exception(QueuedMessage *queued_message, bool tx_just_started) if (MyApplyWorker->use_try_block) { - PG_TRY(); - { - exception_command_counter++; - BeginInternalSubTransaction(NULL); - handle_sql(queued_message, tx_just_started, &sql); - } - PG_CATCH(); + if (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE) { - failed = true; - RollbackAndReleaseCurrentSubTransaction(); - edata = CopyErrorData(); - xact_had_exception = true; - } - PG_END_TRY(); + SpockLocalNode *local_node = get_local_node(false, false); - if (!failed) - { /* - * Follow spock.exception_behavior GUC instead of restarting - * worker + * TRANSDISCARD and SUB_DISABLE needs only registering current + * operation. If an ERROR happens during the logging process - it is + * a FATAL error: apply worker should follow the exception behaviour + * logic related to such kind of problem. + * + * TODO: process returning value and react correspondingly. 
*/ - if (exception_behaviour == TRANSDISCARD || - exception_behaviour == SUB_DISABLE) + sql = JsonbToCString(NULL, + &queued_message->message->root, 0); + discardfile_write(local_node->node->name, NULL, + remote_origin_id, local_node->node->id, + "SQL", NULL, NULL, remote_xid, + sql, queued_message->role); + failed = false; + } + else + { + /* DISCARD MODE needs hard way - try block and subtransactions */ + PG_TRY(); + { + exception_command_counter++; + BeginInternalSubTransaction(NULL); + handle_sql(queued_message, tx_just_started, &sql); + } + PG_CATCH(); + { + failed = true; RollbackAndReleaseCurrentSubTransaction(); - else + edata = CopyErrorData(); + xact_had_exception = true; + } + PG_END_TRY(); + + if (!failed) ReleaseCurrentSubTransaction(); } @@ -3007,6 +3050,7 @@ apply_work(PGconn *streamConn) last_inserted = last_received; UpdateWorkerStats(last_received, last_inserted); + current_record_lsn = last_received; replication_handler(msg); /* @@ -3223,6 +3267,14 @@ apply_work(PGconn *streamConn) sizeof(exception_log_ptr[my_exception_log_index].initial_operation), "%s", errcallback_arg.action_name ? errcallback_arg.action_name : "UNKNOWN"); + + /* + * Remember the LSN of the record that triggered the error. + * During the retry, the DML handler will call + * log_insert_exception when this LSN is reached. 
+				 */
+				exception_log_ptr[my_exception_log_index].failed_lsn =
+					last_received;
 			}
 
 			FlushErrorState();
@@ -3251,7 +3303,6 @@ apply_work(PGconn *streamConn)
 		if (need_replay)
 		{
 			MyApplyWorker->use_try_block = true;
-
 			goto stream_replay;
 		}
 
diff --git a/src/spock_exception_handler.c b/src/spock_exception_handler.c
index 09cd2cd0..63c205e6 100644
--- a/src/spock_exception_handler.c
+++ b/src/spock_exception_handler.c
@@ -46,6 +46,10 @@
 
 #include "pgstat.h"
 
+#include "funcapi.h"
+#include "storage/fd.h"
+#include "utils/json.h"
+
 #include "spock_sync.h"
 #include "spock_worker.h"
 #include "spock_conflict.h"
@@ -74,6 +78,26 @@
 
 #define CATALOG_EXCEPTION_LOG "exception_log"
 
+#define DISCARDFILE_DIR "pg_spock"
+#define DISCARDFILE_FMT DISCARDFILE_DIR "/discard_%u.log"
+
+/* File format version — written as a 4-byte header, checked on read */
+#define DISCARDFILE_VERSION 1
+
+/* JSON field names — keep in sync between discardfile_write and discard_read */
+#define DF_XID "xid"
+#define DF_NODE "node_name"
+#define DF_LOG_TIME "log_time"
+#define DF_RELNAME "relname"
+#define DF_LOCAL_ORIGIN "local_origin"
+#define DF_REMOTE_ORIGIN "remote_origin"
+#define DF_OPERATION "operation"
+#define DF_OLD_TUPLE "old_tuple"
+#define DF_REMOTE_TUPLE "remote_tuple"
+#define DF_REMOTE_XID "remote_xid"
+#define DF_DDL_SQL "ddl_statement"
+#define DF_DDL_USER "ddl_user"
+
 SpockExceptionLog *exception_log_ptr = NULL;
 int exception_behaviour = TRANSDISCARD;
 int exception_logging = LOG_ALL;
@@ -269,3 +293,319 @@ spock_disable_subscription(SpockSubscription *sub,
 	if (started_tx)
 		CommitTransactionCommand();
 }
+
+/*
+ * Get the path to the DISCARDFILE for the current database.
+ * The caller must provide a buffer of at least MAXPGPATH bytes.
+ */
+static void
+discardfile_path(char *path)
+{
+	snprintf(path, MAXPGPATH, DISCARDFILE_FMT, MyDatabaseId);
+}
+
+/*
+ * Ensure the pg_spock directory exists under PGDATA.
+ */
+static void
+discardfile_ensure_dir(void)
+{
+	char		dirpath[MAXPGPATH];
+
+	snprintf(dirpath, MAXPGPATH, "%s", DISCARDFILE_DIR);
+	(void) MakePGDirectory(dirpath);
+}
+
+/*
+ * Append a single record to the DISCARDFILE.
+ *
+ * This function is safe to call outside a transaction — it does not access
+ * catalog tables. A newly created file starts with a 32-bit native-endian
+ * format version (DISCARDFILE_VERSION). Each call then appends one binary
+ * length-prefixed record: a 32-bit native-endian length header (the
+ * StringInfoData.len field, which is a signed int) followed by exactly that
+ * many bytes of JSON payload. Readers must parse the length header first to
+ * determine where each record ends.
+ *
+ * When rel is NULL (e.g. for DDL/SQL operations that have no target relation),
+ * the relname JSON field is set to an empty string and tuple serialization is
+ * skipped. In that case the caller may pass ddl_sql / ddl_user to record the
+ * SQL statement and the role that executed it.
+ *
+ * Locking: acquires SpockCtx->discard_file_lock in exclusive mode to
+ * serialize concurrent writes from different apply workers.
+ *
+ * Memory: the record buffer is freed before returning, but the tuple JSON
+ * strings and any temporaries allocated by the JSON code are not. Being
+ * executed on a per-row basis, this function should run inside a short-lived
+ * memory context.
+ *
+ * Error reporting: errno is saved before the file is closed and the lock is
+ * released, so that %m in the WARNING describes the failed open()/write().
+ * A short write that leaves errno unset is reported as ENOSPC.
+ *
+ * Returns true on success, false if the record could not be written
+ * (a WARNING is emitted in that case).
+ */
+bool
+discardfile_write(const char *node_name, SpockRelation *rel, Oid remote_origin,
+				  Oid local_origin, const char *operation,
+				  SpockTupleData *oldtup, SpockTupleData *newtup,
+				  TransactionId remote_xid,
+				  const char *ddl_sql, const char *ddl_user)
+{
+	char		path[MAXPGPATH];
+	StringInfoData buf;
+	char	   *old_json = NULL;
+	char	   *new_json = NULL;
+	int			fd;
+	int			save_errno;
+	TupleDesc	tupdesc = rel ? RelationGetDescr(rel->rel) : NULL;
+
+	Assert(SpockCtx != NULL);
+
+	/* Serialize tuples to JSON before taking the lock */
+	if (oldtup != NULL && tupdesc != NULL)
+		old_json = spock_tuple_to_json_cstring(oldtup, tupdesc);
+	if (newtup != NULL && tupdesc != NULL)
+		new_json = spock_tuple_to_json_cstring(newtup, tupdesc);
+
+	/* Build the JSON record. Field names use DF_* defines from above. */
+	initStringInfo(&buf);
+	appendStringInfo(&buf, "{\"" DF_XID "\": %u", remote_xid);
+
+	appendStringInfoString(&buf, ", \"" DF_NODE "\": ");
+	escape_json(&buf, node_name ? node_name : "");
+
+	appendStringInfoString(&buf, ", \"" DF_LOG_TIME "\": ");
+	escape_json(&buf, timestamptz_to_str(GetCurrentTimestamp()));
+
+	appendStringInfoString(&buf, ", \"" DF_RELNAME "\": ");
+	if (rel != NULL)
+		escape_json(&buf, quote_qualified_identifier(rel->nspname,
+													 rel->relname));
+	else
+		escape_json(&buf, "");
+
+	appendStringInfo(&buf, ", \"" DF_LOCAL_ORIGIN "\": %u", local_origin);
+
+	appendStringInfo(&buf, ", \"" DF_REMOTE_ORIGIN "\": %u", remote_origin);
+
+	appendStringInfoString(&buf, ", \"" DF_OPERATION "\": ");
+	escape_json(&buf, operation ? operation : "");
+
+	if (old_json != NULL)
+		appendStringInfo(&buf, ", \"" DF_OLD_TUPLE "\": %s", old_json);
+
+	if (new_json != NULL)
+		appendStringInfo(&buf, ", \"" DF_REMOTE_TUPLE "\": %s", new_json);
+
+	if (ddl_sql != NULL)
+	{
+		appendStringInfoString(&buf, ", \"" DF_DDL_SQL "\": ");
+		escape_json(&buf, ddl_sql);
+	}
+
+	if (ddl_user != NULL)
+	{
+		appendStringInfoString(&buf, ", \"" DF_DDL_USER "\": ");
+		escape_json(&buf, ddl_user);
+	}
+
+	appendStringInfo(&buf, ", \"" DF_REMOTE_XID "\": %u}", remote_xid);
+
+	/* Write under lock: [uint32 length][json data] */
+	LWLockAcquire(SpockCtx->discard_file_lock, LW_EXCLUSIVE);
+
+	discardfile_ensure_dir();
+	discardfile_path(path);
+
+	fd = OpenTransientFile(path, O_WRONLY | O_CREAT | O_APPEND | PG_BINARY);
+	if (fd < 0)
+	{
+		/* Releasing the lock may clobber errno; keep open()'s value for %m */
+		save_errno = errno;
+		LWLockRelease(SpockCtx->discard_file_lock);
+		errno = save_errno;
+		ereport(WARNING,
+				(errcode_for_file_access(),
+				 errmsg("could not open discard file \"%s\": %m", path)));
+		pfree(buf.data);
+		return false;
+	}
+
+	/* Write version header if this is a new file */
+	if (lseek(fd, 0, SEEK_END) == 0)
+	{
+		uint32		version = DISCARDFILE_VERSION;
+
+		errno = 0;
+		if (write(fd, &version, sizeof(version)) != sizeof(version))
+		{
+			/* A short write does not set errno; assume out of disk space */
+			save_errno = errno ? errno : ENOSPC;
+			CloseTransientFile(fd);
+			LWLockRelease(SpockCtx->discard_file_lock);
+			errno = save_errno;
+			ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not write discard file header \"%s\": %m",
+							path)));
+			pfree(buf.data);
+			return false;
+		}
+	}
+
+	errno = 0;
+	if (write(fd, &buf.len, sizeof(buf.len)) != sizeof(buf.len) ||
+		write(fd, buf.data, buf.len) != buf.len)
+	{
+		/* A short write does not set errno; assume out of disk space */
+		save_errno = errno ? errno : ENOSPC;
+		CloseTransientFile(fd);
+		LWLockRelease(SpockCtx->discard_file_lock);
+		errno = save_errno;
+		ereport(WARNING,
+				(errcode_for_file_access(),
+				 errmsg("could not write to discard file \"%s\": %m",
+						path)));
+		pfree(buf.data);
+		return false;
+	}
+
+	CloseTransientFile(fd);
+	LWLockRelease(SpockCtx->discard_file_lock);
+
+	/* The failure paths free the record buffer; do the same on success */
+	pfree(buf.data);
+
+	return true;
+}
+
+/*
+ * SQL-callable function: spock.discard_read()
+ *
+ * Returns the contents of the current database's DISCARDFILE as a set of
+ * single-column jsonb records. Users extract fields in SQL, e.g.:
+ *
+ *   SELECT rec->>'node_name', rec->>'operation', rec->'remote_tuple'
+ *   FROM spock.discard_read() AS rec;
+ *
+ * A record whose length header is not positive, or whose payload is shorter
+ * than the header announces, is treated as file corruption: a WARNING is
+ * emitted and only the records read so far are returned.
+ *
+ * TODO: pass through the code scrupulously and decide on safe reading and error
+ * processing. Too much for a single commit, though.
+ */
+PG_FUNCTION_INFO_V1(spock_discard_read);
+Datum
+spock_discard_read(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	char		path[MAXPGPATH];
+	int			fd;
+	int			reclen;
+
+	InitMaterializedSRF(fcinfo, MAT_SRF_USE_EXPECTED_DESC | MAT_SRF_BLESS);
+
+	discardfile_path(path);
+
+	if (SpockCtx == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("spock shared memory context is not initialized")));
+
+	/*
+	 * Acquire the discard-file lock in shared mode so that concurrent
+	 * writers (which take LW_EXCLUSIVE) cannot produce a partial record
+	 * while we are reading.
+	 */
+	LWLockAcquire(SpockCtx->discard_file_lock, LW_SHARED);
+
+	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
+	if (fd < 0)
+	{
+		/* Releasing the lock may clobber errno; keep open()'s value */
+		int			save_errno = errno;
+
+		LWLockRelease(SpockCtx->discard_file_lock);
+		if (save_errno == ENOENT)
+			PG_RETURN_VOID();
+		errno = save_errno;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open discard file \"%s\": %m", path)));
+	}
+
+	/* Validate file format version */
+	{
+		uint32		version;
+
+		if (read(fd, &version, sizeof(version)) != sizeof(version))
+		{
+			CloseTransientFile(fd);
+			LWLockRelease(SpockCtx->discard_file_lock);
+			PG_RETURN_VOID();	/* empty file */
+		}
+		if (version != DISCARDFILE_VERSION)
+		{
+			CloseTransientFile(fd);
+			LWLockRelease(SpockCtx->discard_file_lock);
+			ereport(ERROR,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("unsupported discard file version %u (expected %u)",
+							version, DISCARDFILE_VERSION)));
+		}
+	}
+
+	while (read(fd, &reclen, sizeof(reclen)) == sizeof(reclen))
+	{
+		Datum		value;
+		bool		null = false;
+		char	   *rec;
+
+		/*
+		 * The writer never emits an empty record, and a negative length
+		 * would become a huge byte count for read() and make rec[reclen]
+		 * index memory before the buffer. Treat both as corruption and
+		 * stop, keeping the records read so far.
+		 */
+		if (reclen <= 0)
+		{
+			ereport(WARNING,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("invalid record length %d in discard file \"%s\"",
+							reclen, path)));
+			break;
+		}
+
+		/* Widen before adding 1: reclen + 1 must not overflow int */
+		rec = palloc((Size) reclen + 1);
+		if (read(fd, rec, reclen) != reclen)
+		{
+			pfree(rec);
+			/*
+			 * In case of crash and semi-written file this option allows us to
+			 * use all the records that were written before the failed operation.
+			 */
+			ereport(WARNING,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("truncated record in discard file \"%s\"",
+							path)));
+			break;
+		}
+		rec[reclen] = '\0';
+
+		value = DirectFunctionCall1(jsonb_in, CStringGetDatum(rec));
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
+							 &value, &null);
+		pfree(rec);
+	}
+
+	CloseTransientFile(fd);
+	LWLockRelease(SpockCtx->discard_file_lock);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/spock_shmem.c b/src/spock_shmem.c
index 389dcc00..1667e692 100644
--- a/src/spock_shmem.c
+++ b/src/spock_shmem.c
@@ -118,6 +118,9 @@ spock_shmem_request(void)
 
 	/* For SpockCtx->lock */
 	RequestNamedLWLockTranche("spock context lock", 1);
+
+	/* For SpockCtx->discard_file_lock */
+	RequestNamedLWLockTranche("spock discard file lock", 1);
 }
 
 /*
@@ -165,6 +168,10 @@ spock_shmem_startup(void)
 		SpockCtx->total_workers = nworkers;
 		memset(SpockCtx->workers, 0,
 			   sizeof(SpockWorker) * SpockCtx->total_workers);
+
+		/* Initialize DISCARDFILE support */
+		SpockCtx->discard_file_lock =
+			&((GetNamedLWLockTranche("spock discard file lock")[0]).lock);
 	}
 
 	/* Initialize worker subsystem shared memory structures */
diff --git a/tests/regress/expected/replication_set.out b/tests/regress/expected/replication_set.out
index 398ea9ac..f77c719b 100644
--- a/tests/regress/expected/replication_set.out
+++ b/tests/regress/expected/replication_set.out
@@ -459,11 +459,21 @@ ORDER BY command_counter;
 -----------------+--------------+------------+-----------+----------------------------------------------------+------------------------------------------
  1 | | | INSERT | | Spock can't find relation
  2 | | | INSERT | | Spock can't find relation
- 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid
+ 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": 
"x", "atttype": "int4"}] | Spock can't find relation with oid
+ 3 | | | INSERT | | Spock can't find relation
  4 | | | INSERT | | Spock can't find relation
- 5 | | | INSERT | | Spock can't find relation
- 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid
-(6 rows)
+(5 rows)
+
+-- Check discard file contents (TRANSDISCARD records go here)
+SELECT rec->>'node_name' AS node_name,
+       rec->>'relname' AS relname, rec->>'operation' AS operation,
+       rec->'remote_tuple' AS remote_tuple
+FROM spock.discard_read() AS rec
+ORDER BY (rec->>'xid')::bigint, rec->>'log_time';
+ node_name | relname | operation | remote_tuple
+-----------------+------------------+-----------+----------------------------------------------------
+ test_subscriber | public.spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}]
+(1 row)
 
 \c :provider_dsn
 SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g,spoc_102l CASCADE');
@@ -576,15 +586,24 @@ ORDER BY command_counter;
 -----------------+--------------+-------------+-----------+----------------------------------------------------+--------------------------------------------------------------------------------------------------------
  1 | | | INSERT | | Spock can't find relation
  2 | | | INSERT | | Spock can't find relation
- 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid
+ 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid
+ 3 | | | INSERT | | Spock can't find relation
  4 | | | INSERT | | Spock can't find relation
- 5 | | | INSERT | | Spock can't find relation
- 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid
+ 6 | | | UPDATE | | Spock can't find relation
  7 | | | UPDATE | | Spock can't find relation
- 8 | | | UPDATE | | Spock can't find relation
- 9 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid
- 10 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u)
-(10 rows)
+ 9 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u)
+(8 rows)
+
+-- Check discard file contents for UPDATE tests
+SELECT rec->>'node_name' AS node_name,
+       rec->>'relname' AS relname, rec->>'operation' AS operation,
+       rec->'remote_tuple' AS remote_tuple
+FROM spock.discard_read() AS rec
+ORDER BY (rec->>'xid')::bigint, rec->>'log_time';
+ node_name | relname | operation | remote_tuple
+-----------------+------------------+-----------+----------------------------------------------------
+ test_subscriber | public.spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}]
+(1 row)
 
 \c :provider_dsn
 SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g_u,spoc_102l_u CASCADE');
diff --git a/tests/regress/sql/replication_set.sql b/tests/regress/sql/replication_set.sql
index b9132a73..ebde188d 100644
--- a/tests/regress/sql/replication_set.sql
+++ b/tests/regress/sql/replication_set.sql
@@ -214,6 +214,13 @@ SELECT
 FROM spock.exception_log
 ORDER BY command_counter;
 
+-- Check discard file contents (TRANSDISCARD records go here)
+SELECT rec->>'node_name' AS node_name,
+       rec->>'relname' AS relname, rec->>'operation' AS operation,
+       rec->'remote_tuple' AS remote_tuple
+FROM spock.discard_read() AS rec
+ORDER BY (rec->>'xid')::bigint, rec->>'log_time';
+
 \c :provider_dsn
 SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g,spoc_102l CASCADE');
 
@@ -268,6 +275,13 @@ SELECT
 FROM spock.exception_log
 ORDER BY command_counter;
 
+-- Check discard file contents for UPDATE tests
+SELECT rec->>'node_name' AS node_name,
+       rec->>'relname' AS relname, rec->>'operation' AS operation,
+       rec->'remote_tuple' AS remote_tuple
+FROM spock.discard_read() AS rec
+ORDER BY (rec->>'xid')::bigint, rec->>'log_time';
+
 \c :provider_dsn
 SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g_u,spoc_102l_u CASCADE');
 