diff --git a/src/db/collection.cc b/src/db/collection.cc index d88efb8e2..b25d18bcc 100644 --- a/src/db/collection.cc +++ b/src/db/collection.cc @@ -198,6 +198,13 @@ class CollectionImpl : public Collection { std::vector build_drop_scalar_index_task( const std::vector &segments, const std::string &column); + std::vector build_create_fts_index_task( + const std::vector &segments, const std::string &column, + const IndexParams::Ptr &index_params); + + std::vector build_drop_fts_index_task( + const std::vector &segments, const std::string &column); + Status execute_tasks(std::vector &tasks) const; private: @@ -471,6 +478,18 @@ Status CollectionImpl::CreateIndex(const std::string &column_name, return Status::OK(); } + // Reject creating a non-vector index when the column already has a different + // non-vector index type (e.g. adding FTS when INVERT exists, or vice versa). + if (!field->is_vector_field() && field->index_params() != nullptr && + field->index_params()->type() != index_params->type()) { + return Status::NotSupported( + "CreateIndex: column[", column_name, "] already has index type [", + IndexTypeCodeBook::AsString(field->index_params()->type()), + "], cannot create index type [", + IndexTypeCodeBook::AsString(index_params->type()), + "] on the same column"); + } + // forbidden writing until index is ready std::lock_guard write_lock(write_mtx_); @@ -533,6 +552,9 @@ Status CollectionImpl::CreateIndex(const std::string &column_name, } else if (index_params->type() == IndexType::INVERT) { tasks = build_create_scalar_index_task(persist_segments, column_name, index_params, options.concurrency_); + } else if (index_params->type() == IndexType::FTS) { + tasks = build_create_fts_index_task(persist_segments, column_name, + index_params); } else { return Status::NotSupported( "CreateIndex: index type [", @@ -570,6 +592,10 @@ Status CollectionImpl::CreateIndex(const std::string &column_name, auto create_index_task = std::get(task_info); s = new_version.update_persisted_segment_meta( create_index_task.output_segment_meta_); + } else if (std::holds_alternative(task_info)) { + auto fts_task = std::get(task_info); + s = new_version.update_persisted_segment_meta( + fts_task.output_segment_meta_); } CHECK_RETURN_STATUS(s); } @@ -597,6 +623,11 @@ Status CollectionImpl::CreateIndex(const std::string &column_name, s = create_index_task.input_segment_->reload_scalar_index( *new_schema, create_index_task.output_segment_meta_, create_index_task.output_scalar_indexer_); + } else if (std::holds_alternative(task_info)) { + auto fts_task = std::get(task_info); + s = fts_task.input_segment_->reload_fts_index( + *new_schema, fts_task.output_segment_meta_, + fts_task.output_fts_indexer_); } CHECK_RETURN_STATUS(s); } @@ -630,6 +661,27 @@ std::vector CollectionImpl::build_create_scalar_index_task( return tasks; } +std::vector CollectionImpl::build_create_fts_index_task( + const std::vector &segments, const std::string &column, + const IndexParams::Ptr &index_params) { + std::vector tasks; + for (auto &segment : segments) { + tasks.push_back(SegmentTask::CreateCreateFtsIndexTask( + CreateFtsIndexTask{segment, column, index_params})); + } + return tasks; +} + +std::vector CollectionImpl::build_drop_fts_index_task( + const std::vector &segments, const std::string &column) { + std::vector tasks; + for (auto &segment : segments) { + tasks.push_back( + SegmentTask::CreateDropFtsIndexTask(DropFtsIndexTask{segment, column})); + } + return tasks; +} + Status CollectionImpl::execute_tasks( std::vector &tasks) const { Status s; @@ -723,6 +775,8 @@ Status CollectionImpl::DropIndex(const std::string &column_name) { tasks = build_drop_vector_index_task(persist_segments, column_name); } else if (field->index_params()->type() == IndexType::INVERT) { tasks = build_drop_scalar_index_task(persist_segments, column_name); + } else if (field->index_params()->type() == IndexType::FTS) { + tasks = build_drop_fts_index_task(persist_segments, column_name); } else { return Status::NotSupported( "DropIndex: index type [", @@ -760,6 +814,10 @@ Status CollectionImpl::DropIndex(const std::string &column_name) { auto drop_index_task = std::get(task_info); s = new_version.update_persisted_segment_meta( drop_index_task.output_segment_meta_); + } else if (std::holds_alternative(task_info)) { + auto fts_task = std::get(task_info); + s = new_version.update_persisted_segment_meta( + fts_task.output_segment_meta_); } CHECK_RETURN_STATUS(s); } @@ -785,6 +843,11 @@ Status CollectionImpl::DropIndex(const std::string &column_name) { s = drop_index_task.input_segment_->reload_scalar_index( *new_schema, drop_index_task.output_segment_meta_, drop_index_task.output_scalar_indexer_); + } else if (std::holds_alternative(task_info)) { + auto fts_task = std::get(task_info); + s = fts_task.input_segment_->reload_fts_index( + *new_schema, fts_task.output_segment_meta_, + fts_task.output_fts_indexer_); } CHECK_RETURN_STATUS(s); } diff --git a/src/db/common/file_helper.h b/src/db/common/file_helper.h index c983f4a86..e5c8f6cf8 100644 --- a/src/db/common/file_helper.h +++ b/src/db/common/file_helper.h @@ -139,14 +139,19 @@ class FileHelper { ailego::StringHelper::Concat("scalar.index.", block_id, ".rocksdb")); } - // e.g.: **/seg1/fts.rocksdb + // e.g.: **/seg1/fts.0.rocksdb static const std::string MakeFtsIndexPath(const std::string &path, - uint32_t seg_id) { - return ailego::FileHelper::PathJoin(path, seg_id, "fts.rocksdb"); + uint32_t seg_id, + uint32_t block_id) { + return ailego::FileHelper::PathJoin( + path, seg_id, + ailego::StringHelper::Concat("fts.", block_id, ".rocksdb")); } - static const std::string MakeFtsIndexPath(const std::string &seg_path) { - return ailego::FileHelper::PathJoin(seg_path, "fts.rocksdb"); + static const std::string MakeFtsIndexPath(const std::string &seg_path, + uint32_t block_id) { + return ailego::FileHelper::PathJoin( + seg_path, ailego::StringHelper::Concat("fts.", block_id, ".rocksdb")); } static const std::string MakeVectorIndexPath(const std::string &path, diff --git a/src/db/index/column/fts_column/fts_indexer.cc b/src/db/index/column/fts_column/fts_indexer.cc new file mode 100644 index 000000000..e71776acd --- /dev/null +++ b/src/db/index/column/fts_column/fts_indexer.cc @@ -0,0 +1,329 @@ +// Copyright 2025-present the zvec project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "fts_indexer.h" +#include +#include "db/common/constants.h" +#include "db/common/file_helper.h" +#include "fts_rocksdb_merge.h" +#include "fts_utils.h" + +namespace zvec { + +FtsIndexer::~FtsIndexer() { + close(); +} + +FtsIndexer::Ptr FtsIndexer::CreateAndOpen(const std::string &working_dir, + const FieldSchemaPtrList &fts_fields, + bool create, bool read_only) { + auto indexer = std::make_shared(working_dir); + auto s = indexer->open(fts_fields, create, read_only); + if (!s.ok()) { + return nullptr; + } + return indexer; +} + +Status FtsIndexer::open(const FieldSchemaPtrList &fts_fields, bool create, + bool read_only) { + if (fts_fields.empty()) { + return Status::OK(); + } + + // Collect CF names and per-CF merge operators. + std::vector cf_names; + std::unordered_map> + per_cf_merge_ops; + + for (const auto &field : fts_fields) { + const auto &name = field->name(); + cf_names.push_back(name); + cf_names.push_back(name + kFtsPositionsSuffix); + + per_cf_merge_ops[name] = std::make_shared(); + per_cf_merge_ops[name + kFtsMaxTfSuffix] = + std::make_shared(); + + if (create) { + cf_names.push_back(name + kFtsTfSuffix); + cf_names.push_back(name + kFtsMaxTfSuffix); + cf_names.push_back(name + kFtsDocLenSuffix); + } + } + cf_names.push_back(kFtsStatCfName); + + fts_ctx_ = std::make_shared(); + Status s; + + if (create) { + s = fts_ctx_->create(RocksdbContext::Args{working_dir_, cf_names, nullptr, + per_cf_merge_ops, true}); + } else { + // Auto-discover existing CFs via ListColumnFamilies (empty column_names). + s = fts_ctx_->open( + RocksdbContext::Args{working_dir_, {}, nullptr, per_cf_merge_ops, true}, + read_only); + } + if (!s.ok()) { + LOG_ERROR("FtsIndexer: failed to %s RocksDB at [%s]: %s", + create ? "create" : "open", working_dir_.c_str(), + s.message().c_str()); + fts_ctx_.reset(); + return s; + } + + auto *stat_cf = fts_ctx_->get_cf(kFtsStatCfName); + + for (const auto &field : fts_fields) { + const auto &name = field->name(); + auto *postings_cf = fts_ctx_->get_cf(name); + auto *positions_cf = fts_ctx_->get_cf(name + kFtsPositionsSuffix); + auto *term_freq_cf = fts_ctx_->get_cf(name + kFtsTfSuffix); + auto *max_tf_cf = fts_ctx_->get_cf(name + kFtsMaxTfSuffix); + auto *doc_len_cf = fts_ctx_->get_cf(name + kFtsDocLenSuffix); + + auto indexer = std::make_shared(); + auto ret = indexer->open(field, fts_ctx_.get(), postings_cf, positions_cf, + term_freq_cf, max_tf_cf, doc_len_cf, stat_cf); + if (!ret.has_value()) { + LOG_ERROR("FtsIndexer: FtsColumnIndexer::open failed for field[%s]: %s", + name.c_str(), ret.error().message().c_str()); + return Status::InternalError("FtsIndexer: open field failed: ", name, " ", + ret.error().message()); + } + + indexers_[name] = indexer; + } + + return Status::OK(); +} + +fts::FtsColumnIndexerPtr FtsIndexer::get(const std::string &field_name) const { + auto it = indexers_.find(field_name); + return it != indexers_.end() ? it->second : nullptr; +} + +Status FtsIndexer::flush() { + for (const auto &[name, indexer] : indexers_) { + auto ret = indexer->flush(); + if (!ret.has_value()) { + return Status::InternalError("FtsIndexer: flush failed: ", name, " ", + ret.error().message()); + } + } + if (fts_ctx_) { + auto s = fts_ctx_->flush(); + if (!s.ok()) { + return s; + } + } + return Status::OK(); +} + +Status FtsIndexer::close() { + indexers_.clear(); + if (fts_ctx_) { + auto s = fts_ctx_->close(); + fts_ctx_.reset(); + return s; + } + return Status::OK(); +} + +Status FtsIndexer::create_snapshot(const std::string &snapshot_path) { + auto s = flush(); + if (!s.ok()) { + LOG_ERROR("FtsIndexer: flush failed during snapshot"); + return s; + } + s = fts_ctx_->create_checkpoint(snapshot_path); + if (!s.ok()) { + LOG_ERROR("FtsIndexer: create_checkpoint to [%s] failed: %s", + snapshot_path.c_str(), s.message().c_str()); + } + return s; +} + +Status FtsIndexer::create_field_indexer(const FieldSchema &field) { + if (field.index_type() != IndexType::FTS) { + return Status::InvalidArgument( + "FtsIndexer::create_field_indexer: not FTS field"); + } + + const auto &name = field.name(); + if (indexers_.find(name) != indexers_.end()) { + return Status::InvalidArgument( + "FtsIndexer::create_field_indexer: field already exists: ", name); + } + + // Register merge operators before creating CFs. + fts_ctx_->per_cf_merge_ops_[name] = std::make_shared(); + fts_ctx_->per_cf_merge_ops_[name + kFtsMaxTfSuffix] = + std::make_shared(); + + // Create all CFs for this field. + std::vector cf_names = { + name, name + kFtsPositionsSuffix, name + kFtsTfSuffix, + name + kFtsMaxTfSuffix, name + kFtsDocLenSuffix}; + + for (const auto &cf_name : cf_names) { + auto s = fts_ctx_->create_cf(cf_name); + if (!s.ok()) { + LOG_ERROR("FtsIndexer::create_field_indexer: create_cf[%s] failed: %s", + cf_name.c_str(), s.message().c_str()); + return s; + } + } + + // Open FtsColumnIndexer on the new CFs. + auto *postings_cf = fts_ctx_->get_cf(name); + auto *positions_cf = fts_ctx_->get_cf(name + kFtsPositionsSuffix); + auto *term_freq_cf = fts_ctx_->get_cf(name + kFtsTfSuffix); + auto *max_tf_cf = fts_ctx_->get_cf(name + kFtsMaxTfSuffix); + auto *doc_len_cf = fts_ctx_->get_cf(name + kFtsDocLenSuffix); + auto *stat_cf = fts_ctx_->get_cf(kFtsStatCfName); + + auto field_schema = std::make_shared(field); + auto indexer = std::make_shared(); + auto ret = + indexer->open(field_schema, fts_ctx_.get(), postings_cf, positions_cf, + term_freq_cf, max_tf_cf, doc_len_cf, stat_cf); + if (!ret.has_value()) { + LOG_ERROR( + "FtsIndexer::create_field_indexer: FtsColumnIndexer::open failed: %s", + ret.error().message().c_str()); + return Status::InternalError("FtsIndexer::create_field_indexer: open: ", + ret.error().message()); + } + + indexers_[name] = indexer; + return Status::OK(); +} + +Status FtsIndexer::remove_field_indexer(const std::string &field_name) { + auto it = indexers_.find(field_name); + if (it != indexers_.end()) { + indexers_.erase(it); + } + + // Drop all CFs belonging to this field. + fts_ctx_->drop_cf(field_name); + fts_ctx_->drop_cf(field_name + kFtsPositionsSuffix); + fts_ctx_->drop_cf(field_name + kFtsTfSuffix); + fts_ctx_->drop_cf(field_name + kFtsMaxTfSuffix); + fts_ctx_->drop_cf(field_name + kFtsDocLenSuffix); + + // Remove per-field stat keys. + auto *stat_cf = fts_ctx_->get_cf(kFtsStatCfName); + if (stat_cf) { + auto rs = fts_ctx_->db_->Delete(fts_ctx_->write_opts_, stat_cf, + fts::make_total_docs_key(field_name)); + if (!rs.ok()) { + LOG_ERROR( + "FtsIndexer::remove_field_indexer: delete total_docs key " + "failed for field[%s]: %s", + field_name.c_str(), rs.ToString().c_str()); + return Status::InternalError("delete total_docs key failed"); + } + rs = fts_ctx_->db_->Delete(fts_ctx_->write_opts_, stat_cf, + fts::make_total_tokens_key(field_name)); + if (!rs.ok()) { + LOG_ERROR( + "FtsIndexer::remove_field_indexer: delete total_tokens key " + "failed for field[%s]: %s", + field_name.c_str(), rs.ToString().c_str()); + return Status::InternalError("delete total_tokens key failed"); + } + } + + return Status::OK(); +} + +Status FtsIndexer::insert(const std::string &field_name, uint32_t seg_doc_id, + const std::string &text) { + auto it = indexers_.find(field_name); + if (it == indexers_.end()) { + return Status::NotFound("FtsIndexer::insert: field not found: ", + field_name); + } + auto ret = it->second->insert(seg_doc_id, text); + if (!ret.has_value()) { + return Status::InternalError("FtsIndexer::insert failed: ", field_name, " ", + ret.error().message()); + } + return Status::OK(); +} + +Status FtsIndexer::seal(const std::string &field_name) { + auto it = indexers_.find(field_name); + if (it == indexers_.end()) { + return Status::NotFound("FtsIndexer::seal: field not found: ", field_name); + } + + auto &indexer = it->second; + + auto ret = indexer->flush(); + if (!ret.has_value()) { + return Status::InternalError("FtsIndexer::seal flush failed: ", field_name, + " ", ret.error().message()); + } + + ret = indexer->convert_postings_to_bitpacked(); + if (!ret.has_value()) { + return Status::InternalError("FtsIndexer::seal convert failed: ", + field_name, " ", ret.error().message()); + } + + indexer->reset_side_cfs(); + fts_ctx_->drop_cf(field_name + kFtsTfSuffix); + fts_ctx_->drop_cf(field_name + kFtsMaxTfSuffix); + fts_ctx_->drop_cf(field_name + kFtsDocLenSuffix); + + return Status::OK(); +} + +Status FtsIndexer::seal_all() { + // Flush all indexers first. + for (const auto &[name, indexer] : indexers_) { + auto ret = indexer->flush(); + if (!ret.has_value()) { + return Status::InternalError("FtsIndexer::seal_all flush failed: ", name, + " ", ret.error().message()); + } + } + + // Convert all postings to bitpacked format. + for (const auto &[name, indexer] : indexers_) { + auto ret = indexer->convert_postings_to_bitpacked(); + if (!ret.has_value()) { + return Status::InternalError("FtsIndexer::seal_all convert failed: ", + name, " ", ret.error().message()); + } + } + + // Reset side CFs and drop them. + for (const auto &[name, indexer] : indexers_) { + indexer->reset_side_cfs(); + } + for (const auto &[name, _] : indexers_) { + fts_ctx_->drop_cf(name + kFtsTfSuffix); + fts_ctx_->drop_cf(name + kFtsMaxTfSuffix); + fts_ctx_->drop_cf(name + kFtsDocLenSuffix); + } + + return Status::OK(); +} + +} // namespace zvec diff --git a/src/db/index/column/fts_column/fts_indexer.h b/src/db/index/column/fts_column/fts_indexer.h new file mode 100644 index 000000000..f68797a5a --- /dev/null +++ b/src/db/index/column/fts_column/fts_indexer.h @@ -0,0 +1,93 @@ +// Copyright 2025-present the zvec project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include "db/common/rocksdb_context.h" +#include "fts_column_indexer.h" + +namespace zvec { + +// Manages the per-segment FTS RocksDB instance and all FtsColumnIndexers. +// Analogous to InvertedIndexer for scalar invert indexes. +class FtsIndexer { + public: + using Ptr = std::shared_ptr; + + FtsIndexer(const std::string &working_dir) : working_dir_(working_dir) {} + + ~FtsIndexer(); + + // Create a new fts.rocksdb or open an existing one. Returns nullptr on + // failure. + static Ptr CreateAndOpen(const std::string &working_dir, + const FieldSchemaPtrList &fts_fields, bool create, + bool read_only = false); + + // Get FtsColumnIndexer by field name. + fts::FtsColumnIndexerPtr get(const std::string &field_name) const; + + // Flush all indexers and the underlying RocksDB. + Status flush(); + + // Close all indexers and the underlying RocksDB. + Status close(); + + // Create a RocksDB checkpoint (hard-link snapshot) to snapshot_path. + Status create_snapshot(const std::string &snapshot_path); + + // Add a new FTS field's CFs and open its FtsColumnIndexer. + Status create_field_indexer(const FieldSchema &field); + + // Remove a field's CFs and stat keys from the RocksDB. + Status remove_field_indexer(const std::string &field_name); + + // Insert a document's text into a specific field's indexer. + Status insert(const std::string &field_name, uint32_t seg_doc_id, + const std::string &text); + + // Seal a single field: flush + convert_postings_to_bitpacked + drop side CFs. + Status seal(const std::string &field_name); + + // Seal all fields (used by dump path). + Status seal_all(); + + const std::string &working_dir() const { + return working_dir_; + } + + bool empty() const { + return indexers_.empty(); + } + + bool has_field(const std::string &field_name) const { + return indexers_.find(field_name) != indexers_.end(); + } + + private: + Status open(const FieldSchemaPtrList &fts_fields, bool create, + bool read_only); + + std::string working_dir_; + std::shared_ptr fts_ctx_; + std::unordered_map indexers_; +}; + +} // namespace zvec diff --git a/src/db/index/common/meta.h b/src/db/index/common/meta.h index ae0ba3df8..272e831ef 100644 --- a/src/db/index/common/meta.h +++ b/src/db/index/common/meta.h @@ -246,6 +246,16 @@ class SegmentMeta { persisted_blocks_ = new_persisted_blocks; } + void remove_fts_index_block() { + std::vector new_persisted_blocks; + for (auto &b : persisted_blocks_) { + if (b.type() != BlockType::FTS_INDEX) { + new_persisted_blocks.push_back(b); + } + } + persisted_blocks_ = new_persisted_blocks; + } + void set_writing_forward_block(const BlockMeta &writing_forward_block) { writing_forward_block_ = writing_forward_block; } diff --git a/src/db/index/common/schema.cc b/src/db/index/common/schema.cc index dc550194c..f08b02320 100644 --- a/src/db/index/common/schema.cc +++ b/src/db/index/common/schema.cc @@ -220,13 +220,17 @@ Status FieldSchema::validate() const { if (index_params_) { if (index_params_->is_vector_index_type()) { return Status::InvalidArgument( - "schema validate failed: scalar_field's index_params only support " - "INVERT " - "index, " - "but field[", - name_, "]'s index_type is ", + "schema validate failed: scalar field[", name_, + "] does not support vector index params, but got index_type ", IndexTypeCodeBook::AsString(index_params_->type())); } + if (index_params_->type() == IndexType::FTS && + data_type_ != DataType::STRING) { + return Status::InvalidArgument( + "schema validate failed: FTS index only supports STRING data type, " + "but field[", + name_, "]'s data_type is ", DataTypeCodeBook::AsString(data_type_)); + } } } return Status::OK(); @@ -631,4 +635,4 @@ bool CollectionSchema::has_index(const std::string &column) const { return false; } -} // namespace zvec \ No newline at end of file +} // namespace zvec diff --git a/src/db/index/common/type_helper.h b/src/db/index/common/type_helper.h index 0dc56055f..ca03f9f15 100644 --- a/src/db/index/common/type_helper.h +++ b/src/db/index/common/type_helper.h @@ -503,6 +503,9 @@ struct BlockTypeCodeBook { case proto::BlockType::BT_VECTOR_INDEX_QUANTIZE: block_types = BlockType::VECTOR_INDEX_QUANTIZE; break; + case proto::BlockType::BT_FTS_INDEX: + block_types = BlockType::FTS_INDEX; + break; default: break; } @@ -524,6 +527,9 @@ struct BlockTypeCodeBook { case BlockType::VECTOR_INDEX_QUANTIZE: block_types = proto::BlockType::BT_VECTOR_INDEX_QUANTIZE; break; + case BlockType::FTS_INDEX: + block_types = proto::BlockType::BT_FTS_INDEX; + break; default: break; } @@ -541,6 +547,8 @@ struct BlockTypeCodeBook { return "VECTOR_INDEX"; case BlockType::VECTOR_INDEX_QUANTIZE: return "VECTOR_INDEX_QUANTIZE"; + case BlockType::FTS_INDEX: + return "FTS_INDEX"; default: return "UNDEFINED"; } diff --git a/src/db/index/segment/segment.cc b/src/db/index/segment/segment.cc index 42752f587..2e334c3fc 100644 --- a/src/db/index/segment/segment.cc +++ b/src/db/index/segment/segment.cc @@ -46,8 +46,7 @@ #include "db/common/global_resource.h" #include "db/common/typedef.h" #include "db/index/column/fts_column/fts_column_indexer.h" -#include "db/index/column/fts_column/fts_rocksdb_merge.h" -#include "db/index/column/fts_column/fts_types.h" +#include "db/index/column/fts_column/fts_indexer.h" #include "db/index/column/inverted_column/inverted_indexer.h" #include "db/index/column/vector_column/vector_column_indexer.h" #include "db/index/column/vector_column/vector_column_params.h" @@ -213,6 +212,19 @@ class SegmentImpl : public Segment, const CollectionSchema &schema, const SegmentMeta::Ptr &segment_meta, const InvertedIndexer::Ptr &scalar_indexer) override; + Status create_fts_index(const std::string &column, + const IndexParams::Ptr &index_params, + SegmentMeta::Ptr *new_segment_meta, + FtsIndexer::Ptr *output_fts_indexer) override; + + Status drop_fts_index(const std::string &column, + SegmentMeta::Ptr *new_segment_meta, + FtsIndexer::Ptr *output_fts_indexer) override; + + Status reload_fts_index(const CollectionSchema &schema, + const SegmentMeta::Ptr &segment_meta, + const FtsIndexer::Ptr &new_fts_indexer) override; + Status dump() override; Status flush() override; @@ -336,8 +348,7 @@ class SegmentImpl : public Segment, InvertedIndexer::Ptr invert_indexers_; // FTS index (uses segment-local doc ID) - std::shared_ptr fts_ctx_; - std::unordered_map fts_indexers_; + FtsIndexer::Ptr fts_indexer_; bool has_fts_{false}; // vector index (uses block-local doc ID, each indexer starts from 0) @@ -1958,10 +1969,17 @@ Status SegmentImpl::create_scalar_index(const std::vector &columns, return Status::InvalidArgument("Invalid column name"); } - if (field->index_params() != nullptr && - *field->index_params() == *index_params) { - // if already indexed, just skip it - continue; + if (field->index_params() != nullptr) { + if (*field->index_params() == *index_params) { + // if already indexed with same params, just skip it + continue; + } + if (field->index_params()->type() != index_params->type()) { + return Status::InvalidArgument( + "create_scalar_index: field[", column, "] already has index type ", + IndexTypeCodeBook::AsString(field->index_params()->type())); + } + // same type but different params — will rebuild below } auto new_field = std::make_shared(*field); @@ -4467,7 +4485,7 @@ Result Segment::Open(const std::string &path, } //////////////////////////////////////////////////////////////////////////////////// -// FTS integration +// FTS integration (delegated to FtsIndexer) //////////////////////////////////////////////////////////////////////////////////// Status SegmentImpl::open_fts_indexers(bool create) { @@ -4475,115 +4493,47 @@ Status SegmentImpl::open_fts_indexers(bool create) { return Status::OK(); } - auto fts_fields = collection_schema_->fts_fields(); - has_fts_ = true; - - auto fts_path = FileHelper::MakeFtsIndexPath(seg_path_); - - // Collect CF names and per-CF merge operators - std::vector cf_names; - std::unordered_map> - per_cf_merge_ops; - - for (const auto &field : fts_fields) { - const auto &name = field->name(); - cf_names.push_back(name); // postings - cf_names.push_back(name + kFtsPositionsSuffix); // positions - - per_cf_merge_ops[name] = std::make_shared(); - per_cf_merge_ops[name + kFtsMaxTfSuffix] = - std::make_shared(); - - // Side CFs (_tf / _max_tf / _doc_len) are present in mutable segments - // that have not yet been dumped. After dump, - // convert_postings_to_bitpacked() inlines their payloads into BitPacked - // postings and the CFs are dropped. When opening (!create), we pass - // empty column_names so RocksdbContext::open auto-discovers existing CFs - // via ListColumnFamilies — side CFs are included only when present. - if (create) { - cf_names.push_back(name + kFtsTfSuffix); - cf_names.push_back(name + kFtsMaxTfSuffix); - cf_names.push_back(name + kFtsDocLenSuffix); - } - } - cf_names.push_back(kFtsStatCfName); - - fts_ctx_ = std::make_shared(); - Status s; - - bool enable_hash_skiplist = true; if (create) { - s = fts_ctx_->create(RocksdbContext::Args{ - fts_path, cf_names, nullptr, per_cf_merge_ops, enable_hash_skiplist}); + auto block_id = allocate_block_id(); + auto fts_path = FileHelper::MakeFtsIndexPath(seg_path_, block_id); + fts_indexer_ = FtsIndexer::CreateAndOpen( + fts_path, collection_schema_->fts_fields(), true); + if (!fts_indexer_) { + return Status::InternalError("open_fts_indexers: create failed at [", + fts_path, "]"); + } + segment_meta_->add_persisted_block( + BlockMeta{block_id, BlockType::FTS_INDEX, 0, 0, 0, {}}); } else { - // Auto-discover existing CFs via ListColumnFamilies (empty column_names). - // per_cf_merge_ops covers both base and side CFs; entries for CFs that - // were dropped after dump are harmlessly ignored. - s = fts_ctx_->open( - RocksdbContext::Args{ - fts_path, {}, nullptr, per_cf_merge_ops, enable_hash_skiplist}, - options_.read_only_); - } - if (!s.ok()) { - LOG_ERROR("open_fts_indexers: failed to %s FTS RocksDB at [%s]: %s", - create ? "create" : "open", fts_path.c_str(), - s.message().c_str()); - return s; - } - - auto *stat_cf = fts_ctx_->get_cf(kFtsStatCfName); - - for (const auto &field : fts_fields) { - const auto &name = field->name(); - auto *postings_cf = fts_ctx_->get_cf(name); - auto *positions_cf = fts_ctx_->get_cf(name + kFtsPositionsSuffix); - // Side CF handles are non-null when the segment has not been dumped - // (side CFs still exist). For dumped immutable segments get_cf returns - // nullptr and FtsColumnIndexer falls back to BitPacked inline payloads - // or tf=1/doc_len=1 defaults. - auto *term_freq_cf = fts_ctx_->get_cf(name + kFtsTfSuffix); - auto *max_tf_cf = fts_ctx_->get_cf(name + kFtsMaxTfSuffix); - auto *doc_len_cf = fts_ctx_->get_cf(name + kFtsDocLenSuffix); - - auto indexer = std::make_shared(); - - auto ret = indexer->open(field, fts_ctx_.get(), postings_cf, positions_cf, - term_freq_cf, max_tf_cf, doc_len_cf, stat_cf); - if (!ret.has_value()) { - LOG_ERROR( - "open_fts_indexers: FtsColumnIndexer::open failed for field[%s] " - "err[%s] postings_cf[%p] positions_cf[%p] stat_cf[%p]", - name.c_str(), ret.error().message().c_str(), (void *)postings_cf, - (void *)positions_cf, (void *)stat_cf); - return Status::InternalError("Failed to open FTS indexer: ", name, " ", - ret.error().message()); + for (const auto &block : segment_meta_->persisted_blocks()) { + if (block.type() == BlockType::FTS_INDEX) { + auto fts_path = FileHelper::MakeFtsIndexPath(seg_path_, block.id()); + fts_indexer_ = FtsIndexer::CreateAndOpen( + fts_path, collection_schema_->fts_fields(), false, + options_.read_only_); + if (!fts_indexer_) { + return Status::InternalError("open_fts_indexers: open failed at [", + fts_path, "]"); + } + break; + } } - - fts_indexers_[name] = indexer; } + has_fts_ = (fts_indexer_ != nullptr); return Status::OK(); } Status SegmentImpl::flush_fts_indexers() { - for (const auto &[name, indexer] : fts_indexers_) { - auto ret = indexer->flush(); - if (!ret.has_value()) { - return Status::InternalError("FTS flush failed: ", name, " ", - ret.error().message()); - } + if (!fts_indexer_) { + return Status::OK(); } - auto s = fts_ctx_->flush(); - CHECK_RETURN_STATUS(s); - return Status::OK(); + return fts_indexer_->flush(); } Status SegmentImpl::close_fts_indexers() { - fts_indexers_.clear(); - if (fts_ctx_) { - auto s = fts_ctx_->close(); - fts_ctx_.reset(); - return s; + if (fts_indexer_) { + fts_indexer_.reset(); } return Status::OK(); } @@ -4593,17 +4543,13 @@ Status SegmentImpl::insert_fts_indexer(Doc &doc) { return Status::OK(); } for (const auto &field : collection_schema_->fts_fields()) { - auto it = fts_indexers_.find(field->name()); - if (it == fts_indexers_.end()) { - return Status::InternalError("FTS indexer not found: ", field->name()); - } auto value = doc.get(field->name()); if (value.has_value()) { auto segment_doc_id = doc_ids_.size(); - auto ret = it->second->insert(segment_doc_id, value.value()); - if (!ret.has_value()) { - return Status::InternalError("FTS insert failed: ", field->name(), " ", - ret.error().message()); + auto s = + fts_indexer_->insert(field->name(), segment_doc_id, value.value()); + if (!s.ok()) { + return s; } } } @@ -4611,49 +4557,18 @@ Status SegmentImpl::insert_fts_indexer(Doc &doc) { } Status SegmentImpl::dump_fts_indexers() { - if (!has_fts_) { + if (!has_fts_ || !fts_indexer_) { return Status::OK(); } - - // flush all indexers - for (const auto &[name, indexer] : fts_indexers_) { - auto ret = indexer->flush(); - if (!ret.has_value()) { - return Status::InternalError("FTS flush failed during dump: ", name, " ", - ret.error().message()); - } - } - - // convert postings to bitpacked format - for (const auto &[name, indexer] : fts_indexers_) { - auto ret = indexer->convert_postings_to_bitpacked(); - if (!ret.has_value()) { - return Status::InternalError("FTS convert_postings_to_bitpacked failed: ", - name, " ", ret.error().message()); - } - } - - // reset side CFs and drop $TF/$MAX_TF/$DOC_LEN CFs - for (const auto &[name, indexer] : fts_indexers_) { - indexer->reset_side_cfs(); - } - for (const auto &field : collection_schema_->fts_fields()) { - const auto &name = field->name(); - fts_ctx_->drop_cf(name + kFtsTfSuffix); - fts_ctx_->drop_cf(name + kFtsMaxTfSuffix); - fts_ctx_->drop_cf(name + kFtsDocLenSuffix); - } - - return Status::OK(); + return fts_indexer_->seal_all(); } fts::FtsColumnIndexerPtr SegmentImpl::get_fts_indexer( const std::string &field_name) const { - auto it = fts_indexers_.find(field_name); - if (it != fts_indexers_.end()) { - return it->second; + if (!fts_indexer_) { + return nullptr; } - return nullptr; + return fts_indexer_->get(field_name); } Result> SegmentImpl::fts_search( @@ -4674,4 +4589,222 @@ Result> SegmentImpl::fts_search( return std::move(ret.value()); } +Status SegmentImpl::create_fts_index(const std::string &column, + const IndexParams::Ptr &index_params, + SegmentMeta::Ptr *out_segment_meta, + FtsIndexer::Ptr *out_fts_indexer) { + auto fts_params = std::dynamic_pointer_cast(index_params); + if (!fts_params) { + return Status::InvalidArgument("create_fts_index: not FtsIndexParams"); + } + + auto field = collection_schema_->get_field(column); + if (!field) { + return Status::NotFound("create_fts_index: field not found: ", column); + } + + if (field->index_params() != nullptr) { + if (*field->index_params() == *index_params) { + // Already indexed with same params, nothing to do. + *out_segment_meta = std::make_shared(*segment_meta_); + *out_fts_indexer = fts_indexer_; + return Status::OK(); + } + if (field->index_params()->type() != index_params->type()) { + return Status::InvalidArgument( + "create_fts_index: field[", column, "] already has index type ", + IndexTypeCodeBook::AsString(field->index_params()->type())); + } + // Same type but different params — will rebuild below. + } + + auto new_segment_meta = std::make_shared(*segment_meta_); + new_segment_meta->remove_fts_index_block(); + + auto block_id = allocate_block_id(); + auto new_fts_path = FileHelper::MakeFtsIndexPath(seg_path_, block_id); + + // Build a new schema that includes the new FTS field for the snapshot open. + auto new_schema = std::make_shared(*collection_schema_); + new_schema->get_field(column)->set_index_params(index_params); + + FtsIndexer::Ptr new_fts_indexer; + if (fts_indexer_) { + // Snapshot existing fts DB, then open the copy with current schema. + auto s = fts_indexer_->create_snapshot(new_fts_path); + CHECK_RETURN_STATUS(s); + + new_fts_indexer = FtsIndexer::CreateAndOpen( + new_fts_path, collection_schema_->fts_fields(), false); + } else { + // No existing fts DB — create a fresh one. + new_fts_indexer = + FtsIndexer::CreateAndOpen(new_fts_path, new_schema->fts_fields(), true); + } + if (!new_fts_indexer) { + FileHelper::RemoveDirectory(new_fts_path); + return Status::InternalError("create_fts_index: failed to open snapshot"); + } + + // If the field already exists in the snapshot (params change), remove it + // first so we can recreate with the new params. + if (new_fts_indexer->has_field(column)) { + auto s = new_fts_indexer->remove_field_indexer(column); + if (!s.ok()) { + FileHelper::RemoveDirectory(new_fts_path); + return s; + } + } + + { + auto new_field_schema = std::make_shared( + column, field->data_type(), field->nullable(), index_params); + auto s = new_fts_indexer->create_field_indexer(*new_field_schema); + if (!s.ok()) { + FileHelper::RemoveDirectory(new_fts_path); + return s; + } + } + + // Scan forward store and replay all existing documents. + auto reader = scan({column}); + if (reader) { + uint32_t seg_doc_id = 0; + while (true) { + auto batch = reader->Next(); + if (!batch.ok()) { + FileHelper::RemoveDirectory(new_fts_path); + return Status::InternalError("create_fts_index: scan failed: ", + batch.status().message()); + } + auto batch_value = batch.ValueOrDie(); + if (!batch_value) { + break; + } + auto col_idx = batch_value->schema()->GetFieldIndex(column); + if (col_idx < 0) { + seg_doc_id += batch_value->num_rows(); + continue; + } + auto string_array = std::static_pointer_cast( + batch_value->column(col_idx)); + for (int64_t i = 0; i < string_array->length(); ++i) { + if (!string_array->IsNull(i)) { + auto s = new_fts_indexer->insert(column, seg_doc_id, + string_array->GetString(i)); + if (!s.ok()) { + FileHelper::RemoveDirectory(new_fts_path); + return s; + } + } + seg_doc_id++; + } + } + } + + // Seal the new field (flush + convert to BitPacked + drop side CFs). + auto s = new_fts_indexer->seal(column); + if (!s.ok()) { + FileHelper::RemoveDirectory(new_fts_path); + return s; + } + + s = new_fts_indexer->flush(); + if (!s.ok()) { + FileHelper::RemoveDirectory(new_fts_path); + return s; + } + + // Register the new block in segment meta. + BlockMeta block; + block.set_id(block_id); + block.set_type(BlockType::FTS_INDEX); + new_segment_meta->add_persisted_block(block); + + *out_segment_meta = new_segment_meta; + *out_fts_indexer = new_fts_indexer; + + return Status::OK(); +} + +Status SegmentImpl::drop_fts_index(const std::string &column, + SegmentMeta::Ptr *out_segment_meta, + FtsIndexer::Ptr *out_fts_indexer) { + auto field = collection_schema_->get_field(column); + if (!field) { + return Status::NotFound("drop_fts_index: field not found: ", column); + } + + auto new_segment_meta = std::make_shared(*segment_meta_); + new_segment_meta->remove_fts_index_block(); + + // Build a new schema without the FTS index on this column. + auto new_schema = std::make_shared(*collection_schema_); + new_schema->get_field(column)->set_index_params(nullptr); + + if (!fts_indexer_) { + *out_segment_meta = new_segment_meta; + *out_fts_indexer = nullptr; + return Status::OK(); + } + + // Check if other FTS fields remain after removal. + bool has_other_fts = new_schema->has_fts_field(); + + if (has_other_fts) { + auto block_id = allocate_block_id(); + auto new_fts_path = FileHelper::MakeFtsIndexPath(seg_path_, block_id); + + // Snapshot and reopen without this field. + auto s = fts_indexer_->create_snapshot(new_fts_path); + CHECK_RETURN_STATUS(s); + + auto new_fts_indexer = FtsIndexer::CreateAndOpen( + new_fts_path, collection_schema_->fts_fields(), false); + if (!new_fts_indexer) { + FileHelper::RemoveDirectory(new_fts_path); + return Status::InternalError("drop_fts_index: failed to open snapshot"); + } + + s = new_fts_indexer->remove_field_indexer(column); + CHECK_RETURN_STATUS(s); + + s = new_fts_indexer->flush(); + CHECK_RETURN_STATUS(s); + + BlockMeta block; + block.set_id(block_id); + block.set_type(BlockType::FTS_INDEX); + new_segment_meta->add_persisted_block(block); + + *out_fts_indexer = new_fts_indexer; + } else { + // Last FTS field removed — no new indexer. + *out_fts_indexer = nullptr; + } + + *out_segment_meta = new_segment_meta; + return Status::OK(); +} + +Status SegmentImpl::reload_fts_index(const CollectionSchema &schema, + const SegmentMeta::Ptr &segment_meta, + const FtsIndexer::Ptr &new_fts_indexer) { + collection_schema_ = std::make_shared(schema); + segment_meta_ = segment_meta; + + if (fts_indexer_) { + auto old_dir = fts_indexer_->working_dir(); + fts_indexer_ = new_fts_indexer; + FileHelper::RemoveDirectory(old_dir); + } else { + fts_indexer_ = new_fts_indexer; + } + + has_fts_ = (fts_indexer_ != nullptr); + fresh_persist_block_offset(); + + return Status::OK(); +} + } // namespace zvec diff --git a/src/db/index/segment/segment.h b/src/db/index/segment/segment.h index e31f07227..0b3a8b87a 100644 --- a/src/db/index/segment/segment.h +++ b/src/db/index/segment/segment.h @@ -26,6 +26,7 @@ #include #include #include "db/index/column/fts_column/fts_column_indexer.h" +#include "db/index/column/fts_column/fts_indexer.h" #include "db/index/column/inverted_column/inverted_column_indexer.h" #include "db/index/column/inverted_column/inverted_indexer.h" #include "db/index/column/vector_column/combined_vector_column_indexer.h" @@ -125,6 +126,19 @@ class Segment { const CollectionSchema &schema, const SegmentMeta::Ptr &segment_meta, const InvertedIndexer::Ptr &scalar_indexer) = 0; + virtual Status create_fts_index(const std::string &column, + const IndexParams::Ptr &index_params, + SegmentMeta::Ptr *new_segment_meta, + FtsIndexer::Ptr *output_fts_indexer) = 0; + + virtual Status drop_fts_index(const std::string &column, + SegmentMeta::Ptr *new_segment_meta, + FtsIndexer::Ptr *output_fts_indexer) = 0; + + virtual Status reload_fts_index(const CollectionSchema &schema, + const SegmentMeta::Ptr &segment_meta, + const FtsIndexer::Ptr &new_fts_indexer) = 0; + // ---- Data operations ---------------------------------------------------- virtual Status Insert(Doc &doc) = 0; @@ -168,6 +182,7 @@ class Segment { virtual InvertedColumnIndexer::Ptr get_scalar_indexer( const std::string &field_name) const = 0; + // caller hold segment shared_ptr for segment handle the indexer's lifetime virtual fts::FtsColumnIndexerPtr get_fts_indexer( const std::string &field_name) const = 0; diff --git a/src/db/index/segment/segment_helper.cc b/src/db/index/segment/segment_helper.cc index 7c2883240..ccb75413b 100644 --- a/src/db/index/segment/segment_helper.cc +++ b/src/db/index/segment/segment_helper.cc @@ -66,6 +66,16 @@ Status SegmentHelper::Execute(SegmentTask::Ptr &task) { } else if (std::holds_alternative(task_info)) { auto &drop_index_task = std::get(task_info); s = ExecuteDropScalarIndexTask(drop_index_task); + } else if (std::holds_alternative(task_info)) { + auto &fts_task = std::get(task_info); + s = fts_task.input_segment_->create_fts_index( + fts_task.column_, fts_task.index_params_, + &fts_task.output_segment_meta_, &fts_task.output_fts_indexer_); + } else if (std::holds_alternative(task_info)) { + auto &fts_task = std::get(task_info); + s = fts_task.input_segment_->drop_fts_index(fts_task.column_, + &fts_task.output_segment_meta_, + &fts_task.output_fts_indexer_); } else { return Status::InvalidArgument("Unknown task type"); } @@ -146,7 +156,7 @@ Status SegmentHelper::ExecuteCompactTask(CompactTask &task) { LOG_INFO("Compacted vector index"); s = ReduceFts(schema, input_segments, output_segment_path, - delete_row_id_bitmap); + delete_row_id_bitmap, block_id_generator, &block_metas); CHECK_RETURN_STATUS(s); LOG_INFO("Compacted fts index"); @@ -973,7 +983,9 @@ arrow::Status SegmentHelper::FilterRecordBatch( Status SegmentHelper::ReduceFts(const CollectionSchema::Ptr &schema, const std::vector &input_segments, const std::string &output_segment_path, - const roaring::Roaring &delete_row_id_bitmap) { + const roaring::Roaring &delete_row_id_bitmap, + std::function &block_id_generator, + std::vector *output_block_metas) { if (!schema->has_fts_field()) { return Status::OK(); } @@ -983,12 +995,15 @@ Status SegmentHelper::ReduceFts(const CollectionSchema::Ptr &schema, auto fts_fields = schema->fts_fields(); + auto fts_block_id = block_id_generator(); + // Build the destination FTS RocksDB with the post-dump CF layout: // postings + positions per field, plus the shared stat CF. Side CFs // ($TF/$MAX_TF/$DOC_LEN) are skipped — the reducer writes BitPacked // directly, matching the immutable-segment shape after // convert_postings_to_bitpacked(). - auto dst_fts_path = FileHelper::MakeFtsIndexPath(output_segment_path); + auto dst_fts_path = + FileHelper::MakeFtsIndexPath(output_segment_path, fts_block_id); std::vector cf_names; std::unordered_map> per_cf_merge_ops; @@ -1078,6 +1093,12 @@ Status SegmentHelper::ReduceFts(const CollectionSchema::Ptr &schema, s.message().c_str()); return s; } + + BlockMeta fts_block; + fts_block.set_id(fts_block_id); + fts_block.set_type(BlockType::FTS_INDEX); + output_block_metas->push_back(fts_block); + return Status::OK(); } diff --git a/src/db/index/segment/segment_helper.h b/src/db/index/segment/segment_helper.h index 3de46eb2e..ab8c3a9df 100644 --- a/src/db/index/segment/segment_helper.h +++ b/src/db/index/segment/segment_helper.h @@ -130,13 +130,42 @@ struct DropScalarIndexTask { InvertedIndexer::Ptr output_scalar_indexer_; // nullptr means no scalar index }; +struct CreateFtsIndexTask { + CreateFtsIndexTask(Segment::Ptr input_segment, std::string column, + IndexParams::Ptr index_params) + : input_segment_(input_segment), + column_(std::move(column)), + index_params_(std::move(index_params)) {} + + Segment::Ptr input_segment_; + std::string column_; + IndexParams::Ptr index_params_; + + // output + SegmentMeta::Ptr output_segment_meta_; + FtsIndexer::Ptr output_fts_indexer_; +}; + +struct DropFtsIndexTask { + DropFtsIndexTask(Segment::Ptr input_segment, std::string column) + : input_segment_(input_segment), column_(std::move(column)) {} + + Segment::Ptr input_segment_; + std::string column_; + + // output + SegmentMeta::Ptr output_segment_meta_; + FtsIndexer::Ptr output_fts_indexer_; +}; + class SegmentTask { public: using Ptr = std::shared_ptr; using TaskInfo = std::variant; + CreateScalarIndexTask, DropScalarIndexTask, + CreateFtsIndexTask, DropFtsIndexTask>; static Ptr CreateCompactTask(const CompactTask &task) { return std::make_shared(task); @@ -158,6 +187,14 @@ class SegmentTask { return std::make_shared(task); } + static Ptr CreateCreateFtsIndexTask(const CreateFtsIndexTask &task) { + return std::make_shared(task); + } + + static Ptr CreateDropFtsIndexTask(const DropFtsIndexTask &task) { + return std::make_shared(task); + } + public: SegmentTask(const CompactTask &task) : task_info_(task) {} @@ -169,6 +206,10 @@ class SegmentTask { SegmentTask(const DropScalarIndexTask &task) : task_info_(task) {} + SegmentTask(const CreateFtsIndexTask &task) : task_info_(task) {} + + SegmentTask(const DropFtsIndexTask &task) : task_info_(task) {} + TaskInfo &task_info() { return task_info_; } @@ -251,7 +292,9 @@ class SegmentHelper { static Status ReduceFts(const CollectionSchema::Ptr &schema, const std::vector &input_segments, const std::string &output_segment_path, - const roaring::Roaring &delete_row_id_bitmap); + const roaring::Roaring &delete_row_id_bitmap, + std::function &block_id_generator, + std::vector *output_block_metas); static arrow::Status FilterRecordBatch( const std::shared_ptr &batch, diff --git a/src/db/proto/zvec.proto b/src/db/proto/zvec.proto index e94d1d399..ce01ed876 100644 --- a/src/db/proto/zvec.proto +++ b/src/db/proto/zvec.proto @@ -171,6 +171,7 @@ enum BlockType { BT_SCALAR_INDEX = 2; BT_VECTOR_INDEX = 3; BT_VECTOR_INDEX_QUANTIZE = 4; + BT_FTS_INDEX = 5; }; message BlockMeta { diff --git a/src/include/zvec/db/type.h b/src/include/zvec/db/type.h index a48267994..495e4bec9 100644 --- a/src/include/zvec/db/type.h +++ b/src/include/zvec/db/type.h @@ -125,6 +125,7 @@ enum BlockType : uint32_t { SCALAR_INDEX = 2, VECTOR_INDEX = 3, VECTOR_INDEX_QUANTIZE = 4, + FTS_INDEX = 5, }; diff --git a/tests/db/collection_test.cc b/tests/db/collection_test.cc index 877a07dc8..7f4e257d1 100644 --- a/tests/db/collection_test.cc +++ b/tests/db/collection_test.cc @@ -5625,7 +5625,7 @@ TEST_F(CollectionTest, Feature_NoVectorCollection_FtsLifecycle) { auto create_res = Collection::CreateAndOpen(col_path, *schema, CollectionOptions{false, true}); ASSERT_TRUE(create_res.has_value()) << create_res.error().message(); - auto col = create_res.value(); + auto col = std::move(create_res.value()); // Insert a corpus where 4 of 5 docs contain "hello". Doc 4 is the only // doc without "hello"; we'll delete it later to verify Optimize correctly @@ -5680,16 +5680,41 @@ TEST_F(CollectionTest, Feature_NoVectorCollection_FtsLifecycle) { ASSERT_EQ(fts_search("hello").size(), 3u); ASSERT_EQ(fts_search("nothing").size(), 0u); + // Close and reopen in read-only mode (same as bench query mode). + col.reset(); + CollectionOptions ro_options; + ro_options.read_only_ = true; + auto reopen_res = Collection::Open(col_path, ro_options); + ASSERT_TRUE(reopen_res.has_value()) << reopen_res.error().message(); + col = std::move(reopen_res.value()); + + auto fts_search_ro = [&](const std::string &term) { + SearchQuery vq; + vq.target_.field_name_ = "content"; + vq.topk_ = 10; + FtsClause fts_q; + fts_q.query_string_ = term; + vq.target_.clause_ = fts_q; + auto r = col->Query(vq); + EXPECT_TRUE(r.has_value()) << r.error().message(); + return r.has_value() ? r.value() : DocPtrList{}; + }; + + ASSERT_EQ(fts_search_ro("hello").size(), 3u); + ASSERT_EQ(fts_search_ro("nothing").size(), 0u); + col.reset(); FileHelper::RemoveDirectory(col_path); } -// CreateIndex/DropIndex must explicitly reject index types they don't -// support (today: anything other than vector index types or INVERT). This -// keeps a hypothetically supported-looking call like CreateIndex(field, -// FtsClause) from silently routing through the scalar/invert path and -// corrupting state. -TEST_F(CollectionTest, CornerCase_CreateOrDropIndex_UnsupportedTypes) { +// Dynamic CreateIndex/DropIndex for FTS: create an FTS index on a STRING column +// that already has data, verify queries hit, then drop the index and verify FTS +// is no longer available. Also covers reopen persistence. +TEST_F(CollectionTest, Feature_CreateOrDropFtsIndex) { +#ifdef __ANDROID__ + GTEST_SKIP() << "Skipped on Android: emulator filesystem lacks hardlink " + "support (needed by RocksDB checkpoint)"; +#endif auto build_schema = [](bool with_fts) { auto schema = std::make_shared("fts_dyn"); schema->add_field(std::make_shared("title", DataType::STRING)); @@ -5720,16 +5745,32 @@ TEST_F(CollectionTest, CornerCase_CreateOrDropIndex_UnsupportedTypes) { return col->Query(vq); }; - // Case 1: CreateIndex(FtsIndexParams) and CreateIndex(nullptr) on a column - // declared without an FTS index — both should be rejected up front and - // leave the schema unchanged. + // CreateIndex(nullptr) should fail with INVALID_ARGUMENT. { FileHelper::RemoveDirectory(col_path); - auto schema = build_schema(/*with_fts=*/false); - auto create_res = Collection::CreateAndOpen(col_path, *schema, - CollectionOptions{false, true}); - ASSERT_TRUE(create_res.has_value()) << create_res.error().message(); - auto col = create_res.value(); + auto schema = build_schema(false); + auto col_res = Collection::CreateAndOpen(col_path, *schema, + CollectionOptions{false, true}); + ASSERT_TRUE(col_res.has_value()) << col_res.error().message(); + auto col = std::move(col_res.value()); + + auto s_null = col->CreateIndex("content", nullptr); + ASSERT_FALSE(s_null.ok()); + ASSERT_EQ(s_null.code(), StatusCode::INVALID_ARGUMENT); + + col.reset(); + FileHelper::RemoveDirectory(col_path); + } + + // Case 1: CreateIndex(FtsIndexParams) on a STRING column without FTS. + // Insert data first, then create index, verify queries hit, verify reopen. + { + FileHelper::RemoveDirectory(col_path); + auto schema = build_schema(false); + CollectionOptions options{false, true}; + auto col_res = Collection::CreateAndOpen(col_path, *schema, options); + ASSERT_TRUE(col_res.has_value()) << col_res.error().message(); + auto col = std::move(col_res.value()); std::vector docs; docs.push_back(make_doc(0, "intro", "hello world")); @@ -5738,56 +5779,191 @@ TEST_F(CollectionTest, CornerCase_CreateOrDropIndex_UnsupportedTypes) { ASSERT_TRUE(col->Insert(docs).has_value()); ASSERT_TRUE(col->Flush().ok()); - auto s_fts = - col->CreateIndex("content", std::make_shared()); - ASSERT_FALSE(s_fts.ok()); - ASSERT_EQ(s_fts.code(), StatusCode::NOT_SUPPORTED); + // FTS query before index creation should fail. + auto q_before = fts_search(col, "hello"); + ASSERT_FALSE(q_before.has_value()); - auto s_null = col->CreateIndex("content", nullptr); - ASSERT_FALSE(s_null.ok()); - ASSERT_EQ(s_null.code(), StatusCode::INVALID_ARGUMENT); + // Create FTS index. + auto s = col->CreateIndex("content", std::make_shared()); + ASSERT_TRUE(s.ok()) << s.message(); - // Schema must not be mutated by the rejected calls. - ASSERT_EQ(col->Schema().value(), *schema); + // FTS query should now succeed. + auto q_after = fts_search(col, "hello"); + ASSERT_TRUE(q_after.has_value()) << q_after.error().message(); + ASSERT_EQ(q_after.value().size(), 2u); - // Subsequent FTS query still fails because the column was never indexed, - // but it's a query-side validation error rather than a corruption symptom. - auto q = fts_search(col, "hello"); - ASSERT_FALSE(q.has_value()); + // "nothing" appears in doc 2 only. + auto q_nothing = fts_search(col, "nothing"); + ASSERT_TRUE(q_nothing.has_value()) << q_nothing.error().message(); + ASSERT_EQ(q_nothing.value().size(), 1u); + + // Reopen and verify persistence. + col.reset(); + auto reopen_res = Collection::Open(col_path, options); + ASSERT_TRUE(reopen_res.has_value()) << reopen_res.error().message(); + col = reopen_res.value(); + + auto q_reopen = fts_search(col, "hello"); + ASSERT_TRUE(q_reopen.has_value()) << q_reopen.error().message(); + ASSERT_EQ(q_reopen.value().size(), 2u); col.reset(); FileHelper::RemoveDirectory(col_path); } - // Case 2: DropIndex on an FTS column is rejected (we don't tear down FTS - // physical state through DropIndex today), and the FTS index remains usable. + // Case 2: DropIndex on an FTS column removes the FTS index. { FileHelper::RemoveDirectory(col_path); - auto schema = build_schema(/*with_fts=*/true); - auto create_res = Collection::CreateAndOpen(col_path, *schema, - CollectionOptions{false, true}); - ASSERT_TRUE(create_res.has_value()) << create_res.error().message(); - auto col = create_res.value(); + auto schema = build_schema(true); + CollectionOptions options{false, true}; + auto col_res = Collection::CreateAndOpen(col_path, *schema, options); + ASSERT_TRUE(col_res.has_value()) << col_res.error().message(); + auto col = std::move(col_res.value()); std::vector docs; docs.push_back(make_doc(0, "intro", "hello world")); docs.push_back(make_doc(1, "guide", "hello foo")); ASSERT_TRUE(col->Insert(docs).has_value()); ASSERT_TRUE(col->Flush().ok()); + + // Baseline: FTS query works. auto baseline = fts_search(col, "hello"); ASSERT_TRUE(baseline.has_value()); ASSERT_EQ(baseline.value().size(), 2u); + // Drop FTS index. auto s = col->DropIndex("content"); - ASSERT_FALSE(s.ok()); - ASSERT_EQ(s.code(), StatusCode::NOT_SUPPORTED); + ASSERT_TRUE(s.ok()) << s.message(); + + // FTS query should now fail (field no longer FTS-indexed). + auto q_after = fts_search(col, "hello"); + ASSERT_FALSE(q_after.has_value()); - // Schema and FTS index untouched. - ASSERT_EQ(col->Schema().value(), *schema); + // Reopen and verify FTS is still gone. + col.reset(); + auto reopen_res = Collection::Open(col_path, options); + ASSERT_TRUE(reopen_res.has_value()) << reopen_res.error().message(); + col = reopen_res.value(); + + auto q_reopen = fts_search(col, "hello"); + ASSERT_FALSE(q_reopen.has_value()); + + col.reset(); + FileHelper::RemoveDirectory(col_path); + } + + // Case 3: Create → Drop → Create → Drop cycle on the same column. + { + FileHelper::RemoveDirectory(col_path); + auto schema = build_schema(false); + CollectionOptions options{false, true}; + auto col_res = Collection::CreateAndOpen(col_path, *schema, options); + ASSERT_TRUE(col_res.has_value()) << col_res.error().message(); + auto col = std::move(col_res.value()); + + std::vector docs; + docs.push_back(make_doc(0, "intro", "hello world")); + docs.push_back(make_doc(1, "guide", "hello foo")); + docs.push_back(make_doc(2, "more", "nothing here")); + ASSERT_TRUE(col->Insert(docs).has_value()); + ASSERT_TRUE(col->Flush().ok()); + + // Round 1: Create FTS index. + auto s = col->CreateIndex("content", std::make_shared()); + ASSERT_TRUE(s.ok()) << s.message(); auto q = fts_search(col, "hello"); + ASSERT_TRUE(q.has_value()) << q.error().message(); + ASSERT_EQ(q.value().size(), 2u); + + // Round 1: Drop FTS index. + s = col->DropIndex("content"); + ASSERT_TRUE(s.ok()) << s.message(); + q = fts_search(col, "hello"); + ASSERT_FALSE(q.has_value()); + + // Round 2: Re-create FTS index. + s = col->CreateIndex("content", std::make_shared()); + ASSERT_TRUE(s.ok()) << s.message(); + q = fts_search(col, "hello"); + ASSERT_TRUE(q.has_value()) << q.error().message(); + ASSERT_EQ(q.value().size(), 2u); + + // Round 2: Re-drop FTS index. + s = col->DropIndex("content"); + ASSERT_TRUE(s.ok()) << s.message(); + q = fts_search(col, "hello"); + ASSERT_FALSE(q.has_value()); + + // Reopen and verify final state (no FTS). + col.reset(); + auto reopen_res = Collection::Open(col_path, options); + ASSERT_TRUE(reopen_res.has_value()) << reopen_res.error().message(); + col = reopen_res.value(); + + q = fts_search(col, "hello"); + ASSERT_FALSE(q.has_value()); + + col.reset(); + FileHelper::RemoveDirectory(col_path); + } + + // Case 4: CreateIndex with different FtsIndexParams on a column that already + // has an FTS index — should remove the old index and rebuild with new params. + { + FileHelper::RemoveDirectory(col_path); + auto schema = build_schema(false); + CollectionOptions options{false, true}; + auto col_res = Collection::CreateAndOpen(col_path, *schema, options); + ASSERT_TRUE(col_res.has_value()) << col_res.error().message(); + auto col = std::move(col_res.value()); + + std::vector docs; + docs.push_back(make_doc(0, "intro", "hello world")); + docs.push_back(make_doc(1, "guide", "hello foo")); + docs.push_back(make_doc(2, "more", "nothing here")); + ASSERT_TRUE(col->Insert(docs).has_value()); + ASSERT_TRUE(col->Flush().ok()); + + // Create FTS index with default params (tokenizer="standard"). + auto params_v1 = std::make_shared("standard"); + auto s = col->CreateIndex("content", params_v1); + ASSERT_TRUE(s.ok()) << s.message(); + auto q = fts_search(col, "hello"); + ASSERT_TRUE(q.has_value()) << q.error().message(); + ASSERT_EQ(q.value().size(), 2u); + + // Re-create with different params: no lowercase filter, so indexing + // preserves original case and case-mismatched queries should miss. + auto params_v2 = std::make_shared( + "standard", std::vector{}); + ASSERT_NE(*params_v1, *params_v2); + s = col->CreateIndex("content", params_v2); + ASSERT_TRUE(s.ok()) << s.message(); + + // Lowercase query should still hit (source text is lowercase). + q = fts_search(col, "hello"); + ASSERT_TRUE(q.has_value()) << q.error().message(); + ASSERT_EQ(q.value().size(), 2u); + + // Uppercase query should miss — no lowercase filter means case-sensitive. + q = fts_search(col, "HELLO"); ASSERT_TRUE(q.has_value()); + ASSERT_EQ(q.value().size(), 0u); + + // Reopen and verify persistence. + col.reset(); + auto reopen_res = Collection::Open(col_path, options); + ASSERT_TRUE(reopen_res.has_value()) << reopen_res.error().message(); + col = reopen_res.value(); + + q = fts_search(col, "hello"); + ASSERT_TRUE(q.has_value()) << q.error().message(); ASSERT_EQ(q.value().size(), 2u); + q = fts_search(col, "HELLO"); + ASSERT_TRUE(q.has_value()); + ASSERT_EQ(q.value().size(), 0u); + col.reset(); FileHelper::RemoveDirectory(col_path); } diff --git a/tests/db/sqlengine/mock_segment.h b/tests/db/sqlengine/mock_segment.h index ccdb61918..130c8796f 100644 --- a/tests/db/sqlengine/mock_segment.h +++ b/tests/db/sqlengine/mock_segment.h @@ -462,6 +462,25 @@ class MockSegment : public Segment { return Status::OK(); } + Status reload_fts_index(const CollectionSchema &schema, + const SegmentMeta::Ptr &segment_meta, + const FtsIndexer::Ptr &new_fts_indexer) override { + return Status::OK(); + } + + Status create_fts_index(const std::string &column, + const IndexParams::Ptr &index_params, + SegmentMeta::Ptr *new_segment_meta, + FtsIndexer::Ptr *output_fts_indexer) override { + return Status::OK(); + } + + Status drop_fts_index(const std::string &column, + SegmentMeta::Ptr *new_segment_meta, + FtsIndexer::Ptr *output_fts_indexer) override { + return Status::OK(); + } + Status Insert(Doc &doc) override { return Status::OK(); }