-
Notifications
You must be signed in to change notification settings - Fork 55
refactor: clean up ProcessInternalUpdate to use standard callback system #521
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,11 +40,14 @@ constexpr absl::string_view kListCommand{"FT._LIST"}; | |
| constexpr absl::string_view kSearchCommand{"FT.SEARCH"}; | ||
| constexpr absl::string_view kDebugCommand{"FT._DEBUG"}; | ||
| constexpr absl::string_view kAggregateCommand{"FT.AGGREGATE"}; | ||
| constexpr absl::string_view kInternalUpdateCommand{"FT.INTERNAL_UPDATE"}; | ||
|
|
||
| const absl::flat_hash_set<absl::string_view> kCreateCmdPermissions{ | ||
| kSearchCategory, kWriteCategory, kFastCategory}; | ||
| const absl::flat_hash_set<absl::string_view> kDropIndexCmdPermissions{ | ||
| kSearchCategory, kWriteCategory, kFastCategory}; | ||
| const absl::flat_hash_set<absl::string_view> kInternalUpdateCmdPermissions{ | ||
| kAdminCategory}; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this also be write and fast categories? |
||
| const absl::flat_hash_set<absl::string_view> kSearchCmdPermissions{ | ||
| kSearchCategory, kReadCategory, kSlowCategory}; | ||
| const absl::flat_hash_set<absl::string_view> kInfoCmdPermissions{ | ||
|
|
@@ -79,6 +82,8 @@ absl::Status FTDebugCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, | |
| int argc); | ||
| absl::Status FTAggregateCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, | ||
| int argc); | ||
| absl::Status FTInternalUpdateCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, | ||
| int argc); | ||
|
|
||
| // | ||
| // Common stuff for FT.SEARCH and FT.AGGREGATE command | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -79,7 +79,11 @@ absl::Status FTCreateCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, | |
| } | ||
| ValkeyModule_ReplyWithSimpleString(ctx, "OK"); | ||
| } | ||
| ValkeyModule_ReplicateVerbatim(ctx); | ||
|
|
||
| // Replicate ft.create only for cmd clusters, whereas ft.internal_update will be replicated for CME clusters | ||
| if (!ValkeySearch::Instance().IsCluster()) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The right test for this might be !coordinator::Enabled. Not 100% sure. It certainly won't matter for us. |
||
| ValkeyModule_ReplicateVerbatim(ctx); | ||
| } | ||
| return absl::OkStatus(); | ||
| } | ||
| } // namespace valkey_search | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| /* | ||
| * Copyright (c) 2025, valkey-search contributors | ||
| * All rights reserved. | ||
| * SPDX-License-Identifier: BSD 3-Clause | ||
| * | ||
| */ | ||
|
|
||
| #include "src/commands/commands.h" | ||
| #include "src/commands/ft_create_parser.h" | ||
| #include "src/index_schema.pb.h" | ||
| #include "src/schema_manager.h" | ||
|
|
||
| namespace valkey_search { | ||
|
|
||
| constexpr int kFTInternalUpdateArgCount = 4; | ||
|
|
||
| absl::Status FTInternalUpdateCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { | ||
| if (argc != kFTInternalUpdateArgCount) { | ||
| return absl::InvalidArgumentError("ERR wrong number of arguments for FT_INTERNAL_UPDATE"); | ||
| } | ||
|
|
||
| // Parse index ID | ||
| size_t id_len; | ||
| const char *id_data = ValkeyModule_StringPtrLen(argv[1], &id_len); | ||
| std::string id(id_data, id_len); | ||
|
|
||
| // Deserialize GlobalMetadataEntry | ||
| size_t metadata_len; | ||
| const char *metadata_data = ValkeyModule_StringPtrLen(argv[2], &metadata_len); | ||
| coordinator::GlobalMetadataEntry metadata_entry; | ||
| if (!metadata_entry.ParseFromArray(metadata_data, metadata_len)) { | ||
| return absl::InvalidArgumentError("ERR failed to parse GlobalMetadataEntry"); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This merits a log entry and some kind of error counter. Either we have a logic problem, data corruption or a bad actor. In any case, this seems like something that wants to trigger an alarm.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're probably in a poison-pill situation. This is VERY bad. |
||
| } | ||
|
|
||
| // Deserialize GlobalMetadataVersionHeader | ||
| size_t header_len; | ||
| const char *header_data = ValkeyModule_StringPtrLen(argv[3], &header_len); | ||
| coordinator::GlobalMetadataVersionHeader version_header; | ||
| if (!version_header.ParseFromArray(header_data, header_len)) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here. |
||
| return absl::InvalidArgumentError("ERR failed to parse GlobalMetadataVersionHeader"); | ||
| } | ||
|
|
||
| auto status = coordinator::MetadataManager::Instance().ProcessInternalUpdate( | ||
| ctx, kSchemaManagerMetadataTypeName, id, &metadata_entry, &version_header); | ||
| if (!status.ok()) { | ||
| return status; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably same here too. |
||
| } | ||
|
|
||
| ValkeyModule_ReplicateVerbatim(ctx); | ||
|
|
||
| ValkeyModule_ReplyWithSimpleString(ctx, "OK"); | ||
| return absl::OkStatus(); | ||
| } | ||
|
|
||
| } // namespace valkey_search | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
|
|
||
| #include <algorithm> | ||
| #include <cstdint> | ||
| #include <future> | ||
| #include <memory> | ||
| #include <string> | ||
| #include <utility> | ||
|
|
@@ -205,6 +206,23 @@ absl::StatusOr<IndexFingerprintVersion> MetadataManager::CreateEntry( | |
| metadata.version_header().top_level_version() + 1); | ||
| metadata.mutable_version_header()->set_top_level_fingerprint( | ||
| ComputeTopLevelFingerprint(metadata.type_namespace_map())); | ||
|
|
||
|
|
||
| metadata_ = metadata; | ||
|
|
||
| // Call FT.INTERNAL_UPDATE for coordinator to ensure unified AOF replication | ||
| std::string metadata_binary, header_binary; | ||
| new_entry.SerializeToString(&metadata_binary); | ||
| metadata.version_header().SerializeToString(&header_binary); | ||
|
|
||
| ValkeyModuleCallReply *reply = ValkeyModule_Call(detached_ctx_.get(), "FT.INTERNAL_UPDATE", "!Kcbb", | ||
| std::string(id).c_str(), | ||
| metadata_binary.data(), metadata_binary.size(), | ||
| header_binary.data(), header_binary.size()); | ||
| if (reply) { | ||
| ValkeyModule_FreeCallReply(reply); | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. else crash? |
||
|
|
||
| BroadcastMetadata(detached_ctx_.get(), metadata.version_header()); | ||
| IndexFingerprintVersion index_fingerprint_version; | ||
| index_fingerprint_version.set_fingerprint(fingerprint); | ||
|
|
@@ -246,6 +264,20 @@ absl::Status MetadataManager::DeleteEntry(absl::string_view type_name, | |
| metadata.version_header().top_level_version() + 1); | ||
| metadata.mutable_version_header()->set_top_level_fingerprint( | ||
| ComputeTopLevelFingerprint(metadata.type_namespace_map())); | ||
|
|
||
| // Call FT.INTERNAL_UPDATE for coordinator to ensure unified AOF replication for DROP | ||
| std::string metadata_binary, header_binary; | ||
| new_entry.SerializeToString(&metadata_binary); | ||
| metadata.version_header().SerializeToString(&header_binary); | ||
|
|
||
| ValkeyModuleCallReply *reply = ValkeyModule_Call(detached_ctx_.get(), "FT.INTERNAL_UPDATE", "!Kcbb", | ||
| std::string(id).c_str(), | ||
| metadata_binary.data(), metadata_binary.size(), | ||
| header_binary.data(), header_binary.size()); | ||
| if (reply) { | ||
| ValkeyModule_FreeCallReply(reply); | ||
| } | ||
|
|
||
| BroadcastMetadata(detached_ctx_.get(), metadata.version_header()); | ||
| return absl::OkStatus(); | ||
| } | ||
|
|
@@ -284,10 +316,36 @@ void MetadataManager::BroadcastMetadata( | |
| } | ||
| std::string payload; | ||
| version_header.SerializeToString(&payload); | ||
| // Nullptr for target means broadcast to all. | ||
| ValkeyModule_SendClusterMessage(ctx, /* target= */ nullptr, | ||
| kMetadataBroadcastClusterMessageReceiverId, | ||
| payload.c_str(), payload.size()); | ||
|
|
||
| // Get all cluster nodes and send only to masters | ||
| size_t num_nodes; | ||
| char **node_list = ValkeyModule_GetClusterNodesList(ctx, &num_nodes); | ||
| if (node_list == nullptr) { | ||
| VMSDK_LOG(WARNING, ctx) << "Failed to get cluster nodes list"; | ||
| return; | ||
| } | ||
|
|
||
| for (size_t i = 0; i < num_nodes; i++) { | ||
| std::string node_id(node_list[i], VALKEYMODULE_NODE_ID_LEN); | ||
| int flags; | ||
| if (ValkeyModule_GetClusterNodeInfo(ctx, node_id.c_str(), nullptr, nullptr, nullptr, &flags) == VALKEYMODULE_OK) { | ||
| // Only send to master nodes, skip replicas | ||
| if (flags & VALKEYMODULE_NODE_MASTER) { | ||
| ValkeyModule_SendClusterMessage(ctx, node_id.c_str(), | ||
| kMetadataBroadcastClusterMessageReceiverId, | ||
| payload.c_str(), payload.size()); | ||
| } else { | ||
| VMSDK_LOG_EVERY_N_SEC(NOTICE, ctx, 10) | ||
| << "Skipping metadata broadcast to non-master node " << node_id | ||
| << " (flags=" << flags << ")"; | ||
| } | ||
| } else { | ||
|
Comment on lines
+319
to
+342
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use the new ClusterMap abstraction for determining the addresses of all of the primaries.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ack. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: this block is probably a good candidate for a separate function |
||
| VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 10) | ||
| << "Failed to get cluster node info for node " << node_id; | ||
| } | ||
| } | ||
|
|
||
| ValkeyModule_FreeClusterNodesList(node_list); | ||
| } | ||
|
|
||
| void MetadataManager::DelayHandleClusterMessage( | ||
|
|
@@ -473,12 +531,46 @@ absl::Status MetadataManager::ReconcileMetadata(const GlobalMetadata &proposed, | |
| } | ||
|
|
||
| if (trigger_callbacks) { | ||
| auto result = TriggerCallbacks(type_name, id, proposed_entry); | ||
| if (!result.ok()) { | ||
| VMSDK_LOG(WARNING, detached_ctx_.get()) | ||
| << "Failed during reconciliation callback: %s" | ||
| << result.message().data(); | ||
| return result; | ||
| // Use FT.INTERNAL_UPDATE for metadata updates | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably a good candidate for separate function given this function is already long |
||
| // Use RunByMain to call FT.INTERNAL_UPDATE via ValkeyModule_Call | ||
| // Use promise/future to make it synchronous for error handling | ||
| std::promise<absl::Status> promise; | ||
| auto future = promise.get_future(); | ||
|
|
||
| vmsdk::RunByMain([this, proposed_entry, id, &promise]() { | ||
|
|
||
| auto thread_safe_ctx = vmsdk::MakeUniqueValkeyDetachedThreadSafeContext(detached_ctx_.get()); | ||
| ValkeyModuleCtx *ctx = thread_safe_ctx.get(); | ||
|
|
||
| coordinator::GlobalMetadataVersionHeader version_header; | ||
| version_header.set_top_level_version(metadata_.Get().version_header().top_level_version()); | ||
| version_header.set_top_level_fingerprint(metadata_.Get().version_header().top_level_fingerprint()); | ||
|
|
||
| // Serialize to binary | ||
| std::string metadata_binary, header_binary; | ||
| proposed_entry.SerializeToString(&metadata_binary); | ||
| version_header.SerializeToString(&header_binary); | ||
|
|
||
|
|
||
| ValkeyModuleCallReply *reply = ValkeyModule_Call(ctx, "FT.INTERNAL_UPDATE", "!Kcbb", | ||
| id.c_str(), | ||
| metadata_binary.data(), metadata_binary.size(), | ||
| header_binary.data(), header_binary.size()); | ||
|
|
||
| if (reply == nullptr || ValkeyModule_CallReplyType(reply) == VALKEYMODULE_REPLY_ERROR) { | ||
| if (reply) ValkeyModule_FreeCallReply(reply); | ||
| promise.set_value(absl::InternalError("FT.INTERNAL_UPDATE failed")); | ||
| return; | ||
| } | ||
|
|
||
| ValkeyModule_FreeCallReply(reply); | ||
| promise.set_value(absl::OkStatus()); | ||
|
|
||
| }); | ||
|
|
||
| auto status = future.get(); | ||
| if (!status.ok()) { | ||
| return status; | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -722,4 +814,37 @@ void MetadataManager::RegisterForClusterMessages(ValkeyModuleCtx *ctx) { | |
| ctx, coordinator::kMetadataBroadcastClusterMessageReceiverId, | ||
| MetadataManagerOnClusterMessageCallback); | ||
| } | ||
|
|
||
| absl::Status MetadataManager::ProcessInternalUpdate(ValkeyModuleCtx *ctx, | ||
| absl::string_view type_name, | ||
| absl::string_view id, | ||
| const coordinator::GlobalMetadataEntry *metadata_entry, | ||
| const coordinator::GlobalMetadataVersionHeader *global_version_header) { | ||
| if (metadata_entry) { | ||
| auto callback_status = TriggerCallbacks(type_name, id, *metadata_entry); | ||
| if (!callback_status.ok()) { | ||
| return callback_status; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again, this feels like a crash situation.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes makes sense There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When does this return not ok? If the version we are writing is older and rejected, does it still return ok? |
||
| } | ||
| } | ||
|
|
||
| auto result = metadata_.Get(); | ||
|
|
||
| auto insert_result = result.mutable_type_namespace_map()->insert( | ||
| {std::string(type_name), coordinator::GlobalMetadataEntryMap()}); | ||
| auto &existing_inner_map = insert_result.first->second; | ||
| auto mutable_entries = existing_inner_map.mutable_entries(); | ||
| (*mutable_entries)[id] = *metadata_entry; | ||
|
|
||
| // Update global version header | ||
| const auto new_version = global_version_header->top_level_version(); | ||
| const auto new_fingerprint = ComputeTopLevelFingerprint(result.type_namespace_map()); | ||
|
|
||
| result.mutable_version_header()->set_top_level_version(new_version); | ||
| result.mutable_version_header()->set_top_level_fingerprint(new_fingerprint); | ||
|
|
||
| metadata_ = result; | ||
|
|
||
| return absl::OkStatus(); | ||
| } | ||
|
|
||
| } // namespace valkey_search::coordinator | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be
FT._INTERNAL_UPDATEgiven it is meant to be internal/hidden. PerhapsFT._UPDATE_INDEX.