Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/commands/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ set(SRCS_COMMANDS
${CMAKE_CURRENT_LIST_DIR}/ft_debug.cc
${CMAKE_CURRENT_LIST_DIR}/ft_dropindex.cc
${CMAKE_CURRENT_LIST_DIR}/ft_info.cc
${CMAKE_CURRENT_LIST_DIR}/ft_internal_update.cc
${CMAKE_CURRENT_LIST_DIR}/ft_list.cc
${CMAKE_CURRENT_LIST_DIR}/ft_search.cc
${CMAKE_CURRENT_LIST_DIR}/commands.h
Expand Down
5 changes: 5 additions & 0 deletions src/commands/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ constexpr absl::string_view kListCommand{"FT._LIST"};
constexpr absl::string_view kSearchCommand{"FT.SEARCH"};
constexpr absl::string_view kDebugCommand{"FT._DEBUG"};
constexpr absl::string_view kAggregateCommand{"FT.AGGREGATE"};
constexpr absl::string_view kInternalUpdateCommand{"FT.INTERNAL_UPDATE"};

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be FT._INTERNAL_UPDATE given it is meant to be internal/hidden. Perhaps FT._UPDATE_INDEX.


const absl::flat_hash_set<absl::string_view> kCreateCmdPermissions{
kSearchCategory, kWriteCategory, kFastCategory};
const absl::flat_hash_set<absl::string_view> kDropIndexCmdPermissions{
kSearchCategory, kWriteCategory, kFastCategory};
const absl::flat_hash_set<absl::string_view> kInternalUpdateCmdPermissions{
kAdminCategory};

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this also be write and fast categories?

const absl::flat_hash_set<absl::string_view> kSearchCmdPermissions{
kSearchCategory, kReadCategory, kSlowCategory};
const absl::flat_hash_set<absl::string_view> kInfoCmdPermissions{
Expand Down Expand Up @@ -79,6 +82,8 @@ absl::Status FTDebugCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
int argc);
absl::Status FTAggregateCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
int argc);
absl::Status FTInternalUpdateCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
int argc);

//
// Common stuff for FT.SEARCH and FT.AGGREGATE command
Expand Down
6 changes: 5 additions & 1 deletion src/commands/ft_create.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ absl::Status FTCreateCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
}
ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
ValkeyModule_ReplicateVerbatim(ctx);

// Replicate ft.create only for cmd clusters, whereas ft.internal_update will be replicated for CME clusters
if (!ValkeySearch::Instance().IsCluster()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The right test for this might be !coordinator::Enabled. Not 100% sure. It certainly won't matter for us.

ValkeyModule_ReplicateVerbatim(ctx);
}
return absl::OkStatus();
}
} // namespace valkey_search
6 changes: 5 additions & 1 deletion src/commands/ft_dropindex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ absl::Status FTDropIndexCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
}
ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
ValkeyModule_ReplicateVerbatim(ctx);

// Replicate ft.drop only for cmd clusters, whereas ft.internal_update will be replicated for CME clusters
if (!ValkeySearch::Instance().IsCluster()) {
ValkeyModule_ReplicateVerbatim(ctx);
}
return absl::OkStatus();
}
} // namespace valkey_search
55 changes: 55 additions & 0 deletions src/commands/ft_internal_update.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2025, valkey-search contributors
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*
*/

#include "src/commands/commands.h"
#include "src/commands/ft_create_parser.h"
#include "src/index_schema.pb.h"
#include "src/schema_manager.h"

namespace valkey_search {

constexpr int kFTInternalUpdateArgCount = 4;

absl::Status FTInternalUpdateCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
if (argc != kFTInternalUpdateArgCount) {
return absl::InvalidArgumentError("ERR wrong number of arguments for FT_INTERNAL_UPDATE");
}

// Parse index ID
size_t id_len;
const char *id_data = ValkeyModule_StringPtrLen(argv[1], &id_len);
std::string id(id_data, id_len);

// Deserialize GlobalMetadataEntry
size_t metadata_len;
const char *metadata_data = ValkeyModule_StringPtrLen(argv[2], &metadata_len);
coordinator::GlobalMetadataEntry metadata_entry;
if (!metadata_entry.ParseFromArray(metadata_data, metadata_len)) {
return absl::InvalidArgumentError("ERR failed to parse GlobalMetadataEntry");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This merits a log entry and some kind of error counter. Either we have a logic problem, data corruption or a bad actor. In any case, this seems like something that wants to trigger an alarm.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're probably in a poison-pill situation. This is VERY bad.

}

// Deserialize GlobalMetadataVersionHeader
size_t header_len;
const char *header_data = ValkeyModule_StringPtrLen(argv[3], &header_len);
coordinator::GlobalMetadataVersionHeader version_header;
if (!version_header.ParseFromArray(header_data, header_len)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

return absl::InvalidArgumentError("ERR failed to parse GlobalMetadataVersionHeader");
}

auto status = coordinator::MetadataManager::Instance().ProcessInternalUpdate(
ctx, kSchemaManagerMetadataTypeName, id, &metadata_entry, &version_header);
if (!status.ok()) {
return status;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably same here too.

}

ValkeyModule_ReplicateVerbatim(ctx);

ValkeyModule_ReplyWithSimpleString(ctx, "OK");
return absl::OkStatus();
}

} // namespace valkey_search
145 changes: 135 additions & 10 deletions src/coordinator/metadata_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <algorithm>
#include <cstdint>
#include <future>
#include <memory>
#include <string>
#include <utility>
Expand Down Expand Up @@ -205,6 +206,23 @@ absl::StatusOr<IndexFingerprintVersion> MetadataManager::CreateEntry(
metadata.version_header().top_level_version() + 1);
metadata.mutable_version_header()->set_top_level_fingerprint(
ComputeTopLevelFingerprint(metadata.type_namespace_map()));


metadata_ = metadata;

// Call FT.INTERNAL_UPDATE for coordinator to ensure unified AOF replication
std::string metadata_binary, header_binary;
new_entry.SerializeToString(&metadata_binary);
metadata.version_header().SerializeToString(&header_binary);

ValkeyModuleCallReply *reply = ValkeyModule_Call(detached_ctx_.get(), "FT.INTERNAL_UPDATE", "!Kcbb",
std::string(id).c_str(),
metadata_binary.data(), metadata_binary.size(),
header_binary.data(), header_binary.size());
if (reply) {
ValkeyModule_FreeCallReply(reply);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else crash?


BroadcastMetadata(detached_ctx_.get(), metadata.version_header());
IndexFingerprintVersion index_fingerprint_version;
index_fingerprint_version.set_fingerprint(fingerprint);
Expand Down Expand Up @@ -246,6 +264,20 @@ absl::Status MetadataManager::DeleteEntry(absl::string_view type_name,
metadata.version_header().top_level_version() + 1);
metadata.mutable_version_header()->set_top_level_fingerprint(
ComputeTopLevelFingerprint(metadata.type_namespace_map()));

// Call FT.INTERNAL_UPDATE for coordinator to ensure unified AOF replication for DROP
std::string metadata_binary, header_binary;
new_entry.SerializeToString(&metadata_binary);
metadata.version_header().SerializeToString(&header_binary);

ValkeyModuleCallReply *reply = ValkeyModule_Call(detached_ctx_.get(), "FT.INTERNAL_UPDATE", "!Kcbb",
std::string(id).c_str(),
metadata_binary.data(), metadata_binary.size(),
header_binary.data(), header_binary.size());
if (reply) {
ValkeyModule_FreeCallReply(reply);
}

BroadcastMetadata(detached_ctx_.get(), metadata.version_header());
return absl::OkStatus();
}
Expand Down Expand Up @@ -284,10 +316,36 @@ void MetadataManager::BroadcastMetadata(
}
std::string payload;
version_header.SerializeToString(&payload);
// Nullptr for target means broadcast to all.
ValkeyModule_SendClusterMessage(ctx, /* target= */ nullptr,
kMetadataBroadcastClusterMessageReceiverId,
payload.c_str(), payload.size());

// Get all cluster nodes and send only to masters
size_t num_nodes;
char **node_list = ValkeyModule_GetClusterNodesList(ctx, &num_nodes);
if (node_list == nullptr) {
VMSDK_LOG(WARNING, ctx) << "Failed to get cluster nodes list";
return;
}

for (size_t i = 0; i < num_nodes; i++) {
std::string node_id(node_list[i], VALKEYMODULE_NODE_ID_LEN);
int flags;
if (ValkeyModule_GetClusterNodeInfo(ctx, node_id.c_str(), nullptr, nullptr, nullptr, &flags) == VALKEYMODULE_OK) {
// Only send to master nodes, skip replicas
if (flags & VALKEYMODULE_NODE_MASTER) {
ValkeyModule_SendClusterMessage(ctx, node_id.c_str(),
kMetadataBroadcastClusterMessageReceiverId,
payload.c_str(), payload.size());
} else {
VMSDK_LOG_EVERY_N_SEC(NOTICE, ctx, 10)
<< "Skipping metadata broadcast to non-master node " << node_id
<< " (flags=" << flags << ")";
}
} else {
Comment on lines +319 to +342
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the new ClusterMap abstraction for determining the addresses of all of the primaries.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this block is probably a good candidate for a separate function

VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 10)
<< "Failed to get cluster node info for node " << node_id;
}
}

ValkeyModule_FreeClusterNodesList(node_list);
}

void MetadataManager::DelayHandleClusterMessage(
Expand Down Expand Up @@ -473,12 +531,46 @@ absl::Status MetadataManager::ReconcileMetadata(const GlobalMetadata &proposed,
}

if (trigger_callbacks) {
auto result = TriggerCallbacks(type_name, id, proposed_entry);
if (!result.ok()) {
VMSDK_LOG(WARNING, detached_ctx_.get())
<< "Failed during reconciliation callback: %s"
<< result.message().data();
return result;
// Use FT.INTERNAL_UPDATE for metadata updates

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably a good candidate for separate function given this function is already long

// Use RunByMain to call FT.INTERNAL_UPDATE via ValkeyModule_Call
// Use promise/future to make it synchronous for error handling
std::promise<absl::Status> promise;
auto future = promise.get_future();

vmsdk::RunByMain([this, proposed_entry, id, &promise]() {

auto thread_safe_ctx = vmsdk::MakeUniqueValkeyDetachedThreadSafeContext(detached_ctx_.get());
ValkeyModuleCtx *ctx = thread_safe_ctx.get();

coordinator::GlobalMetadataVersionHeader version_header;
version_header.set_top_level_version(metadata_.Get().version_header().top_level_version());
version_header.set_top_level_fingerprint(metadata_.Get().version_header().top_level_fingerprint());

// Serialize to binary
std::string metadata_binary, header_binary;
proposed_entry.SerializeToString(&metadata_binary);
version_header.SerializeToString(&header_binary);


ValkeyModuleCallReply *reply = ValkeyModule_Call(ctx, "FT.INTERNAL_UPDATE", "!Kcbb",
id.c_str(),
metadata_binary.data(), metadata_binary.size(),
header_binary.data(), header_binary.size());

if (reply == nullptr || ValkeyModule_CallReplyType(reply) == VALKEYMODULE_REPLY_ERROR) {
if (reply) ValkeyModule_FreeCallReply(reply);
promise.set_value(absl::InternalError("FT.INTERNAL_UPDATE failed"));
return;
}

ValkeyModule_FreeCallReply(reply);
promise.set_value(absl::OkStatus());

});

auto status = future.get();
if (!status.ok()) {
return status;
}
}
}
Expand Down Expand Up @@ -722,4 +814,37 @@ void MetadataManager::RegisterForClusterMessages(ValkeyModuleCtx *ctx) {
ctx, coordinator::kMetadataBroadcastClusterMessageReceiverId,
MetadataManagerOnClusterMessageCallback);
}

absl::Status MetadataManager::ProcessInternalUpdate(ValkeyModuleCtx *ctx,
absl::string_view type_name,
absl::string_view id,
const coordinator::GlobalMetadataEntry *metadata_entry,
const coordinator::GlobalMetadataVersionHeader *global_version_header) {
if (metadata_entry) {
auto callback_status = TriggerCallbacks(type_name, id, *metadata_entry);
if (!callback_status.ok()) {
return callback_status;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, this feels like a crash situation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes makes sense

Copy link

@otherscase otherscase Dec 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When does this return not ok? If the version we are writing is older and rejected, does it still return ok?

}
}

auto result = metadata_.Get();

auto insert_result = result.mutable_type_namespace_map()->insert(
{std::string(type_name), coordinator::GlobalMetadataEntryMap()});
auto &existing_inner_map = insert_result.first->second;
auto mutable_entries = existing_inner_map.mutable_entries();
(*mutable_entries)[id] = *metadata_entry;

// Update global version header
const auto new_version = global_version_header->top_level_version();
const auto new_fingerprint = ComputeTopLevelFingerprint(result.type_namespace_map());

result.mutable_version_header()->set_top_level_version(new_version);
result.mutable_version_header()->set_top_level_fingerprint(new_fingerprint);

metadata_ = result;

return absl::OkStatus();
}

} // namespace valkey_search::coordinator
6 changes: 6 additions & 0 deletions src/coordinator/metadata_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ class MetadataManager {

absl::Status DeleteEntry(absl::string_view type_name, absl::string_view id);

absl::Status ProcessInternalUpdate(ValkeyModuleCtx *ctx,
absl::string_view type_name,
absl::string_view id,
const coordinator::GlobalMetadataEntry *metadata_entry,
const coordinator::GlobalMetadataVersionHeader *global_version_header);

std::unique_ptr<GlobalMetadata> GetGlobalMetadata();

// RegisterType is used to register a new metadata type in the metadata
Expand Down
7 changes: 7 additions & 0 deletions src/module_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ vmsdk::module::Options options = {
vmsdk::module::kAdminFlag},
.cmd_func = &vmsdk::CreateCommand<valkey_search::FTDebugCmd>,
},
{
.cmd_name = valkey_search::kInternalUpdateCommand,
.permissions =
ACLPermissionFormatter(valkey_search::kInternalUpdateCmdPermissions),
.flags = {vmsdk::module::kWriteFlag, vmsdk::module::kAdminFlag},
.cmd_func = &vmsdk::CreateCommand<valkey_search::FTInternalUpdateCmd>,
},
{
.cmd_name = valkey_search::kAggregateCommand,
.permissions = ACLPermissionFormatter(
Expand Down
Loading