diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java index e0e313aeb2b..e1592933535 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java @@ -37,7 +37,19 @@ public interface HAGroupStateListener { /** - * Called when an HA group state transition occurs. + *

+ * Called when an HA group state transition occurs. The ZK client listens for changes and sends + * updates to subscribers using a single thread to guarantee ordering of events. Subscribers + * receive the state transition callback through this method. + *

+ *

+ * For example, if a subscriber has subscribed to the ACTIVE_NOT_IN_SYNC state on the peer + * cluster, and the state transition happens from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the + * subscriber will get the callback with the following parameters: - haGroupName: the name of the + * HA group that transitioned - fromState: ACTIVE_IN_SYNC - toState: ACTIVE_NOT_IN_SYNC - + * modifiedTime: the time the state transition occurred - clusterType: PEER - + * lastSyncStateTimeInMs: the time we were in sync state. + *

*

* Implementations should be fast and non-blocking to avoid impacting the HA group state * management system. If heavy processing is required, consider delegating to a separate thread. @@ -49,7 +61,7 @@ public interface HAGroupStateListener { * @param toState the new state after the transition * @param modifiedTime the time the state transition occurred * @param clusterType whether this transition occurred on the local or peer cluster - * @param lastSyncStateTimeInMs the time we were in sync state, can be null. + * @param lastSyncStateTimeInMs the time we were in sync state. * @throws Exception implementations may throw exceptions, but they will be logged and will not * prevent other listeners from being notified */ diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java index 44b9b833f98..957c803f316 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java @@ -301,6 +301,22 @@ public HAGroupStoreRecord getHAGroupStoreRecord() throws IOException { public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState haGroupState) throws IOException, InvalidClusterRoleTransitionException, SQLException, StaleHAGroupStoreRecordVersionException { + setHAGroupStatusIfNeeded(haGroupState, null); + } + + /** + * Set the HA group status for the specified HA group name. Checks if the status is needed to be + * updated based on logic in isUpdateNeeded function. + * @param haGroupState the HA group state to set + * @param lastSyncTimeInMsNullable the last sync time in milliseconds, can be null if not known. 
+ * @throws IOException if the client is not healthy or the operation + * fails + * @throws StaleHAGroupStoreRecordVersionException if the version is stale + * @throws InvalidClusterRoleTransitionException when transition is not valid + */ + public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState haGroupState, + Long lastSyncTimeInMsNullable) throws IOException, InvalidClusterRoleTransitionException, + SQLException, StaleHAGroupStoreRecordVersionException { Preconditions.checkNotNull(haGroupState, "haGroupState cannot be null"); if (!isHealthy) { throw new IOException("HAGroupStoreClient is not healthy"); @@ -323,7 +339,9 @@ public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState haGroupStat // Once state changes back to ACTIVE_IN_SYNC or the role is // NOT ACTIVE or ACTIVE_TO_STANDBY // set the time to null to mark that we are current(or we don't have any reader). - Long lastSyncTimeInMs = currentHAGroupStoreRecord.getLastSyncStateTimeInMs(); + long lastSyncTimeInMs = lastSyncTimeInMsNullable != null + ? lastSyncTimeInMsNullable + : currentHAGroupStoreRecord.getLastSyncStateTimeInMs(); ClusterRole clusterRole = haGroupState.getClusterRole(); if ( currentHAGroupStoreRecord.getHAGroupState() diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java index 30173124951..a51f730de1f 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java @@ -74,7 +74,12 @@ public class HAGroupStoreManager { /** * Functional interface for resolving target local states based on current local state when peer - * cluster transitions occur. + * cluster transitions occur. This is used in FailoverManagementListener to determine the target + * state based on the current local state. + *

+ * For example, if the peer transitions from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the target + * state changes from STANDBY to DEGRADED_STANDBY. + *

*/ @FunctionalInterface private interface TargetStateResolver { @@ -487,19 +492,18 @@ public ClusterRoleRecord getClusterRoleRecord(String haGroupName) throws SQLExce /** * Subscribe to be notified when any transition to a target state occurs. * @param haGroupName the name of the HA group to monitor - * @param targetState the target state to watch for + * @param toState the target state to watch for * @param clusterType whether to monitor local or peer cluster * @param listener the listener to notify when any transition to the target state occurs * @throws IOException if unable to get HAGroupStoreClient instance */ - public void subscribeToTargetState(String haGroupName, - HAGroupStoreRecord.HAGroupState targetState, ClusterType clusterType, - HAGroupStateListener listener) throws IOException { + public void subscribeToTargetState(String haGroupName, HAGroupStoreRecord.HAGroupState toState, + ClusterType clusterType, HAGroupStateListener listener) throws IOException { HAGroupStoreClient client = getHAGroupStoreClientAndSetupFailoverManagement(haGroupName); - client.subscribeToTargetState(targetState, clusterType, listener); + client.subscribeToTargetState(toState, clusterType, listener); LOGGER.debug( "Delegated subscription to target state {} " + "for HA group {} on {} cluster to client", - targetState, haGroupName, clusterType); + toState, haGroupName, clusterType); } /** @@ -542,6 +546,19 @@ private HAGroupStoreClient getHAGroupStoreClient(final String haGroupName) throw * Helper method to get HAGroupStoreClient instance and setup failover management. NOTE: As soon * as the HAGroupStoreClient is initialized, it will setup the failover management as well. * Failover management is only set up once per HA group to prevent duplicate subscriptions. + * Failover management is responsible for handling the state transitions on the local and peer + * clusters and react accordingly. Failover management handles peer state transitions and local + * state transitions. + *

+ * Example of peer state transition: if the peer cluster transitions from ACTIVE_IN_SYNC to + * ACTIVE_NOT_IN_SYNC, the failover management will transition the local cluster from STANDBY to + * DEGRADED_STANDBY. + *

+ *

+ * Example of local state transition: if the local cluster transitions to ABORT_TO_STANDBY, + * the failover management will transition the local cluster from ABORT_TO_STANDBY to + * STANDBY. + *

* @param haGroupName name of the HA group * @return HAGroupStoreClient instance for the specified HA group * @throws IOException when HAGroupStoreClient is not initialized @@ -564,6 +581,22 @@ private HAGroupStoreClient getHAGroupStoreClient(final String haGroupName) throw // ===== Failover Management Related Methods ===== + /** + * Setup local failover management for the given HA group. Local failover management is + * responsible for handling the state transitions on the local cluster. Local failover management + * handles local state transitions. + *

+ * Example of local state transition: if the local cluster transitions to ABORT_TO_STANDBY, + * the failover management will transition the local cluster from ABORT_TO_STANDBY to + * STANDBY. + *

+ * When we subscribe to the target state, we provide a FailoverManagementListener instance. The + * FailoverManagementListener implements the HAGroupStateListener interface and overrides the + * onStateChange method. The onStateChange method is called when a state change event occurs. It + * is passed the haGroupName, fromState, toState, clusterType, and lastSyncStateTimeInMs + * parameters. It is responsible for determining the target state and transitioning the local + * cluster to the target state based on target state resolver. + */ public void setupLocalFailoverManagement(String haGroupName) throws IOException { HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName); @@ -591,6 +624,17 @@ private static class FailoverManagementListener implements HAGroupStateListener this.resolver = resolver; } + /** + *

+ * Example of peer state transition: if the peer cluster transitions from ACTIVE_IN_SYNC to + * ACTIVE_NOT_IN_SYNC, the failover management will transition the local cluster from STANDBY + * to DEGRADED_STANDBY. + *

+ * Example input will look like this: haGroupName: test-ha-group fromState: ACTIVE_IN_SYNC + * toState: ACTIVE_NOT_IN_SYNC clusterType: PEER lastSyncStateTimeInMs: 1719859200000 Based on + * this input, the failover management will transition the local cluster from STANDBY to + * DEGRADED_STANDBY. The output state will be determined by the TargetStateResolver. + */ @Override public void onStateChange(String haGroupName, HAGroupState fromState, HAGroupState toState, long modifiedTime, ClusterType clusterType, Long lastSyncStateTimeInMs) { @@ -616,8 +660,18 @@ public void onStateChange(String haGroupName, HAGroupState fromState, HAGroupSta return; } + // If the target state is STANDBY, and we get an event from + // PEER cluster, we copy over the lastSyncTimeInMs from PEER event notification. + Long lastSyncTimeInMsNullable = null; + if ( + targetState.getClusterRole() == ClusterRoleRecord.ClusterRole.STANDBY + && clusterType == ClusterType.PEER + ) { + lastSyncTimeInMsNullable = lastSyncStateTimeInMs; + } + // Execute transition if valid - client.setHAGroupStatusIfNeeded(targetState); + client.setHAGroupStatusIfNeeded(targetState, lastSyncTimeInMsNullable); LOGGER.info( "Failover management transition: peer {} -> {}, " + "local {} -> {} for HA group: {}", @@ -636,6 +690,16 @@ public void onStateChange(String haGroupName, HAGroupState fromState, HAGroupSta } } + /** + * Setup peer failover management for the given HA group. Peer failover management is responsible + * for handling the state transitions on the peer cluster. Peer failover management handles peer + * state transitions. + *

+ * Example of peer state transition: if the peer cluster transitions from ACTIVE_IN_SYNC to + * ACTIVE_NOT_IN_SYNC, the failover management will transition the local cluster from STANDBY to + * DEGRADED_STANDBY. + *

+ */ public void setupPeerFailoverManagement(String haGroupName) throws IOException { HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java index e941e8213d2..3f59115503c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java @@ -625,6 +625,39 @@ public void testSetHAGroupStatusIfNeededUpdateExistingRecord() throws Exception currentRecord.getHAGroupState()); } + @Test + public void testSetHAGroupStatusIfNeededWithExplicitLastSyncTimeUpdateExistingRecord() + throws Exception { + String haGroupName = testName.getMethodName(); + + // Create initial record + HAGroupStoreRecord initialRecord = + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, + 0L, HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, initialRecord); + + HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient + .getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(), haGroupName, zkUrl); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Verify initial state + HAGroupStoreRecord currentRecord = haGroupStoreClient.getHAGroupStoreRecord(); + assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, currentRecord.getHAGroupState()); + + // Update to STANDBY (this should succeed as it's a valid transition) + long timestamp = System.currentTimeMillis(); + haGroupStoreClient.setHAGroupStatusIfNeeded( + HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, timestamp); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Verify the record was updated + currentRecord = haGroupStoreClient.getHAGroupStoreRecord(); 
+ assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, + currentRecord.getHAGroupState()); + assertEquals(timestamp, (long) currentRecord.getLastSyncStateTimeInMs()); + } + @Test public void testSetHAGroupStatusIfNeededNoUpdateWhenNotNeeded() throws Exception { String haGroupName = testName.getMethodName(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java index f15243e6e34..801d7795885 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java @@ -26,6 +26,7 @@ import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -848,6 +849,8 @@ public void testE2EStoreAndForwardWithAutomaticStateTransitions() throws Excepti assertTrue("Cluster2 record should be present", cluster2Record.isPresent()); assertEquals("Cluster2 should be in STANDBY state", HAGroupStoreRecord.HAGroupState.STANDBY, cluster2Record.get().getHAGroupState()); + assertEquals(0L, (long) cluster1Record.get().getLastSyncStateTimeInMs()); + assertEquals(0L, (long) cluster2Record.get().getLastSyncStateTimeInMs()); // === STEP 1: Transition to store-and-forward mode === // Move cluster1 from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC (store-and-forward mode) @@ -869,6 +872,9 @@ public void testE2EStoreAndForwardWithAutomaticStateTransitions() throws Excepti assertTrue("Cluster2 record should be present", cluster2Record.isPresent()); assertEquals("Cluster2 should automatically transition to DEGRADED_STANDBY", HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, 
cluster2Record.get().getHAGroupState()); + assertNotEquals(0L, (long) cluster1Record.get().getLastSyncStateTimeInMs()); + assertEquals(cluster2Record.get().getLastSyncStateTimeInMs(), + cluster1Record.get().getLastSyncStateTimeInMs()); // === STEP 3: Return to sync mode === // Move cluster1 back from ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC @@ -892,5 +898,8 @@ public void testE2EStoreAndForwardWithAutomaticStateTransitions() throws Excepti assertTrue("Cluster2 record should be present", cluster2Record.isPresent()); assertEquals("Cluster2 should automatically transition back to STANDBY", HAGroupStoreRecord.HAGroupState.STANDBY, cluster2Record.get().getHAGroupState()); + assertNotEquals(0L, (long) cluster1Record.get().getLastSyncStateTimeInMs()); + assertEquals(cluster2Record.get().getLastSyncStateTimeInMs(), + cluster1Record.get().getLastSyncStateTimeInMs()); } }