Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
3158c06
keep track of export partition zk requests
arthurpassos Nov 27, 2025
ba581eb
vibe coded getexportpartitioninfo
arthurpassos Nov 27, 2025
efa0b03
move vibe coded getpartitionexports inside the updating task
arthurpassos Nov 27, 2025
acf5fd1
rmv unexistent increment
arthurpassos Nov 28, 2025
fadf2f7
lock part inside task
arthurpassos Nov 28, 2025
d51d703
tmp
arthurpassos Dec 3, 2025
ef5807b
okish
arthurpassos Dec 9, 2025
64accc3
add setting to control locking behavior (inside/outside task), deny e…
arthurpassos Dec 10, 2025
a63e05d
Merge branch 'antalya-25.8' into export_replicated_mt_partition_v2
arthurpassos Dec 10, 2025
3817e50
clear part references from partition task when status changes, proper…
arthurpassos Dec 11, 2025
85e14f9
settings history
arthurpassos Dec 11, 2025
0d183bf
improvements
arthurpassos Dec 11, 2025
76b3bd6
Merge branch 'antalya-25.8' into export_replicated_mt_partition_v2
arthurpassos Dec 11, 2025
651d6e4
implement local query to system replicated partition exports
arthurpassos Dec 12, 2025
7a1e741
address comments
arthurpassos Dec 16, 2025
1962e19
glitch
arthurpassos Dec 16, 2025
77c64be
Merge branch 'antalya-25.8' into export_replicated_mt_partition_v2
arthurpassos Jan 5, 2026
a053fc8
Merge branch 'antalya-25.8' into export_replicated_mt_partition_v2
arthurpassos Jan 7, 2026
88cfc70
Merge branch 'antalya-25.8' into export_replicated_mt_partition_v2
arthurpassos Jan 15, 2026
c9d903d
Merge branch 'antalya-25.8' into export_replicated_mt_partition_v2
arthurpassos Jan 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,18 @@
M(ZooKeeperBytesSent, "Number of bytes send over network while communicating with ZooKeeper.", ValueType::Bytes) \
M(ZooKeeperBytesReceived, "Number of bytes received over network while communicating with ZooKeeper.", ValueType::Bytes) \
\
M(ExportPartitionZooKeeperRequests, "Total number of ZooKeeper requests made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperGet, "Number of 'get' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperGetChildren, "Number of 'getChildren' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperGetChildrenWatch, "Number of 'getChildrenWatch' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperGetWatch, "Number of 'getWatch' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperCreate, "Number of 'create' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperSet, "Number of 'set' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperRemove, "Number of 'remove' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperRemoveRecursive, "Number of 'removeRecursive' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperMulti, "Number of 'multi' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperExists, "Number of 'exists' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
\
M(DistributedConnectionTries, "Total count of distributed connection attempts.", ValueType::Number) \
M(DistributedConnectionUsable, "Total count of successful distributed connections to a usable server (with required table, but maybe stale).", ValueType::Number) \
M(DistributedConnectionFailTry, "Total count when distributed connection fails with retry.", ValueType::Number) \
Expand Down
8 changes: 8 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6912,6 +6912,14 @@ Possible values:
- `` (empty value) - use session timezone

Default value is `UTC`.
)", 0) \
DECLARE(Bool, export_merge_tree_partition_lock_inside_the_task, false, R"(
Only lock a part when the task is already running. This might help with busy waiting where the scheduler locks a part, but the task ends in the pending list.
On the other hand, there is a chance once the task executes that part has already been locked by another replica and the task will simply early exit.
)", 0) \
DECLARE(Bool, export_merge_tree_partition_system_table_prefer_remote_information, true, R"(
Controls whether the system.replicated_partition_exports will prefer to query ZooKeeper to get the most up to date information or use the local information.
Querying ZooKeeper is expensive, and only available if the ZooKeeper feature flag MULTI_READ is enabled.
)", 0) \
DECLARE(UInt64, export_merge_tree_part_max_bytes_per_file, 0, R"(
Maximum number of bytes to write to a single file when exporting a merge tree part. 0 means no limit.
Expand Down
2 changes: 2 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
{"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
{"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."},
{"export_merge_tree_partition_system_table_prefer_remote_information", true, true, "New setting."},
{"input_format_parquet_use_native_reader_v3", false, true, "Seems stable"},
{"input_format_parquet_verify_checksums", true, true, "New setting."},
{"output_format_parquet_write_checksums", false, true, "New setting."},
Expand Down
19 changes: 10 additions & 9 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ struct ExportReplicatedMergeTreePartitionManifest
size_t max_bytes_per_file;
size_t max_rows_per_file;
MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;
bool lock_inside_the_task; /// todo temporary

std::string toJsonString() const
{
Expand All @@ -139,6 +140,7 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("create_time", create_time);
json.set("max_retries", max_retries);
json.set("ttl_seconds", ttl_seconds);
json.set("lock_inside_the_task", lock_inside_the_task);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
Expand Down Expand Up @@ -168,18 +170,17 @@ struct ExportReplicatedMergeTreePartitionManifest
manifest.max_threads = json->getValue<size_t>("max_threads");
manifest.parallel_formatting = json->getValue<bool>("parallel_formatting");
manifest.parquet_parallel_encoding = json->getValue<bool>("parquet_parallel_encoding");
const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(json->getValue<String>("file_already_exists_policy"));
/// todo what to do if it's not a valid value?
if (file_already_exists_policy)
{
manifest.file_already_exists_policy = file_already_exists_policy.value();
}

manifest.max_bytes_per_file = json->getValue<size_t>("max_bytes_per_file");
manifest.max_rows_per_file = json->getValue<size_t>("max_rows_per_file");
if (json->has("file_already_exists_policy"))
{
const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(json->getValue<String>("file_already_exists_policy"));
if (file_already_exists_policy)
{
manifest.file_already_exists_policy = file_already_exists_policy.value();
}

/// what to do if it's not a valid value?
}
manifest.lock_inside_the_task = json->getValue<bool>("lock_inside_the_task");

return manifest;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct ExportReplicatedMergeTreePartitionTaskEntry
/// This is used to prevent the parts from being deleted before finishing the export operation
/// It does not mean this replica will export all the parts
/// There is also a chance this replica does not contain a given part and it is totally ok.
std::vector<DataPartPtr> part_references;
mutable std::vector<DataPartPtr> part_references;

std::string getCompositeKey() const
{
Expand Down
4 changes: 4 additions & 0 deletions src/Storages/MergeTree/BackgroundJobsAssignee.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ bool BackgroundJobsAssignee::scheduleCommonTask(ExecutableTaskPtr common_task, b
return schedule_res;
}

std::size_t BackgroundJobsAssignee::getAvailableMoveExecutors() const
{
return getContext()->getMovesExecutor()->getAvailableSlots();
}

String BackgroundJobsAssignee::toString(Type type)
{
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/BackgroundJobsAssignee.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class BackgroundJobsAssignee : public WithContext
bool scheduleMoveTask(ExecutableTaskPtr move_task);
bool scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger);

std::size_t getAvailableMoveExecutors() const;

/// Just call finish
~BackgroundJobsAssignee();

Expand Down
68 changes: 68 additions & 0 deletions src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#include <Storages/MergeTree/ExportPartFromPartitionExportTask.h>
#include <Common/ProfileEvents.h>

namespace ProfileEvents
{
extern const Event ExportPartitionZooKeeperRequests;
extern const Event ExportPartitionZooKeeperGetChildren;
extern const Event ExportPartitionZooKeeperCreate;
}
namespace DB
{

ExportPartFromPartitionExportTask::ExportPartFromPartitionExportTask(
StorageReplicatedMergeTree & storage_,
const std::string & key_,
const MergeTreePartExportManifest & manifest_)
: storage(storage_),
key(key_),
manifest(manifest_)
{
export_part_task = std::make_shared<ExportPartTask>(storage, manifest);
}

bool ExportPartFromPartitionExportTask::executeStep()
{
const auto zk = storage.getZooKeeper();
const auto part_name = manifest.data_part->name;

LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Attempting to lock part: {}", part_name);

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate);
if (Coordination::Error::ZOK == zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name, storage.replica_name, zkutil::CreateMode::Ephemeral))
{
LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Locked part: {}", part_name);
export_part_task->executeStep();
return false;
}

LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to lock part {}, skipping", part_name);
return false;
}

void ExportPartFromPartitionExportTask::cancel() noexcept
{
export_part_task->cancel();
}

void ExportPartFromPartitionExportTask::onCompleted()
{
export_part_task->onCompleted();
}

StorageID ExportPartFromPartitionExportTask::getStorageID() const
{
return export_part_task->getStorageID();
}

Priority ExportPartFromPartitionExportTask::getPriority() const
{
return export_part_task->getPriority();
}

String ExportPartFromPartitionExportTask::getQueryId() const
{
return export_part_task->getQueryId();
}
}
36 changes: 36 additions & 0 deletions src/Storages/MergeTree/ExportPartFromPartitionExportTask.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include <Storages/MergeTree/IExecutableTask.h>
#include <Storages/MergeTree/MergeTreePartExportManifest.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ExportPartTask.h>

namespace DB
{

/*
Decorator around the ExportPartTask to lock the part inside the task
*/
class ExportPartFromPartitionExportTask : public IExecutableTask
{
public:
explicit ExportPartFromPartitionExportTask(
StorageReplicatedMergeTree & storage_,
const std::string & key_,
const MergeTreePartExportManifest & manifest_);
bool executeStep() override;
void onCompleted() override;
StorageID getStorageID() const override;
Priority getPriority() const override;
String getQueryId() const override;

void cancel() noexcept override;

private:
StorageReplicatedMergeTree & storage;
std::string key;
MergeTreePartExportManifest manifest;
std::shared_ptr<ExportPartTask> export_part_task;
};

}
6 changes: 6 additions & 0 deletions src/Storages/MergeTree/ExportPartTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExpo
{
}

const MergeTreePartExportManifest & ExportPartTask::getManifest() const
{
return manifest;
}

bool ExportPartTask::executeStep()
{
auto local_context = Context::createCopy(storage.getContext());
Expand Down Expand Up @@ -304,6 +309,7 @@ bool ExportPartTask::executeStep()

void ExportPartTask::cancel() noexcept
{
LOG_INFO(getLogger("ExportPartTask"), "Export part {} task cancel() method called", manifest.data_part->name);
cancel_requested.store(true);
pipeline.cancel();
}
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/ExportPartTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ExportPartTask : public IExecutableTask
StorageID getStorageID() const override;
Priority getPriority() const override;
String getQueryId() const override;
const MergeTreePartExportManifest & getManifest() const;

void cancel() noexcept override;

Expand Down
Loading
Loading