flatbuffers support(2/3): Add flatbuffers message construction logic#3196
flatbuffers support(2/3): Add flatbuffers message construction logic#3196Q1ngbo wants to merge 1 commit intoapache:masterfrom
Conversation
Key components: - flatbuffers_common.h: Defines abstract interfaces (RpcChannel and Service) for FlatBuffers-based RPC communication, similar to protobuf's RPC interfaces. - flatbuffers_impl.h: Implements FlatBuffers-specific message handling: * SlabAllocator: Custom allocator using SingleIOBuf for zero-copy operations. * Message: Wrapper for FlatBuffers messages with SingleIOBuf storage. * MessageBuilder: BRPC-specific FlatBufferBuilder with SlabAllocator. * ServiceDescriptor/MethodDescriptor: Service introspection support. - flatbuffers_impl.cpp: Implementation of allocation, serialization, and service descriptor initialization logic
| message:string; | ||
| } | ||
|
|
||
| rpc_service BenchmarkService { |
There was a problem hiding this comment.
flatc 应该是不支持 rpc_service 的,请问这里是怎么处理的?
There was a problem hiding this comment.
flatc 应该是不支持 rpc_service 的,请问这里是怎么处理的?
类似于为grpc实现的--grpc参数,可以给flatc增加了--brpc参数,来将其编译为brpc可用的service。如example/benchmark_fb/test.brpc.fb.h中的class BenchmarkService。但我尚未向flatbuffers提交pr,因为还可能需要根据brpc中实现进行相应修改。
There was a problem hiding this comment.
fb 是否可以这样使用,显式的在 method 后指定 index?
对于从中间删除 method 会导致不兼容的情况,感觉是个隐患,用户不太容易注意到。
rpc_service BenchmarkService {
Test1(BenchmarkRequest):BenchmarkResponse = 1;
Test2(BenchmarkRequest):BenchmarkResponse = 2;
}
There was a problem hiding this comment.
fb 是否可以这样使用,显式的在 method 后指定 index?
好主意!我可以修改下flatc。但为了兼容flatbuffers语法,fbs文件里需要这么写
rpc_service BenchmarkService {
Test(BenchmarkRequest):BenchmarkResponse (id: 2);
Test2(BenchmarkRequest):BenchmarkResponse (id: 5);
Test3(BenchmarkRequest):BenchmarkResponse (id: 1);
}
最终可以在test.brpc.fb.cpp中生成如下代码。
switch (method->index()) {
case 2:
Test(controller, request, response, done);
break;
case 5:
Test2(controller, request, response, done);
break;
case 1:
Test3(controller, request, response, done);
break;
default:
std::cout << "ERROR: " << "Bad method index; this should never happen."
<< std::endl;
break;
}
这样后续任意删除method也能保持兼容性,而且不需要修改brpc中的处理逻辑
There was a problem hiding this comment.
Pull request overview
This pull request adds FlatBuffers message construction logic to brpc as part 2 of a 3-part implementation to support the FlatBuffers protocol. The PR introduces custom allocators, message builders, and service descriptors that integrate FlatBuffers with brpc's zero-copy IOBuf system.
Changes:
- Adds FlatBuffers message construction infrastructure with custom SlabAllocator for zero-copy operations
- Implements Message and MessageBuilder classes for FlatBuffers message lifecycle management
- Provides ServiceDescriptor and MethodDescriptor classes for RPC method resolution
- Fixes a bug in SingleIOBuf::assign() by moving reset() call to ensure proper cleanup in all paths
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 23 comments.
Show a summary per file
| File | Description |
|---|---|
| src/butil/single_iobuf.cpp | Bug fix: moves reset() call before conditional to ensure proper cleanup in both code paths |
| src/brpc/details/flatbuffers_common.h | Defines base interfaces (Service, RpcChannel) for FlatBuffers RPC integration |
| src/brpc/details/flatbuffers_impl.h | Core implementation header with SlabAllocator, Message, MessageBuilder, and descriptor classes |
| src/brpc/details/flatbuffers_impl.cpp | Implementation of allocators, message handling, and service descriptor parsing |
| example/benchmark_fb/test_generated.h | Auto-generated FlatBuffers message definitions (temporary example) |
| example/benchmark_fb/test.fbs | FlatBuffers schema for benchmark service (temporary example) |
| example/benchmark_fb/test.brpc.fb.h | Auto-generated brpc service stub header (temporary example) |
| example/benchmark_fb/test.brpc.fb.cpp | Auto-generated brpc service stub implementation (temporary example) |
| example/benchmark_fb/server.cpp | Example benchmark server implementation (temporary example) |
| example/benchmark_fb/client.cpp | Example benchmark client implementation (temporary example) |
| example/benchmark_fb/CMakeLists.txt | Build configuration for benchmark example (temporary) |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| namespace test { | ||
|
|
||
| static brpc::flatbuffers::ServiceDescriptor* file_level_service_descriptors_my_2eproto[1] = {NULL}; |
There was a problem hiding this comment.
Potential memory leak: The static array file_level_service_descriptors_my_2eproto[0] is allocated in parse_service_descriptors but never deallocated. This creates a permanent memory leak. Consider using a smart pointer or implementing proper cleanup, perhaps via an atexit handler or a static destructor pattern to ensure the ServiceDescriptor is properly deleted at program exit.
| static brpc::flatbuffers::ServiceDescriptor* file_level_service_descriptors_my_2eproto[1] = {NULL}; | |
| static brpc::flatbuffers::ServiceDescriptor* file_level_service_descriptors_my_2eproto[1] = {NULL}; | |
| // Ensure the statically allocated ServiceDescriptor is cleaned up at program exit. | |
| struct FileLevelServiceDescriptorsCleanup { | |
| ~FileLevelServiceDescriptorsCleanup() { | |
| if (file_level_service_descriptors_my_2eproto[0] != NULL) { | |
| delete file_level_service_descriptors_my_2eproto[0]; | |
| file_level_service_descriptors_my_2eproto[0] = NULL; | |
| } | |
| } | |
| }; | |
| static FileLevelServiceDescriptorsCleanup file_level_service_descriptors_cleanup; |
| if (table.service_name == "" || table.prefix == "" || | ||
| table.method_name_list == "") { |
There was a problem hiding this comment.
String comparison issue: Comparing std::string objects with "" using == is correct but using empty() method is more idiomatic and clearer. Consider changing 'table.service_name == ""' to 'table.service_name.empty()' (and similarly for prefix and method_name_list) for better code clarity and style consistency.
| if (table.service_name == "" || table.prefix == "" || | |
| table.method_name_list == "") { | |
| if (table.service_name.empty() || table.prefix.empty() || | |
| table.method_name_list.empty()) { |
| // deserialization without unnecessary memory copies. | ||
| class SlabAllocator : public ::flatbuffers::Allocator { | ||
| public: | ||
| SlabAllocator() {} |
There was a problem hiding this comment.
Uninitialized member variables: The SlabAllocator class has member variables _full_buf_head, _fb_begin_head, and _old_size_param that are not initialized in the default constructor. These should be initialized to safe default values (e.g., nullptr for pointers, 0 for size_t) to prevent undefined behavior if methods are called before allocate() is invoked.
| SlabAllocator() {} | |
| SlabAllocator() | |
| : _full_buf_head(nullptr) | |
| , _fb_begin_head(nullptr) | |
| , _old_size_param(0) {} |
| const test::BenchmarkRequest* request = request_base->GetRoot<test::BenchmarkRequest>(); | ||
| // Set Response Message | ||
| brpc::flatbuffers::MessageBuilder mb_; | ||
| const char *req_str = request->message()->c_str(); |
There was a problem hiding this comment.
Potential null pointer dereference: The request->message() call on line 29 can return nullptr if the message field is not set in the FlatBuffers message. Calling c_str() on a nullptr will cause a crash. Add a null check before accessing the string, or handle the case where message is not present.
| const char *req_str = request->message()->c_str(); | |
| const auto* msg = request->message(); | |
| const char* req_str = msg ? msg->c_str() : ""; |
| FB_BRPC_DISALLOW_EVIL_CONSTRUCTORS(MethodDescriptor); | ||
| }; | ||
|
|
||
| bool ParseFbFromIOBUF(Message* msg, size_t msg_size, const butil::IOBuf& buf, size_t meta_remaind = 0); |
There was a problem hiding this comment.
Spelling error in parameter name: The parameter name 'meta_remaind' should be 'meta_remain' or 'meta_remaining' for correct spelling.
| bool ParseFbFromIOBUF(Message* msg, size_t msg_size, const butil::IOBuf& buf, size_t meta_remaind = 0); | |
| bool ParseFbFromIOBUF(Message* msg, size_t msg_size, const butil::IOBuf& buf, size_t meta_remaining = 0); |
| Message::Message(const butil::SingleIOBuf &iobuf, | ||
| uint32_t meta_size = 0, | ||
| uint32_t msg_size = 0) |
There was a problem hiding this comment.
Const-correctness issue: The default parameter values for meta_size and msg_size should not be specified in the constructor definition when they are private constructors. Default parameter values should only be declared in the header file (lines 97-103 in flatbuffers_impl.h), not redeclared in the implementation. This can cause confusion and potential mismatches.
| uint8_t *SlabAllocator::reallocate_downward(uint8_t *old_p, | ||
| size_t old_size, | ||
| size_t new_size, | ||
| size_t in_use_back, | ||
| size_t in_use_front) { | ||
| _old_size_param = new_size; | ||
| size_t new_real_size = new_size + DEFAULT_RESERVE_SIZE; | ||
|
|
||
| void* old_block = _iobuf.get_cur_block(); | ||
| butil::SingleIOBuf::target_block_inc_ref(old_block); | ||
| _full_buf_head = (uint8_t *)_iobuf.reallocate_downward(new_real_size, 0, 0); | ||
| _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE; | ||
| memcpy_downward(old_p, old_size, _fb_begin_head, new_size, in_use_back, in_use_front); | ||
| butil::SingleIOBuf::target_block_dec_ref(old_block); | ||
| return _fb_begin_head; |
There was a problem hiding this comment.
Missing null pointer check: After calling _iobuf.reallocate_downward(new_real_size, 0, 0) on line 39, the return value should be checked for NULL before proceeding. If reallocation fails, _full_buf_head will be NULL, and subsequent operations including pointer arithmetic and memcpy_downward will result in undefined behavior. Add a null check and handle the error case appropriately, ensuring old_block reference is properly cleaned up.
| Message &operator=(const Message &other) = delete; | ||
| private: | ||
| Message(const butil::SingleIOBuf &iobuf, | ||
| uint32_t meta_size, | ||
| uint32_t msg_size); | ||
|
|
||
| Message(const butil::IOBuf::BlockRef &ref, | ||
| uint32_t meta_size, | ||
| uint32_t msg_size); | ||
| friend class MessageBuilder; | ||
| public: | ||
| Message(Message &&other) = default; | ||
|
|
||
| Message(const Message &other) = delete; |
There was a problem hiding this comment.
Redundant copy constructor deletion: The Message class has both 'Message(const Message &other) = delete;' on line 108 and 'Message &operator=(const Message &other) = delete;' on line 95. However, line 108 is redundant because the copy constructor is already deleted on line 108. The order is also unusual - typically copy constructor and copy assignment operator are declared together. Consider removing line 95 or organizing the declarations more conventionally.
| << " msg latency=" << g_msg_recorder.latency(1) << "ns"; | ||
| } | ||
|
|
||
| LOG(INFO) << "EchoClient is going to quit"; |
There was a problem hiding this comment.
Inconsistent error message: The error message says "EchoClient is going to quit" but this is a benchmark client, not EchoClient. The message should be updated to "BenchmarkClient is going to quit" or simply "Client is going to quit" for consistency.
| LOG(INFO) << "EchoClient is going to quit"; | |
| LOG(INFO) << "Client is going to quit"; |
|
|
||
| namespace test { | ||
|
|
||
| static brpc::flatbuffers::ServiceDescriptor* file_level_service_descriptors_my_2eproto[1] = {NULL}; | ||
| BenchmarkService::~BenchmarkService() {} | ||
| const brpc::flatbuffers::ServiceDescriptor* BenchmarkService::descriptor() { | ||
| if (file_level_service_descriptors_my_2eproto[0] == NULL) { | ||
| const brpc::flatbuffers::BrpcDescriptorTable desc_table = { | ||
| "test.", "BenchmarkService" , "Test"}; | ||
| if (brpc::flatbuffers::parse_service_descriptors(desc_table, &file_level_service_descriptors_my_2eproto[0])) { | ||
| std::cout << "ERROR: " << "Fail to parse_service_descriptors" << std::endl; | ||
| return NULL; | ||
| } | ||
| } |
There was a problem hiding this comment.
Thread safety issue: The descriptor() method has a race condition. Multiple threads could simultaneously check if file_level_service_descriptors_my_2eproto[0] is NULL and call parse_service_descriptors concurrently, leading to memory leaks and undefined behavior. This should be protected with a mutex or use std::call_once for thread-safe lazy initialization. This is a code generation issue that should be addressed in the flatc code generator.
| namespace test { | |
| static brpc::flatbuffers::ServiceDescriptor* file_level_service_descriptors_my_2eproto[1] = {NULL}; | |
| BenchmarkService::~BenchmarkService() {} | |
| const brpc::flatbuffers::ServiceDescriptor* BenchmarkService::descriptor() { | |
| if (file_level_service_descriptors_my_2eproto[0] == NULL) { | |
| const brpc::flatbuffers::BrpcDescriptorTable desc_table = { | |
| "test.", "BenchmarkService" , "Test"}; | |
| if (brpc::flatbuffers::parse_service_descriptors(desc_table, &file_level_service_descriptors_my_2eproto[0])) { | |
| std::cout << "ERROR: " << "Fail to parse_service_descriptors" << std::endl; | |
| return NULL; | |
| } | |
| } | |
| #include <mutex> | |
| namespace test { | |
| static brpc::flatbuffers::ServiceDescriptor* file_level_service_descriptors_my_2eproto[1] = {NULL}; | |
| static std::once_flag file_level_service_descriptors_my_2eproto_once_flag; | |
| BenchmarkService::~BenchmarkService() {} | |
| const brpc::flatbuffers::ServiceDescriptor* BenchmarkService::descriptor() { | |
| std::call_once(file_level_service_descriptors_my_2eproto_once_flag, []() { | |
| const brpc::flatbuffers::BrpcDescriptorTable desc_table = { | |
| "test.", "BenchmarkService" , "Test"}; | |
| if (brpc::flatbuffers::parse_service_descriptors( | |
| desc_table, &file_level_service_descriptors_my_2eproto[0])) { | |
| std::cout << "ERROR: " << "Fail to parse_service_descriptors" << std::endl; | |
| } | |
| }); |
| #include "brpc/details/flatbuffers_common.h" | ||
|
|
||
| namespace brpc { | ||
| namespace flatbuffers { |


What problem does this PR solve?
Issue Number: resolve #2354 #978
Problem Summary:
Hi, 我基于先前的commit #3062 完成了flatbuffers协议对brpc的适配,准备将完整实现提交至社区。在brpc中增加flatbuffers(fb)支持可分为fb message构造与fb协议处理两部分。前者解决的问题是如何使用flatbuffers库提供的接口去创建一个message,以及应用接收到message后如何从中读取出数据;后者解决的是brpc如何处理"fb"协议。因此,我将相关实现拆分为了两个提交。本提交重点在于message构造。
我之所以将message构造提交至brpc仓库中而不是像grpc一样在google/flatbuffers中实现,主要出于两点考虑:首先,为了与IOBuf兼容,创建message使用的底层数据结构为之前已经提交的SingleIOBuf,在flatbuffers中使用该数据结构会导致循环依赖;其次,flatbuffers提供了完善的接口,只需要在brpc里定义好内存分配器,即可调用相关接口来构造消息,实现上很方便。
特别说明:
Check List: