diff --git a/include/spock.h b/include/spock.h index 891d616b..a0fe5cf6 100644 --- a/include/spock.h +++ b/include/spock.h @@ -50,7 +50,7 @@ extern bool spock_include_ddl_repset; extern bool allow_ddl_from_functions; extern int restart_delay_default; extern int restart_delay_on_exception; -extern int spock_replay_queue_size; /* Deprecated - no longer used */ +extern int spock_replay_queue_size; extern int spock_feedback_frequency; extern bool check_all_uc_indexes; extern bool spock_enable_quiet_mode; diff --git a/src/spock.c b/src/spock.c index 2aa74433..2829eb10 100644 --- a/src/spock.c +++ b/src/spock.c @@ -139,7 +139,7 @@ bool spock_include_ddl_repset = false; bool allow_ddl_from_functions = false; int restart_delay_default; int restart_delay_on_exception; -int spock_replay_queue_size; /* Deprecated - no longer used */ +int spock_replay_queue_size; int spock_feedback_frequency; bool check_all_uc_indexes = false; bool spock_enable_quiet_mode = false; @@ -1150,11 +1150,10 @@ _PG_init(void) NULL); DefineCustomIntVariable("spock.exception_replay_queue_size", - "DEPRECATED: apply-worker replay queue size (no longer used)", - "This setting is deprecated and has no effect. " - "The replay queue now dynamically allocates memory as needed.", + "apply-worker replay queue size for exception", + NULL, &spock_replay_queue_size, - 4, + 16, 0, MAX_KILOBYTES / 1024, PGC_SIGHUP, diff --git a/src/spock_apply.c b/src/spock_apply.c index 46d7d96d..f319fb6b 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -124,6 +124,7 @@ static ApplyReplayEntry * apply_replay_head = NULL; static ApplyReplayEntry * apply_replay_tail = NULL; static ApplyReplayEntry * apply_replay_next = NULL; static int apply_replay_bytes = 0; +static bool apply_replay_overflow = false; /* Number of tuples inserted after which we switch to multi-insert. */ #define MIN_MULTI_INSERT_TUPLES 5 @@ -2268,10 +2269,6 @@ handle_sql_or_exception(QueuedMessage *queued_message, bool tx_just_started) if (!failed) { - /* - * Follow spock.exception_behavior GUC instead of restarting - * worker - */ if (exception_behaviour == TRANSDISCARD || exception_behaviour == SUB_DISABLE) RollbackAndReleaseCurrentSubTransaction(); @@ -2971,24 +2968,33 @@ apply_work(PGconn *streamConn) last_received = end_lsn; /* - * Append the entry to the end of the replay queue if we - * read it from the stream. Dynamic allocation means no - * fixed size limit - queue grows as needed. Note: - * spock_replay_queue_size is deprecated and no longer - * checked. + * Append the entry to the end of the replay queue + * if we read it from the stream but check for overflow. */ if (queue_append) { apply_replay_bytes += msg->len; - if (apply_replay_head == NULL) + /* + * spock_replay_queue_size is stored in KB (GUC_UNIT_MB + * converts the user-facing MB value to KB internally), + * so multiply by 1024 to compare against bytes. + */ + if (apply_replay_bytes < spock_replay_queue_size * 1024) { - apply_replay_head = apply_replay_tail = entry; + if (apply_replay_head == NULL) + { + apply_replay_head = apply_replay_tail = entry; + } + else + { + apply_replay_tail->next = entry; + apply_replay_tail = entry; + } } else { - apply_replay_tail->next = entry; - apply_replay_tail = entry; + apply_replay_overflow = true; } } @@ -3009,10 +3015,10 @@ apply_work(PGconn *streamConn) replication_handler(msg); - /* - * Note: No overflow handling needed - dynamic allocation - * used - */ + if (queue_append && apply_replay_overflow) + { + apply_replay_entry_free(entry); + } } else if (c == 'k') { @@ -3186,11 +3192,14 @@ apply_work(PGconn *streamConn) } /* - * Note: Replay queue overflow handling removed - dynamic allocation - * prevents overflow. We no longer kill and restart apply workers for - * queue overflow. Exception handling now follows - * spock.exception_behavior setting. + * If we ran out of queue space we also need to bail out. */ + if (apply_replay_overflow) + { + elog(LOG, "SPOCK: caught exception after replay queue overrun " + "- forcing apply worker restart"); + PG_RE_THROW(); + } /* * Reaching this point means that we are dealing with the first @@ -3940,6 +3949,7 @@ apply_replay_queue_reset(void) apply_replay_tail = NULL; apply_replay_next = NULL; apply_replay_bytes = 0; + apply_replay_overflow = false; MemoryContextReset(ApplyReplayContext); }