Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,19 @@
public interface HAGroupStateListener {

/**
* Called when an HA group state transition occurs.
* <p>
* Called when an HA group state transition occurs. ZK Client listens to changes and sends update
* to subscribers using a single thread to guarantee ordering of events. Subscribers get the state
* transition callback through this implementation.
* </p>
* <p>
* For example, if subscriber has subscribed to ACTIVE_NOT_IN_SYNC state on peer cluster, and the
* state transition happens from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the subscriber will get the
* callback with the following parameters: - haGroupName: the name of the HA group that
* transitioned - fromState: ACTIVE_IN_SYNC - toState: ACTIVE_NOT_IN_SYNC - modifiedTime: the time
* the state transition occurred - clusterType: PEER\ - lastSyncStateTimeInMs: the time we were in
* sync state.
* </p>
* <p>
* Implementations should be fast and non-blocking to avoid impacting the HA group state
* management system. If heavy processing is required, consider delegating to a separate thread.
Expand All @@ -49,7 +61,7 @@ public interface HAGroupStateListener {
* @param toState the new state after the transition
* @param modifiedTime the time the state transition occurred
* @param clusterType whether this transition occurred on the local or peer cluster
* @param lastSyncStateTimeInMs the time we were in sync state, can be null.
* @param lastSyncStateTimeInMs the time we were in sync state.
* @throws Exception implementations may throw exceptions, but they will be logged and will not
* prevent other listeners from being notified
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,22 @@ public HAGroupStoreRecord getHAGroupStoreRecord() throws IOException {
public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState haGroupState)
throws IOException, InvalidClusterRoleTransitionException, SQLException,
StaleHAGroupStoreRecordVersionException {
setHAGroupStatusIfNeeded(haGroupState, null);
}

/**
* Set the HA group status for the specified HA group name. Checks if the status is needed to be
* updated based on logic in isUpdateNeeded function.
* @param haGroupState the HA group state to set
* @param lastSyncTimeInMsNullable the last sync time in milliseconds, can be null if not known.
* @throws IOException if the client is not healthy or the operation
* fails
* @throws StaleHAGroupStoreRecordVersionException if the version is stale
* @throws InvalidClusterRoleTransitionException when transition is not valid
*/
public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState haGroupState,
Long lastSyncTimeInMsNullable) throws IOException, InvalidClusterRoleTransitionException,
SQLException, StaleHAGroupStoreRecordVersionException {
Preconditions.checkNotNull(haGroupState, "haGroupState cannot be null");
if (!isHealthy) {
throw new IOException("HAGroupStoreClient is not healthy");
Expand All @@ -323,7 +339,9 @@ public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState haGroupStat
// Once state changes back to ACTIVE_IN_SYNC or the role is
// NOT ACTIVE or ACTIVE_TO_STANDBY
// set the time to null to mark that we are current(or we don't have any reader).
Long lastSyncTimeInMs = currentHAGroupStoreRecord.getLastSyncStateTimeInMs();
long lastSyncTimeInMs = lastSyncTimeInMsNullable != null
? lastSyncTimeInMsNullable
: currentHAGroupStoreRecord.getLastSyncStateTimeInMs();
ClusterRole clusterRole = haGroupState.getClusterRole();
if (
currentHAGroupStoreRecord.getHAGroupState()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ public class HAGroupStoreManager {

/**
* Functional interface for resolving target local states based on current local state when peer
* cluster transitions occur.
* cluster transitions occur. This is used in FailoverManagementListener to determine the target
* state based on the current local state.
* <p>
* For example if peer transitions from AIS -> ANIS, the target state changes from STANDBY ->
* DEGRADED_STANDBY
* </p>
*/
@FunctionalInterface
private interface TargetStateResolver {
Expand Down Expand Up @@ -487,19 +492,18 @@ public ClusterRoleRecord getClusterRoleRecord(String haGroupName) throws SQLExce
/**
* Subscribe to be notified when any transition to a target state occurs.
* @param haGroupName the name of the HA group to monitor
* @param targetState the target state to watch for
* @param toState the target state to watch for
* @param clusterType whether to monitor local or peer cluster
* @param listener the listener to notify when any transition to the target state occurs
* @throws IOException if unable to get HAGroupStoreClient instance
*/
public void subscribeToTargetState(String haGroupName,
HAGroupStoreRecord.HAGroupState targetState, ClusterType clusterType,
HAGroupStateListener listener) throws IOException {
public void subscribeToTargetState(String haGroupName, HAGroupStoreRecord.HAGroupState toState,
ClusterType clusterType, HAGroupStateListener listener) throws IOException {
HAGroupStoreClient client = getHAGroupStoreClientAndSetupFailoverManagement(haGroupName);
client.subscribeToTargetState(targetState, clusterType, listener);
client.subscribeToTargetState(toState, clusterType, listener);
LOGGER.debug(
"Delegated subscription to target state {} " + "for HA group {} on {} cluster to client",
targetState, haGroupName, clusterType);
toState, haGroupName, clusterType);
}

/**
Expand Down Expand Up @@ -542,6 +546,19 @@ private HAGroupStoreClient getHAGroupStoreClient(final String haGroupName) throw
* Helper method to get HAGroupStoreClient instance and setup failover management. NOTE: As soon
* as the HAGroupStoreClient is initialized, it will setup the failover management as well.
* Failover management is only set up once per HA group to prevent duplicate subscriptions.
* Failover management is responsible for handling the state transitions on the local and peer
* clusters and react accordingly. Failover management handles peer state transitions and local
* state transitions.
* <p>
* Example of peer state transition: For example, if the peer cluster transitions from
* ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the failover management will transition the local cluster
* from STANDBY to DEGRADED_STANDBY.
* </p>
* <p>
* Example of local state transition: For example, if the local cluster transitions to
* ABORT_TO_STANDBY, the failover management will transition the local cluster from
* ABORT_TO_STANDBY to STANDBY.
* </p>
* @param haGroupName name of the HA group
* @return HAGroupStoreClient instance for the specified HA group
* @throws IOException when HAGroupStoreClient is not initialized
Expand All @@ -564,6 +581,22 @@ private HAGroupStoreClient getHAGroupStoreClient(final String haGroupName) throw

// ===== Failover Management Related Methods =====

/**
* Setup local failover management for the given HA group. Local failover management is
* responsible for handling the state transitions on the local cluster. Local failover management
* handles local state transitions.
* <p>
* Example of local state transition: For example, if the local cluster transitions to
* ABORT_TO_STANDBY, the failover management will transition the local cluster from
* ABORT_TO_STANDBY to STANDBY.
* </p>
* When we subscribe to the target state, we provide a FailoverManagementListener instance. The
* FailoverManagementListener implements the HAGroupStateListener interface and overrides the
* onStateChange method. The onStateChange method is called when a state change event occurs. It
* is passed the haGroupName, fromState, toState, clusterType, and lastSyncStateTimeInMs
* parameters. It is responsible for determining the target state and transitioning the local
* cluster to the target state based on target state resolver.
*/
public void setupLocalFailoverManagement(String haGroupName) throws IOException {
HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);

Expand Down Expand Up @@ -591,6 +624,17 @@ private static class FailoverManagementListener implements HAGroupStateListener
this.resolver = resolver;
}

/**
* <p>
* Example of peer state transition: For example, if the peer cluster transitions from
* ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the failover management will transition the local
* cluster from STANDBY to DEGRADED_STANDBY.
* </p>
* Example input will look like this: haGroupName: test-ha-group fromState: ACTIVE_IN_SYNC
* toState: ACTIVE_NOT_IN_SYNC clusterType: PEER lastSyncStateTimeInMs: 1719859200000 Based on
* this input, the failover management will transition the local cluster from STANDBY to
* DEGRADED_STANDBY. The output state will be determined by the TargetStateResolver.
*/
@Override
public void onStateChange(String haGroupName, HAGroupState fromState, HAGroupState toState,
long modifiedTime, ClusterType clusterType, Long lastSyncStateTimeInMs) {
Expand All @@ -616,8 +660,18 @@ public void onStateChange(String haGroupName, HAGroupState fromState, HAGroupSta
return;
}

// If the target state is STANDBY, and we get an event from
// PEER cluster, we copy over the lastSyncTimeInMs from PEER event notification.
Long lastSyncTimeInMsNullable = null;
if (
targetState.getClusterRole() == ClusterRoleRecord.ClusterRole.STANDBY
&& clusterType == ClusterType.PEER
) {
lastSyncTimeInMsNullable = lastSyncStateTimeInMs;
}

// Execute transition if valid
client.setHAGroupStatusIfNeeded(targetState);
client.setHAGroupStatusIfNeeded(targetState, lastSyncTimeInMsNullable);

LOGGER.info(
"Failover management transition: peer {} -> {}, " + "local {} -> {} for HA group: {}",
Expand All @@ -636,6 +690,16 @@ public void onStateChange(String haGroupName, HAGroupState fromState, HAGroupSta
}
}

/**
* Setup peer failover management for the given HA group. Peer failover management is responsible
* for handling the state transitions on the peer cluster. Peer failover management handles peer
* state transitions.
* <p>
* Example of peer state transition: For example, if the peer cluster transitions from
* ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the failover management will transition the local cluster
* from STANDBY to DEGRADED_STANDBY.
* </p>
*/
public void setupPeerFailoverManagement(String haGroupName) throws IOException {
HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,39 @@ public void testSetHAGroupStatusIfNeededUpdateExistingRecord() throws Exception
currentRecord.getHAGroupState());
}

@Test
public void testSetHAGroupStatusIfNeededWithExplicitLastSyncTimeUpdateExistingRecord()
throws Exception {
String haGroupName = testName.getMethodName();

// Create initial record
HAGroupStoreRecord initialRecord =
new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC,
0L, HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl,
this.peerMasterUrl, 0L);
createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, initialRecord);

HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient
.getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(), haGroupName, zkUrl);
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);

// Verify initial state
HAGroupStoreRecord currentRecord = haGroupStoreClient.getHAGroupStoreRecord();
assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, currentRecord.getHAGroupState());

// Update to STANDBY (this should succeed as it's a valid transition)
long timestamp = System.currentTimeMillis();
haGroupStoreClient.setHAGroupStatusIfNeeded(
HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, timestamp);
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);

// Verify the record was updated
currentRecord = haGroupStoreClient.getHAGroupStoreRecord();
assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY,
currentRecord.getHAGroupState());
assertEquals(timestamp, (long) currentRecord.getLastSyncStateTimeInMs());
}

@Test
public void testSetHAGroupStatusIfNeededNoUpdateWhenNotNeeded() throws Exception {
String haGroupName = testName.getMethodName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -848,6 +849,8 @@ public void testE2EStoreAndForwardWithAutomaticStateTransitions() throws Excepti
assertTrue("Cluster2 record should be present", cluster2Record.isPresent());
assertEquals("Cluster2 should be in STANDBY state", HAGroupStoreRecord.HAGroupState.STANDBY,
cluster2Record.get().getHAGroupState());
assertEquals(0L, (long) cluster1Record.get().getLastSyncStateTimeInMs());
assertEquals(0L, (long) cluster2Record.get().getLastSyncStateTimeInMs());

// === STEP 1: Transition to store-and-forward mode ===
// Move cluster1 from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC (store-and-forward mode)
Expand All @@ -869,6 +872,9 @@ public void testE2EStoreAndForwardWithAutomaticStateTransitions() throws Excepti
assertTrue("Cluster2 record should be present", cluster2Record.isPresent());
assertEquals("Cluster2 should automatically transition to DEGRADED_STANDBY",
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, cluster2Record.get().getHAGroupState());
assertNotEquals(0L, (long) cluster1Record.get().getLastSyncStateTimeInMs());
assertEquals(cluster2Record.get().getLastSyncStateTimeInMs(),
cluster1Record.get().getLastSyncStateTimeInMs());

// === STEP 3: Return to sync mode ===
// Move cluster1 back from ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC
Expand All @@ -892,5 +898,8 @@ public void testE2EStoreAndForwardWithAutomaticStateTransitions() throws Excepti
assertTrue("Cluster2 record should be present", cluster2Record.isPresent());
assertEquals("Cluster2 should automatically transition back to STANDBY",
HAGroupStoreRecord.HAGroupState.STANDBY, cluster2Record.get().getHAGroupState());
assertNotEquals(0L, (long) cluster1Record.get().getLastSyncStateTimeInMs());
assertEquals(cluster2Record.get().getLastSyncStateTimeInMs(),
cluster1Record.get().getLastSyncStateTimeInMs());
}
}