Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.replication.reader.ReplicationLogReplayService;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
Expand All @@ -69,10 +70,14 @@ public void start(CoprocessorEnvironment env) throws IOException {
MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
initUncoveredIndexThreadPool(this.conf);
this.zkUrl = getLocalZkUrl(conf);
// Start replication log replay
ReplicationLogReplayService.getInstance(conf).start();
}

@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// Stop replication log replay
ReplicationLogReplayService.getInstance(conf).stop();
RegionServerCoprocessor.super.stop(env);
if (uncoveredIndexThreadPool != null) {
uncoveredIndexThreadPool
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ public class ReplicationLogGroup {
"phoenix.replication.log.standby.hdfs.url";
public static final String REPLICATION_FALLBACK_HDFS_URL_KEY =
"phoenix.replication.log.fallback.hdfs.url";
public static final String REPLICATION_NUM_SHARDS_KEY = "phoenix.replication.log.shards";
public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000;
public static final int MAX_REPLICATION_NUM_SHARDS = 100000;
public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
"phoenix.replication.log.rotation.time.ms";
public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 1000L;
public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY =
"phoenix.replication.log.rotation.size.bytes";
public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256 * 1024 * 1024L;
Expand All @@ -85,8 +85,7 @@ public class ReplicationLogGroup {
"phoenix.replication.log.retry.delay.ms";
public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;

public static final String SHARD_DIR_FORMAT = "%05d";
public static final String FILE_NAME_FORMAT = "%d-%s.plog";
public static final String FILE_NAME_FORMAT = "%d_%s.plog";

/** Cache of ReplicationLogGroup instances by HA Group ID */
protected static final ConcurrentHashMap<String, ReplicationLogGroup> INSTANCES =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public abstract class ReplicationLogGroupWriter {
protected Disruptor<LogEvent> disruptor;
protected RingBuffer<LogEvent> ringBuffer;
protected volatile boolean closed = false;
protected ReplicationShardDirectoryManager replicationShardDirectoryManager;

/** The reason for requesting a log rotation. */
protected enum RotationReason {
Expand Down Expand Up @@ -175,6 +176,7 @@ protected ReplicationLogGroupWriter(ReplicationLogGroup logGroup) {
/** Initialize the writer. */
public void init() throws IOException {
initializeFileSystems();
initializeReplicationShardDirectoryManager();
// Start time based rotation.
lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
startRotationExecutor();
Expand Down Expand Up @@ -246,6 +248,12 @@ public void sync() throws IOException {
/** Initialize file systems needed by this writer implementation. */
protected abstract void initializeFileSystems() throws IOException;

/**
* Initialize the {@link ReplicationShardDirectoryManager} to manage file to shard directory
* mapping
*/
protected abstract void initializeReplicationShardDirectoryManager();

/**
* Create a new log writer for rotation.
*/
Expand Down
Loading