refactor: clean up ProcessInternalUpdate to use standard callback system #521
base: main
- Replace direct ProcessMetadataUpdate calls with TriggerCallbacks in ProcessInternalUpdate
- Remove unnecessary ProcessMetadataUpdate wrapper method from SchemaManager
- Fix protobuf parsing error in test metadata string
- Update test expectations to work with TriggerCallbacks architecture

This change improves code consistency by using the standard callback registration system throughout the metadata manager, rather than mixing direct method calls with the callback pattern. All 41 coordinator tests now pass.

The ProcessInternalUpdate method now follows the same pattern as CreateEntry and DeleteEntry by using TriggerCallbacks to invoke registered callbacks, providing a cleaner and more maintainable architecture.

Signed-off-by: Elias Tamraz <[email protected]>
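To illustrate the callback pattern the description refers to, here is a minimal, self-contained sketch of a registry that dispatches by type name. It is not the project's code: `CallbackRegistry`, `MetadataEntry`, and `Status` are hypothetical stand-ins (the real code uses protobuf messages and `absl::Status`), and only the name `TriggerCallbacks` is borrowed from the PR.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-in for the protobuf entry; not the real GlobalMetadataEntry.
struct MetadataEntry {
  std::string content;
};

// Hypothetical status type; the real code returns absl::Status.
struct Status {
  bool ok_flag = true;
  std::string message;
  bool ok() const { return ok_flag; }
};

// Assumed shape of a callback registry keyed by metadata type name.
class CallbackRegistry {
 public:
  using Callback =
      std::function<Status(const std::string &id, const MetadataEntry *entry)>;

  // Registers (or replaces) the callback for one metadata type.
  void RegisterType(const std::string &type_name, Callback callback) {
    assert(callback != nullptr);
    callbacks_[type_name] = std::move(callback);
  }

  // Looks up the callback for type_name and invokes it with the entry.
  // Returns a failed Status when no callback is registered for that type.
  Status TriggerCallbacks(const std::string &type_name, const std::string &id,
                          const MetadataEntry &entry) const {
    auto it = callbacks_.find(type_name);
    if (it == callbacks_.end()) {
      return Status{false, "no callback registered for type " + type_name};
    }
    return it->second(id, &entry);
  }

 private:
  std::map<std::string, Callback> callbacks_;
};
```

In this sketch, every update path goes through the same lookup and dispatch, so a caller does not need to know which concrete manager handles a given type.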
allenss-amazon
left a comment
Overall, this looks OK to me.
We will want to get this reviewed by the original author of the metadata handling system (Jacob @ GCP) when it's all done.
const char *metadata_data = ValkeyModule_StringPtrLen(argv[2], &metadata_len);
coordinator::GlobalMetadataEntry metadata_entry;
if (!metadata_entry.ParseFromArray(metadata_data, metadata_len)) {
  return absl::InvalidArgumentError("ERR failed to parse GlobalMetadataEntry");
This merits a log entry and some kind of error counter. Either we have a logic problem, data corruption or a bad actor. In any case, this seems like something that wants to trigger an alarm.
You're probably in a poison-pill situation. This is VERY bad.
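One way to act on this suggestion is to wrap the parse call so that every failure is both logged and counted. The sketch below is illustrative only: `g_metadata_parse_failures`, `ParseEntry`, and `ParseOrRecordFailure` are hypothetical names, `ParseEntry` merely stands in for `GlobalMetadataEntry::ParseFromArray`, and `std::cerr` stands in for the module's logging macros.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>

// Hypothetical process-wide failure counter; a real module would presumably
// surface it through its metrics or INFO output so an alarm can be attached.
std::atomic<uint64_t> g_metadata_parse_failures{0};

// Hypothetical stand-in for ParseFromArray: rejects null or empty payloads.
bool ParseEntry(const char *data, size_t len, std::string *out) {
  if (data == nullptr || len == 0) {
    return false;
  }
  out->assign(data, len);
  return true;
}

// Parses the payload; on failure, bumps the counter and writes a log line
// before reporting the error to the caller.
bool ParseOrRecordFailure(const char *data, size_t len, std::string *out) {
  if (ParseEntry(data, len, out)) {
    return true;
  }
  g_metadata_parse_failures.fetch_add(1, std::memory_order_relaxed);
  std::cerr << "failed to parse metadata entry (" << len << " bytes)\n";
  return false;
}
```

Whether the right reaction after counting is to return an error or to crash is a separate decision, as the poison-pill remark points out.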
ValkeyModule_ReplicateVerbatim(ctx);

// Replicate ft.create only for cmd clusters, whereas ft.internal_update will be replicated for CME clusters
if (!ValkeySearch::Instance().IsCluster()) {
The right test for this might be !coordinator::Enabled. Not 100% sure. It certainly won't matter for us.
size_t header_len;
const char *header_data = ValkeyModule_StringPtrLen(argv[3], &header_len);
coordinator::GlobalMetadataVersionHeader version_header;
if (!version_header.ParseFromArray(header_data, header_len)) {
Same here.
auto status = coordinator::MetadataManager::Instance().ProcessInternalUpdate(
    ctx, kSchemaManagerMetadataTypeName, id, &metadata_entry, &version_header);
if (!status.ok()) {
  return status;
Probably same here too.
    header_binary.data(), header_binary.size());
if (reply) {
  ValkeyModule_FreeCallReply(reply);
}
else crash?
// Get all cluster nodes and send only to masters
size_t num_nodes;
char **node_list = ValkeyModule_GetClusterNodesList(ctx, &num_nodes);
if (node_list == nullptr) {
  VMSDK_LOG(WARNING, ctx) << "Failed to get cluster nodes list";
  return;
}

for (size_t i = 0; i < num_nodes; i++) {
  std::string node_id(node_list[i], VALKEYMODULE_NODE_ID_LEN);
  int flags;
  if (ValkeyModule_GetClusterNodeInfo(ctx, node_id.c_str(), nullptr, nullptr, nullptr, &flags) == VALKEYMODULE_OK) {
    // Only send to master nodes, skip replicas
    if (flags & VALKEYMODULE_NODE_MASTER) {
      ValkeyModule_SendClusterMessage(ctx, node_id.c_str(),
                                      kMetadataBroadcastClusterMessageReceiverId,
                                      payload.c_str(), payload.size());
    } else {
      VMSDK_LOG_EVERY_N_SEC(NOTICE, ctx, 10)
          << "Skipping metadata broadcast to non-master node " << node_id
          << " (flags=" << flags << ")";
    }
  } else {
Please use the new ClusterMap abstraction for determining the addresses of all of the primaries.
Ack.
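The idea behind this request, selecting the primaries in one place and then broadcasting only to that set, can be sketched without the module API. This is not the ClusterMap interface, whose API is not shown in this conversation: `NodeInfo`, `kNodeFlagPrimary`, and `SelectPrimaries` are hypothetical names used purely for illustration, and the flag value is an assumption rather than the real `VALKEYMODULE_NODE_MASTER` constant.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical flag bit marking a primary; an assumption for this sketch,
// not the real VALKEYMODULE_NODE_MASTER value.
constexpr int kNodeFlagPrimary = 1 << 1;

// Hypothetical node record; the real data would come from the cluster layer.
struct NodeInfo {
  std::string id;
  int flags;
};

// Returns the ids of all nodes whose flags mark them as primaries,
// preserving the input order.
std::vector<std::string> SelectPrimaries(const std::vector<NodeInfo> &nodes) {
  std::vector<std::string> primaries;
  for (const auto &node : nodes) {
    if (node.flags & kNodeFlagPrimary) {
      primaries.push_back(node.id);
    }
  }
  return primaries;
}
```

Keeping the selection separate from the send loop means the broadcast code only iterates over a ready-made list of targets, which is roughly what an abstraction like ClusterMap would be expected to provide.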
if (metadata_entry) {
  auto callback_status = TriggerCallbacks(type_name, id, *metadata_entry);
  if (!callback_status.ok()) {
    return callback_status;
Again, this feels like a crash situation.
yes makes sense
@allenss-amazon This is a draft PR. I will raise a revision with unit tests and integration tests as well.
changes:
Testing
Note: to test this code you also need the following fix, which resolves a replica crash during drop index:
Fix replica crash by using legacy ACL path during replication #510
CME
echo "=== CME (Cluster Mode) Test ===" && echo "" && echo "Cluster Structure:" && /tmp/valkey/src/valkey-cli -c -p 7000 CLUSTER NODES
echo "Initial Index Status:" && echo "Node 7000 (primary):" && /tmp/valkey/src/valkey-cli -c -p 7000 FT._LIST && echo "Node 7001 (primary):" && /tmp/valkey/src/valkey-cli -c -p 7001 FT._LIST && echo "Node 7002 (replica):" && /tmp/valkey/src/valkey-cli -c -p 7002 FT._LIST && echo "Node 7003 (replica):" && /tmp/valkey/src/valkey-cli -c -p 7003 FT._LIST
/tmp/valkey/src/valkey-cli -c -p 7000 FT.CREATE cluster_demo SCHEMA field VECTOR HNSW 6 TYPE FLOAT32 DIM 128 DISTANCE_METRIC L2
OK
/tmp/valkey/src/valkey-cli -c -p 7001 FT.DROPINDEX cluster_demo && echo "After dropping index:" && echo "Node 7000 (primary):" && /tmp/valkey/src/valkey-cli -c -p 7000 FT._LIST && echo "Node 7001 (primary):" && /tmp/valkey/src/valkey-cli -c -p 7001 FT._LIST && echo "Node 7002 (replica):" && /tmp/valkey/src/valkey-cli -c -p 7002 FT._LIST && echo "Node 7003 (replica):" && /tmp/valkey/src/valkey-cli -c -p 7003 FT._LIST
6. AOF files for reference:
note
Node 7000 AOF:
Node 7001 AOF:
CMD test
echo "=== CMD (Primary-Replica) Test ===" && echo "Creating cluster_demo on primary:" && /tmp/valkey/src/valkey-cli -p 8888 FT.CREATE cluster_demo SCHEMA field VECTOR HNSW 6 TYPE FLOAT32 DIM 128 DISTANCE_METRIC L2 && echo "Listing indexes:" && echo "Primary (8888):" && /tmp/valkey/src/valkey-cli -p 8888 FT._LIST && echo "Replica (8889):" && /tmp/valkey/src/valkey-cli -p 8889 FT._LIST
/tmp/valkey/src/valkey-cli -p 8888 FT.DROPINDEX cluster_demo && echo "Primary (8888):" && /tmp/valkey/src/valkey-cli -p 8888 FT._LIST && echo "Replica (8889):" && /tmp/valkey/src/valkey-cli -p 8889 FT._LIST
AOF file: (same behaviour)