Collaborator

@eliastam eliastam commented Dec 5, 2025

@allenss-amazon This is a draft PR; I will raise a revision with unit tests and integration tests as well.

Changes:

  1. Created ft_internal_update with the admin flag. This command replicates create/drop together with the global version, the index metadata version, and the fingerprint. Customers will not be allowed to call it.
  2. The gRPC broadcast now goes only to primaries.
  3. Replicas receive this information only through the replication stream (ft_internal_update).
  4. For CMD clusters nothing changes; we still replicate create/drop as is.
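The replication rule in the list above can be sketched as a small decision function. This is only an illustration: `Mode`, `ReplicatedCommand`, and the plain-string `entry`/`header` arguments are hypothetical stand-ins, not code from this PR. The four-argument shape of the internal command follows the AOF excerpts further down.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the deployment mode; not a valkey-search type.
enum class Mode { kCMD, kCME };

// Returns the command that would be written to the replication stream for a
// client-issued FT.CREATE or FT.DROPINDEX. `entry` and `header` stand in for
// the serialized metadata entry and version header.
std::vector<std::string> ReplicatedCommand(
    Mode mode, const std::vector<std::string>& client_argv,
    const std::string& index_name, const std::string& entry,
    const std::string& header) {
  assert(!client_argv.empty());  // a command has at least its name
  if (mode == Mode::kCMD) {
    // CMD: no change, the client command is replicated as is.
    return client_argv;
  }
  // CME: replicas only ever see the internal update.
  return {"FT.INTERNAL_UPDATE", index_name, entry, header};
}
```

With `Mode::kCME` the result always has four elements, which matches the `*4` arrays in the CME AOF files.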

Testing

  1. Integration tests pass.
  2. Manual testing.
    Note: to test this code you also need the following fix, which addresses a replica crash during drop index:
    Fix replica crash by using legacy ACL path during replication #510

CME

  1. Show cluster structure
    echo "=== CME (Cluster Mode) Test ===" && echo "" && echo "Cluster Structure:" && /tmp/valkey/src/valkey-cli -c -p 7000 CLUSTER NODES
=== CME (Cluster Mode) Test ===

Cluster Structure:
f78399f13c864d6c3d3ca4f6f3c300857102d419 127.0.0.1:7000@17000 myself,master - 0 0 1 connected 0-8191
10a60c6587d8f35b2e2be47241335e40b61294b9 127.0.0.1:7002@17002 slave 4519ea74c5aab5a91d58033be7dee8eaa714669d 0 1764965420000 2 connected
4519ea74c5aab5a91d58033be7dee8eaa714669d 127.0.0.1:7001@17001 master - 0 1764965420214 2 connected 8192-16383
8b37362676cd02c47eca8b5eb6ae46467e0d5614 127.0.0.1:7003@17003 slave f78399f13c864d6c3d3ca4f6f3c300857102d419 0 1764965421220 1 connected
  2. Show the initial index status

echo "Initial Index Status:" && echo "Node 7000 (primary):" && /tmp/valkey/src/valkey-cli -c -p 7000 FT._LIST && echo "Node 7001 (primary):" && /tmp/valkey/src/valkey-cli -c -p 7001 FT._LIST && echo "Node 7002 (replica):" && /tmp/valkey/src/valkey-cli -c -p 7002 FT._LIST && echo "Node 7003 (replica):" && /tmp/valkey/src/valkey-cli -c -p 7003 FT._LIST

Initial Index Status:
Node 7000 (primary):

Node 7001 (primary):

Node 7002 (replica):

Node 7003 (replica):

  3. Create an index on primary 1 (7000)
    /tmp/valkey/src/valkey-cli -c -p 7000 FT.CREATE cluster_demo SCHEMA field VECTOR HNSW 6 TYPE FLOAT32 DIM 128 DISTANCE_METRIC L2

OK

  4. FT._LIST after creating:
After creating index on node 7000:
Node 7000 (primary):
cluster_demo
Node 7001 (primary):
cluster_demo
Node 7002 (replica):
cluster_demo
Node 7003 (replica):
cluster_demo
  5. Drop from the other primary (7001) and list

/tmp/valkey/src/valkey-cli -c -p 7001 FT.DROPINDEX cluster_demo && echo "After dropping index:" && echo "Node 7000 (primary):" && /tmp/valkey/src/valkey-cli -c -p 7000 FT._LIST && echo "Node 7001 (primary):" && /tmp/valkey/src/valkey-cli -c -p 7001 FT._LIST && echo "Node 7002 (replica):" && /tmp/valkey/src/valkey-cli -c -p 7002 FT._LIST && echo "Node 7003 (replica):" && /tmp/valkey/src/valkey-cli -c -p 7003 FT._LIST

OK
After dropping index:
Node 7000 (primary):

Node 7001 (primary):

Node 7002 (replica):

Node 7003 (replica):

  6. AOF files for reference:

Node 7000 AOF:

*2
$6
SELECT
$1
0
*4
$18
FT.INTERNAL_UPDATE
$12
cluster_demo
$137
[137 bytes of binary protobuf; readable fragments: type.googleapis.com/valkey_search.data_model.IndexSchema, cluster_demo, field]
$13
[13 bytes of binary data]
*4
$18
FT.INTERNAL_UPDATE
$12
cluster_demo
$2
$13
[13 bytes of binary data]

Node 7001 AOF:

*2
$6
SELECT
$1
0
*4
$18
FT.INTERNAL_UPDATE
$12
cluster_demo
$137
[137 bytes of binary protobuf; readable fragments: type.googleapis.com/valkey_search.data_model.IndexSchema, cluster_demo, field]
$0

*4
$18
FT.INTERNAL_UPDATE
$12
cluster_demo
$2
$12
[12 bytes of binary data]
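
For readers less familiar with the AOF layout: the entries above are RESP arrays of bulk strings, each written as `*<count>`, then `$<length>` and the bytes for every argument, with CRLF terminators. A minimal sketch of that framing follows; `EncodeResp` is a hypothetical helper written for illustration, not a function from Valkey or this PR.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Encodes one command as a RESP array of bulk strings, the framing visible
// in the AOF excerpts above. Arguments may contain arbitrary bytes.
std::string EncodeResp(const std::vector<std::string>& args) {
  assert(!args.empty());  // a command has at least its name
  std::string out = "*" + std::to_string(args.size()) + "\r\n";
  for (const std::string& arg : args) {
    out += "$" + std::to_string(arg.size()) + "\r\n";
    out += arg;
    out += "\r\n";
  }
  return out;
}
```

Encoding `{"FT.INTERNAL_UPDATE", "cluster_demo", entry, header}` yields the `*4`, `$18`, `$12` prefix seen in both node files; an empty argument is written as `$0` followed by an empty line.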

CMD test

  1. create and list
    echo "=== CMD (Primary-Replica) Test ===" && echo "Creating cluster_demo on primary:" && /tmp/valkey/src/valkey-cli -p 8888 FT.CREATE cluster_demo SCHEMA field VECTOR HNSW 6 TYPE FLOAT32 DIM 128 DISTANCE_METRIC L2 && echo "Listing indexes:" && echo "Primary (8888):" && /tmp/valkey/src/valkey-cli -p 8888 FT._LIST && echo "Replica (8889):" && /tmp/valkey/src/valkey-cli -p 8889 FT._LIST

=== CMD (Primary-Replica) Test ===
Creating cluster_demo on primary:
OK
Listing indexes:
Primary (8888):
cluster_demo
Replica (8889):
cluster_demo
  2. Drop

/tmp/valkey/src/valkey-cli -p 8888 FT.DROPINDEX cluster_demo && echo "Primary (8888):" && /tmp/valkey/src/valkey-cli -p 8888 FT._LIST && echo "Replica (8889):" && /tmp/valkey/src/valkey-cli -p 8889 FT._LIST

OK
Primary (8888):

Replica (8889):

AOF file (same behaviour):

*2
$6
SELECT
$1
0
*13
$9
FT.CREATE
$10
test_clean
$6
SCHEMA
$5
field
$6
VECTOR
$4
HNSW
$1
6
$4
TYPE
$7
FLOAT32
$3
DIM
$3
128
$15
DISTANCE_METRIC
$2
L2
*2
$12
FT.DROPINDEX
$10
test_clean

- Replace direct ProcessMetadataUpdate calls with TriggerCallbacks in ProcessInternalUpdate
- Remove unnecessary ProcessMetadataUpdate wrapper method from SchemaManager
- Fix protobuf parsing error in test metadata string
- Update test expectations to work with TriggerCallbacks architecture

This change improves code consistency by using the standard callback registration
system throughout the metadata manager, rather than mixing direct method calls
with the callback pattern. All 41 coordinator tests now pass.

The ProcessInternalUpdate method now follows the same pattern as CreateEntry
and DeleteEntry by using TriggerCallbacks to invoke registered callbacks,
providing a cleaner and more maintainable architecture.

Signed-off-by: Elias Tamraz <[email protected]>
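
The pattern described in the commit message, where every entry path goes through one dispatch point, might look roughly like the sketch below. `CallbackRegistry`, `RegisterType`, and the bool-returning callback type are simplified assumptions for illustration; the real MetadataManager uses protobuf entries and status types and is not reproduced here.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical, simplified registry; the real MetadataManager differs.
class CallbackRegistry {
 public:
  // A callback returns true on success.
  using Callback =
      std::function<bool(const std::string& id, const std::string& payload)>;

  void RegisterType(const std::string& type_name, Callback cb) {
    assert(cb != nullptr);  // an empty callback would throw when invoked
    callbacks_[type_name] = std::move(cb);
  }

  // Single dispatch point shared by every entry path.
  bool TriggerCallbacks(const std::string& type_name, const std::string& id,
                        const std::string& payload) const {
    auto it = callbacks_.find(type_name);
    if (it == callbacks_.end()) return false;  // unknown metadata type
    return it->second(id, payload);
  }

  // Both paths delegate to TriggerCallbacks instead of calling the
  // schema handler directly.
  bool CreateEntry(const std::string& type_name, const std::string& id,
                   const std::string& payload) {
    return TriggerCallbacks(type_name, id, payload);
  }
  bool ProcessInternalUpdate(const std::string& type_name,
                             const std::string& id,
                             const std::string& payload) {
    return TriggerCallbacks(type_name, id, payload);
  }

 private:
  std::map<std::string, Callback> callbacks_;
};
```

Because both methods share one dispatch function, a handler registered once is reached from the client path and from the replication path alike.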
Member

@allenss-amazon allenss-amazon left a comment

Overall, this looks OK to me.

We will want to get this reviewed by the original author of the metadata handling system (Jacob @ GCP) when it's all done.

const char *metadata_data = ValkeyModule_StringPtrLen(argv[2], &metadata_len);
coordinator::GlobalMetadataEntry metadata_entry;
if (!metadata_entry.ParseFromArray(metadata_data, metadata_len)) {
  return absl::InvalidArgumentError("ERR failed to parse GlobalMetadataEntry");
Member

This merits a log entry and some kind of error counter. Either we have a logic problem, data corruption or a bad actor. In any case, this seems like something that wants to trigger an alarm.

Member

You're probably in a poison-pill situation. This is VERY bad.
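
One possible shape for the log entry plus error counter requested above is sketched here. Everything in it is an assumption made for illustration: `g_internal_update_parse_failures`, `ParseOrReport`, and the injected `parse` function are hypothetical, and a real change would use the module's own logging and metrics facilities rather than `std::cerr`.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>

// Hypothetical counter; a real module would expose it through its metrics.
std::atomic<uint64_t> g_internal_update_parse_failures{0};

// Runs `parse` on the payload. On failure it bumps the counter and writes a
// log line, so the event can be alarmed on, then reports the failure.
bool ParseOrReport(const std::string& what, const std::string& payload,
                   const std::function<bool(const std::string&)>& parse) {
  assert(parse != nullptr);  // caller must supply a parser
  if (parse(payload)) return true;
  ++g_internal_update_parse_failures;
  std::cerr << "FT.INTERNAL_UPDATE: failed to parse " << what << " ("
            << payload.size() << " bytes)\n";
  return false;
}
```

Whether the caller then returns an error or aborts is a separate decision; the sketch only makes the failure observable.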

ValkeyModule_ReplicateVerbatim(ctx);

// Replicate ft.create only for cmd clusters, whereas ft.internal_update will be replicated for CME clusters
if (!ValkeySearch::Instance().IsCluster()) {
Member

The right test for this might be !coordinator::Enabled. Not 100% sure. It certainly won't matter for us.

size_t header_len;
const char *header_data = ValkeyModule_StringPtrLen(argv[3], &header_len);
coordinator::GlobalMetadataVersionHeader version_header;
if (!version_header.ParseFromArray(header_data, header_len)) {
Member

Same here.

auto status = coordinator::MetadataManager::Instance().ProcessInternalUpdate(
    ctx, kSchemaManagerMetadataTypeName, id, &metadata_entry, &version_header);
if (!status.ok()) {
  return status;
Member

Probably same here too.

    header_binary.data(), header_binary.size());
if (reply) {
  ValkeyModule_FreeCallReply(reply);
}
Member

else crash?

Comment on lines +319 to +342

// Get all cluster nodes and send only to masters
size_t num_nodes;
char **node_list = ValkeyModule_GetClusterNodesList(ctx, &num_nodes);
if (node_list == nullptr) {
  VMSDK_LOG(WARNING, ctx) << "Failed to get cluster nodes list";
  return;
}

for (size_t i = 0; i < num_nodes; i++) {
  std::string node_id(node_list[i], VALKEYMODULE_NODE_ID_LEN);
  int flags;
  if (ValkeyModule_GetClusterNodeInfo(ctx, node_id.c_str(), nullptr, nullptr, nullptr, &flags) == VALKEYMODULE_OK) {
    // Only send to master nodes, skip replicas
    if (flags & VALKEYMODULE_NODE_MASTER) {
      ValkeyModule_SendClusterMessage(ctx, node_id.c_str(),
                                      kMetadataBroadcastClusterMessageReceiverId,
                                      payload.c_str(), payload.size());
    } else {
      VMSDK_LOG_EVERY_N_SEC(NOTICE, ctx, 10)
          << "Skipping metadata broadcast to non-master node " << node_id
          << " (flags=" << flags << ")";
    }
  } else {
Member

Please use the new ClusterMap abstraction for determining the addresses of all of the primaries.

Collaborator Author

Ack.
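
Independent of which abstraction supplies the node list, the selection step under discussion reduces to "keep only primaries". The sketch below shows that step with a hypothetical `NodeInfo` struct and `PrimaryTargets` helper. It does not use the ClusterMap API, whose interface is not shown in this conversation.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical view of one cluster node; not the ClusterMap type.
struct NodeInfo {
  std::string id;
  bool is_primary;
};

// Returns the IDs of the nodes that should receive the metadata broadcast:
// primaries only, in their original order. Replicas are skipped because they
// get the update through the replication stream instead.
std::vector<std::string> PrimaryTargets(const std::vector<NodeInfo>& nodes) {
  std::vector<std::string> targets;
  for (const NodeInfo& node : nodes) {
    assert(!node.id.empty());  // every node is expected to carry an ID
    if (node.is_primary) targets.push_back(node.id);
  }
  return targets;
}
```

For the four-node test cluster in the description, this would select the nodes on ports 7000 and 7001 and skip 7002 and 7003.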

if (metadata_entry) {
  auto callback_status = TriggerCallbacks(type_name, id, *metadata_entry);
  if (!callback_status.ok()) {
    return callback_status;
Member

Again, this feels like a crash situation.

Collaborator Author

Yes, makes sense.
