Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/spock.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ extern bool spock_include_ddl_repset;
extern bool allow_ddl_from_functions;
extern int restart_delay_default;
extern int restart_delay_on_exception;
extern int spock_replay_queue_size; /* Deprecated - no longer used */
extern int spock_replay_queue_size;
extern int spock_feedback_frequency;
extern bool check_all_uc_indexes;
extern bool spock_enable_quiet_mode;
Expand Down
9 changes: 4 additions & 5 deletions src/spock.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ bool spock_include_ddl_repset = false;
bool allow_ddl_from_functions = false;
int restart_delay_default;
int restart_delay_on_exception;
int spock_replay_queue_size; /* Deprecated - no longer used */
int spock_replay_queue_size;
int spock_feedback_frequency;
bool check_all_uc_indexes = false;
bool spock_enable_quiet_mode = false;
Expand Down Expand Up @@ -1150,11 +1150,10 @@ _PG_init(void)
NULL);

DefineCustomIntVariable("spock.exception_replay_queue_size",
"DEPRECATED: apply-worker replay queue size (no longer used)",
"This setting is deprecated and has no effect. "
"The replay queue now dynamically allocates memory as needed.",
"apply-worker replay queue size for exception",
NULL,
&spock_replay_queue_size,
4,
16,
0,
MAX_KILOBYTES / 1024,
PGC_SIGHUP,
Expand Down
52 changes: 31 additions & 21 deletions src/spock_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ static ApplyReplayEntry * apply_replay_head = NULL;
static ApplyReplayEntry * apply_replay_tail = NULL;
static ApplyReplayEntry * apply_replay_next = NULL;
static int apply_replay_bytes = 0;
static bool apply_replay_overflow = false;

/* Number of tuples inserted after which we switch to multi-insert. */
#define MIN_MULTI_INSERT_TUPLES 5
Expand Down Expand Up @@ -2268,10 +2269,6 @@ handle_sql_or_exception(QueuedMessage *queued_message, bool tx_just_started)

if (!failed)
{
/*
* Follow spock.exception_behavior GUC instead of restarting
* worker
*/
if (exception_behaviour == TRANSDISCARD ||
exception_behaviour == SUB_DISABLE)
RollbackAndReleaseCurrentSubTransaction();
Expand Down Expand Up @@ -2971,24 +2968,33 @@ apply_work(PGconn *streamConn)
last_received = end_lsn;

/*
* Append the entry to the end of the replay queue if we
* read it from the stream. Dynamic allocation means no
* fixed size limit - queue grows as needed. Note:
* spock_replay_queue_size is deprecated and no longer
* checked.
* Append the entry to the end of the replay queue
* if we read it from the stream but check for overflow.
*/
if (queue_append)
{
apply_replay_bytes += msg->len;

if (apply_replay_head == NULL)
/*
* spock_replay_queue_size is stored in KB (GUC_UNIT_MB
* converts the user-facing MB value to KB internally),
* so multiply by 1024 to compare against bytes.
*/
if (apply_replay_bytes < spock_replay_queue_size * 1024)
{
apply_replay_head = apply_replay_tail = entry;
if (apply_replay_head == NULL)
{
apply_replay_head = apply_replay_tail = entry;
}
else
{
apply_replay_tail->next = entry;
apply_replay_tail = entry;
}
}
else
{
apply_replay_tail->next = entry;
apply_replay_tail = entry;
apply_replay_overflow = true;
}
}

Expand All @@ -3009,10 +3015,10 @@ apply_work(PGconn *streamConn)

replication_handler(msg);

/*
* Note: No overflow handling needed - dynamic allocation
* used
*/
if (queue_append && apply_replay_overflow)
{
apply_replay_entry_free(entry);
}
}
else if (c == 'k')
{
Expand Down Expand Up @@ -3186,11 +3192,14 @@ apply_work(PGconn *streamConn)
}

/*
* Note: Replay queue overflow handling removed - dynamic allocation
* prevents overflow. We no longer kill and restart apply workers for
* queue overflow. Exception handling now follows
* spock.exception_behavior setting.
* If we ran out of queue space we also need to bail out.
*/
if (apply_replay_overflow)
{
elog(LOG, "SPOCK: caught exception after replay queue overrun "
"- forcing apply worker restart");
PG_RE_THROW();
}

/*
* Reaching this point means that we are dealing with the first
Expand Down Expand Up @@ -3940,6 +3949,7 @@ apply_replay_queue_reset(void)
apply_replay_tail = NULL;
apply_replay_next = NULL;
apply_replay_bytes = 0;
apply_replay_overflow = false;

MemoryContextReset(ApplyReplayContext);
}
Expand Down
Loading