Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/spockbench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ jobs:
echo "TAP_CT_NAME=$TAP_CT_NAME" >> "$GITHUB_ENV"
docker run --name "$TAP_CT_NAME" -e PGVER=${{ matrix.pgver }} --workdir=/home/pgedge/spock/tests/tap \
spock /home/pgedge/spock/tests/tap/run_tests.sh
timeout-minutes: 45

- name: Collect TAP artifacts (from container)
if: ${{ always() }}
Expand Down
63 changes: 63 additions & 0 deletions .github/workflows/zodan_sync.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Keep its logic in sync with spockbench.yml!
#
# Long-running Z0DAN sync check. For every PostgreSQL major version in the
# matrix it builds the `spock` Docker image, runs the single TAP test
# t/011_zodan_sync_third.pl inside a container, and uploads whatever the
# container left under /home/pgedge/spock/ as a build artifact.

name: Z0DAN Sync long-lasting test
run-name: Add new node into highly loaded multi-master configuration
on:
  # Manual runs from the Actions tab.
  workflow_dispatch:
  schedule:
    - cron: '0 22 * * 6' # Saturday, 22:00 (UTC)

permissions:
  contents: read

jobs:
  pull-and-test:
    strategy:
      # Let the remaining PostgreSQL versions finish even if one of them fails.
      fail-fast: false
      matrix:
        pgver: [15, 16, 17, 18]

    runs-on: ubuntu-24.04-x64

    # A job is cancelled after 360 minutes unless it sets its own limit, which
    # would silently override the 1440-minute budget of the TAP step below.
    # Keep this above the step timeout so that image build and artifact
    # collection still fit.
    # NOTE(review): GitHub-hosted runners additionally cap job execution time;
    # confirm the `ubuntu-24.04-x64` runner permits a job of this length.
    timeout-minutes: 1500

    steps:
      - name: Checkout spock
        uses: actions/checkout@v4
        with:
          ref: ${{ github.ref }}

      # Makes the whole checkout writable for every user.
      # NOTE(review): presumably required by the container build or the test
      # user; confirm against tests/docker/Dockerfile.el9.
      - name: Add permissions
        run: |
          sudo chmod -R a+w ${GITHUB_WORKSPACE}

      - name: Set up Docker
        uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3

      # Builds the image tagged `spock` for the matrix PostgreSQL version.
      - name: Build docker container
        run: |
          cd ${GITHUB_WORKSPACE}/
          docker build \
            --build-arg PGVER=${{ matrix.pgver }} \
            -t spock -f tests/docker/Dockerfile.el9 .

      # The container name is exported through GITHUB_ENV so that the
      # collection step can find the container even when this step fails.
      # NOTE(review): the meaning of the trailing `10` argument is defined by
      # run-spock-tap.sh, which is not part of this workflow; confirm there.
      - name: Run picked TAP tests
        run: |
          TAP_CT_NAME="spock-tap-${{ matrix.pgver }}-${{ github.run_id }}-${{ github.run_attempt }}"
          echo "TAP_CT_NAME=$TAP_CT_NAME" >> "$GITHUB_ENV"
          docker run --name "$TAP_CT_NAME" -e PGVER=${{ matrix.pgver }} spock /home/pgedge/run-spock-tap.sh "t/011_zodan_sync_third.pl" 10
        timeout-minutes: 1440 # 24 hours

      # Best effort: both commands tolerate failure (`|| true`) so that a
      # missing container never masks the result of the test step.
      - name: Collect TAP artifacts (from container)
        if: ${{ always() }}
        run: |
          docker cp "$TAP_CT_NAME":/home/pgedge/spock/ "$GITHUB_WORKSPACE/results" || true
          docker rm -f "$TAP_CT_NAME" || true

      # One artifact per PostgreSQL version; an empty results directory is not
      # treated as an error.
      - name: Upload TAP artifacts
        if: ${{ always() }}
        uses: actions/upload-artifact@v4
        with:
          name: tap-${{ matrix.pgver }}
          path: |
            ${{ github.workspace }}/results
          if-no-files-found: ignore
          retention-days: 7
14 changes: 7 additions & 7 deletions samples/Z0DAN/wait_subscription.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ BEGIN

SELECT end_time - clock_timestamp() INTO time_remained;
IF time_remained < '0 second' THEN
RETURN state.lag;
RETURN lag;
END IF;

-- NOTE: Remember, an apply group may contain more than a single worker.
SELECT
MAX(remote_insert_lsn) AS remote_write_lsn,
MAX(received_lsn) AS received_lsn
MAX(last_received_lsn) AS last_received_lsn
FROM spock.lag_tracker
WHERE origin_name = remote_node_name AND receiver_name = local_node_name
INTO state;

-- Special case: nothing arrived yet
IF (state.received_lsn = '0/0'::pg_lsn) THEN
IF (state.last_received_lsn = '0/0'::pg_lsn) THEN
IF report_it = true THEN
raise NOTICE 'Replication % -> %: waiting WAL ... . Time remained: % (HH24:MI:SS)',
remote_node_name, local_node_name,
Expand All @@ -59,28 +59,28 @@ BEGIN
IF (state.remote_write_lsn = '0/0'::pg_lsn) THEN
IF report_it = true THEN
raise NOTICE 'Replication % -> %: waiting anything substantial ... Received LSN: %. Time remained: % (HH24:MI:SS)',
remote_node_name, local_node_name, state.received_lsn,
remote_node_name, local_node_name, state.last_received_lsn,
to_char(time_remained, 'HH24:MI:SS');
PERFORM pg_sleep(delay);
CONTINUE;
END IF;

-- Check any progress
IF (state.received_lsn = prev_received_lsn) THEN
IF (state.last_received_lsn = prev_received_lsn) THEN
raise EXCEPTION 'Replication % -> %: publisher seems get stuck into something',
remote_node_name, local_node_name;
END IF;

-- We have a progress, wait further.
prev_received_lsn = state.received_lsn;
prev_received_lsn = state.last_received_lsn;
-- To be sure we get a 'keepalive' message
PERFORM pg_sleep(wal_sender_timeout * 2);

PERFORM pg_sleep(delay);
CONTINUE;
END IF;

SELECT MAX(remote_insert_lsn - received_lsn) FROM spock.lag_tracker
SELECT MAX(remote_insert_lsn - last_received_lsn) FROM spock.lag_tracker
WHERE origin_name = remote_node_name AND receiver_name = local_node_name
INTO lag;

Expand Down
19 changes: 10 additions & 9 deletions samples/Z0DAN/zodan.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ def trigger_sync_on_other_nodes_and_wait_on_source(self, src_node_name: str, src
self.notice(f" OK: Triggering sync event on node {rec['node_name']} (LSN: {sync_lsn})...")

# Wait for sync event on source
self.wait_for_sync_event(src_dsn, True, rec['node_name'], sync_lsn, 1200)
self.wait_for_sync_event(src_dsn, True, rec['node_name'], sync_lsn, self.sync_event_timeout)
self.notice(f" OK: Waiting for sync event from {rec['node_name']} on source node {src_node_name}...")

def trigger_source_sync_and_wait_on_new_node(self, src_node_name: str, src_dsn: str,
Expand All @@ -543,8 +543,6 @@ def trigger_source_sync_and_wait_on_new_node(self, src_node_name: str, src_dsn:

# Trigger sync event on source node and wait for it on new node
sync_lsn = None
timeout_ms = 1200 # 20 minutes timeout

try:
# Trigger sync event on new node
sql = "SELECT spock.sync_event();"
Expand All @@ -563,7 +561,7 @@ def trigger_source_sync_and_wait_on_new_node(self, src_node_name: str, src_dsn:

try:
# Wait for sync event on new node
sql = f"CALL spock.wait_for_sync_event(true, '{src_node_name}', '{sync_lsn}'::pg_lsn, {timeout_ms});"
sql = f"CALL spock.wait_for_sync_event(true, '{src_node_name}', '{sync_lsn}'::pg_lsn, {self.sync_event_timeout});"
if self.verbose:
self.info(f" Remote SQL for wait_for_sync_event on new node {new_node_name}: {sql}")

Expand Down Expand Up @@ -748,14 +746,13 @@ def enable_disabled_subscriptions(self, src_node_name: str, src_dsn: str,

# Wait for the sync event that was captured when subscription was created
# This ensures the subscription starts replicating from the correct sync point
timeout_ms = 1200 # 20 minutes
sync_lsn = self.sync_lsns[rec['node_name']]['sync_lsn'] # Use stored sync LSN from Phase 3

if sync_lsn:
self.notice(f" OK: Using stored sync event from origin node {rec['node_name']} (LSN: {sync_lsn})...")

# Wait for this sync event on the new node where the subscription exists
sql = f"CALL spock.wait_for_sync_event(true, '{rec['node_name']}', '{sync_lsn}'::pg_lsn, {timeout_ms});"
sql = f"CALL spock.wait_for_sync_event(true, '{rec['node_name']}', '{sync_lsn}'::pg_lsn, {self.sync_event_timeout});"
if self.verbose:
self.info(f" Remote SQL for wait_for_sync_event on new node {new_node_name}: {sql}")

Expand Down Expand Up @@ -979,12 +976,13 @@ def check_spock_version_compatibility(self, src_dsn: str, new_node_dsn: str):

def add_node(self, src_node_name: str, src_dsn: str, new_node_name: str, new_node_dsn: str,
new_node_location: str = "NY", new_node_country: str = "USA",
new_node_info: str = "{}"):
new_node_info: str = "{}", sync_event_timeout: int = 180):
"""Main add_node procedure - matches zodan.sql exactly"""

# Store DSN values as instance attributes to avoid hard-coding
self.src_dsn = src_dsn
self.new_node_dsn = new_node_dsn
self.sync_event_timeout = sync_event_timeout

# Phase 0: Check Spock version compatibility across all nodes
# Example: Ensure all nodes are running the same Spock version before proceeding
Expand Down Expand Up @@ -1332,8 +1330,10 @@ def main():
add_node_parser.add_argument('--new-node-location', default='NY', help='New node location (default: NY)')
add_node_parser.add_argument('--new-node-country', default='USA', help='New node country (default: USA)')
add_node_parser.add_argument('--new-node-info', default='{}', help='New node info JSON (default: {})')
add_node_parser.add_argument('--sync-event-timeout', type=int, default=180,
help='Timeout in seconds for wait_for_sync_event (default: 180)')
add_node_parser.add_argument('--verbose', action='store_true', help='Enable verbose output')

args = parser.parse_args()

if not args.command:
Expand Down Expand Up @@ -1365,7 +1365,8 @@ def main():
args.new_node_dsn,
args.new_node_location,
args.new_node_country,
args.new_node_info
args.new_node_info,
args.sync_event_timeout
)
except Exception as e:
print(f"ERROR: {e}")
Expand Down
Loading
Loading