-
Notifications
You must be signed in to change notification settings - Fork 4.1k
flatbuffers support(2/3): Add flatbuffers message construction logic #3196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,139 @@ | ||
| cmake_minimum_required(VERSION 2.8.10) | ||
| project(benchmark_fb C CXX) | ||
|
|
||
| option(LINK_SO "Whether examples are linked dynamically" OFF) | ||
| option(WITH_ASAN "With AddressSanitizer" OFF) | ||
|
|
||
| execute_process( | ||
| COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'" | ||
| OUTPUT_VARIABLE OUTPUT_PATH | ||
| ) | ||
|
|
||
| set(CMAKE_PREFIX_PATH ${OUTPUT_PATH}) | ||
|
|
||
| include(FindThreads) | ||
| include(FindProtobuf) | ||
|
|
||
| # include current directory for generated files | ||
| include_directories(${CMAKE_CURRENT_SOURCE_DIR}) | ||
|
|
||
| # Search for libthrift* by best effort. If it is not found and brpc is | ||
| # compiled with thrift protocol enabled, a link error would be reported. | ||
| find_library(THRIFT_LIB NAMES thrift) | ||
| if (NOT THRIFT_LIB) | ||
| set(THRIFT_LIB "") | ||
| endif() | ||
|
|
||
| find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h) | ||
| if(LINK_SO) | ||
| find_library(BRPC_LIB NAMES brpc) | ||
| else() | ||
| find_library(BRPC_LIB NAMES libbrpc.a brpc) | ||
| endif() | ||
| if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB)) | ||
| message(FATAL_ERROR "Fail to find brpc") | ||
| endif() | ||
| include_directories(${BRPC_INCLUDE_PATH}) | ||
|
|
||
| find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h) | ||
| find_library(GFLAGS_LIBRARY NAMES gflags libgflags) | ||
| if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY)) | ||
| message(FATAL_ERROR "Fail to find gflags") | ||
| endif() | ||
| include_directories(${GFLAGS_INCLUDE_PATH}) | ||
|
|
||
| # Find FlatBuffers | ||
| find_path(FLATBUFFERS_INCLUDE_PATH flatbuffers/flatbuffers.h) | ||
| find_library(FLATBUFFERS_LIBRARY NAMES flatbuffers) | ||
| if((NOT FLATBUFFERS_INCLUDE_PATH) OR (NOT FLATBUFFERS_LIBRARY)) | ||
| message(FATAL_ERROR "Fail to find flatbuffers") | ||
| endif() | ||
| include_directories(${FLATBUFFERS_INCLUDE_PATH}) | ||
|
|
||
| if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") | ||
| include(CheckFunctionExists) | ||
| CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME) | ||
| if(NOT HAVE_CLOCK_GETTIME) | ||
| set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC") | ||
| endif() | ||
| endif() | ||
|
|
||
| # set(CMAKE_CXX_FLAGS "${DEFINE_CLOCK_GETTIME} -g -O0 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer") | ||
| set(CMAKE_CXX_FLAGS "${DEFINE_CLOCK_GETTIME} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer") | ||
|
|
||
| set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS}") | ||
|
|
||
| if (WITH_ASAN) | ||
| set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address") | ||
| set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address") | ||
| endif() | ||
|
|
||
| if(CMAKE_VERSION VERSION_LESS "3.1.3") | ||
| if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") | ||
| set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") | ||
| endif() | ||
| if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") | ||
| set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") | ||
| endif() | ||
| else() | ||
| set(CMAKE_CXX_STANDARD 11) | ||
| set(CMAKE_CXX_STANDARD_REQUIRED ON) | ||
| endif() | ||
|
|
||
| find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h) | ||
| find_library(LEVELDB_LIB NAMES leveldb) | ||
| if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB)) | ||
| message(FATAL_ERROR "Fail to find leveldb") | ||
| endif() | ||
| include_directories(${LEVELDB_INCLUDE_PATH}) | ||
|
|
||
| if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") | ||
| set(OPENSSL_ROOT_DIR | ||
| "/usr/local/opt/openssl" # Homebrew installed OpenSSL | ||
| ) | ||
| endif() | ||
|
|
||
| find_package(OpenSSL) | ||
| include_directories(${OPENSSL_INCLUDE_DIR}) | ||
|
|
||
| set(DYNAMIC_LIB | ||
| ${CMAKE_THREAD_LIBS_INIT} | ||
| ${GFLAGS_LIBRARY} | ||
| ${PROTOBUF_LIBRARIES} | ||
| ${LEVELDB_LIB} | ||
| ${OPENSSL_CRYPTO_LIBRARY} | ||
| ${OPENSSL_SSL_LIBRARY} | ||
| ${FLATBUFFERS_LIBRARY} | ||
| ${THRIFT_LIB} | ||
| dl | ||
| ) | ||
|
|
||
| if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") | ||
| set(DYNAMIC_LIB ${DYNAMIC_LIB} | ||
| pthread | ||
| "-framework CoreFoundation" | ||
| "-framework CoreGraphics" | ||
| "-framework CoreData" | ||
| "-framework CoreText" | ||
| "-framework Security" | ||
| "-framework Foundation" | ||
| "-Wl,-U,_MallocExtension_ReleaseFreeMemory" | ||
| "-Wl,-U,_ProfilerStart" | ||
| "-Wl,-U,_ProfilerStop" | ||
| "-Wl,-U,__Z13GetStackTracePPvii" | ||
| "-Wl,-U,_mallctl" | ||
| "-Wl,-U,_malloc_stats_print" | ||
| ) | ||
| endif() | ||
|
|
||
| set(FLATBUFFERS_SOURCES | ||
| test.brpc.fb.cpp | ||
| test_generated.h | ||
| test.brpc.fb.h | ||
| ) | ||
|
|
||
| add_executable(client client.cpp ${FLATBUFFERS_SOURCES}) | ||
| add_executable(server server.cpp ${FLATBUFFERS_SOURCES}) | ||
|
|
||
| target_link_libraries(client ${BRPC_LIB} ${DYNAMIC_LIB}) | ||
| target_link_libraries(server ${BRPC_LIB} ${DYNAMIC_LIB}) |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,130 @@ | ||||||
| #include <gflags/gflags.h> | ||||||
| #include <bthread/bthread.h> | ||||||
| #include <butil/logging.h> | ||||||
| #include <brpc/server.h> | ||||||
| #include <brpc/channel.h> | ||||||
| #include <bvar/bvar.h> | ||||||
|
|
||||||
| #include "test.brpc.fb.h" | ||||||
|
|
||||||
|
|
||||||
| DEFINE_int32(thread_num, 1, "Number of threads to send requests"); | ||||||
| DEFINE_int32(attachment_size, 0, "Carry so many byte attachment along with requests"); | ||||||
| DEFINE_int32(request_size, 16, "Bytes of each request"); | ||||||
| DEFINE_string(servers, "0.0.0.0:8002", "IP Address of server"); | ||||||
| DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); | ||||||
| DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); | ||||||
| DEFINE_int32(dummy_port, -1, "Launch dummy server at this port"); | ||||||
|
|
||||||
| std::string g_request; | ||||||
| butil::IOBuf g_attachment; | ||||||
|
|
||||||
| bvar::LatencyRecorder g_latency_recorder("client"); | ||||||
| bvar::LatencyRecorder g_msg_recorder("msg"); | ||||||
| bvar::Adder<int> g_error_count("client_error_count"); | ||||||
|
|
||||||
| static void* sender(void* arg) { | ||||||
| test::BenchmarkServiceStub stub(static_cast<brpc::Channel*>(arg)); | ||||||
| int log_id = 0; | ||||||
| while (!brpc::IsAskedToQuit()) { | ||||||
| brpc::Controller cntl; | ||||||
| brpc::flatbuffers::Message response; | ||||||
|
|
||||||
| cntl.set_log_id(log_id++); | ||||||
| cntl.request_attachment().append(g_attachment); | ||||||
|
|
||||||
| uint64_t msg_begin_ns = butil::cpuwide_time_ns(); | ||||||
| brpc::flatbuffers::MessageBuilder mb; | ||||||
| auto message = mb.CreateString(g_request); | ||||||
| auto req = test::CreateBenchmarkRequest(mb, 123, 333, 1111, 2222, 0, message); | ||||||
| mb.Finish(req); | ||||||
| brpc::flatbuffers::Message request = mb.ReleaseMessage(); | ||||||
|
|
||||||
| uint64_t msg_end_ns = butil::cpuwide_time_ns(); | ||||||
| stub.Test(&cntl, &request, &response, NULL); | ||||||
|
|
||||||
| if (!cntl.Failed()) { | ||||||
| g_latency_recorder << cntl.latency_us(); | ||||||
| g_msg_recorder << (msg_end_ns - msg_begin_ns); | ||||||
| } else { | ||||||
| g_error_count << 1; | ||||||
| CHECK(brpc::IsAskedToQuit()) | ||||||
| << "error=" << cntl.ErrorText() << " latency=" << cntl.latency_us(); | ||||||
| // We can't connect to the server, sleep a while. Notice that this | ||||||
| // is a specific sleeping to prevent this thread from spinning too | ||||||
| // fast. You should continue the business logic in a production | ||||||
| // server rather than sleeping. | ||||||
| bthread_usleep(50000); | ||||||
| } | ||||||
| } | ||||||
| return NULL; | ||||||
| } | ||||||
|
|
||||||
| int main(int argc, char* argv[]) { | ||||||
| GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true); | ||||||
| // Print parameter information in one line | ||||||
| LOG(INFO) << "Parameters - request_size : " << FLAGS_request_size | ||||||
| << ", attachment_size: " << FLAGS_attachment_size | ||||||
| << ", thread_num: " << FLAGS_thread_num; | ||||||
|
|
||||||
| // A Channel represents a communication line to a Server. Notice that | ||||||
| // Channel is thread-safe and can be shared by all threads in your program. | ||||||
| brpc::Channel channel; | ||||||
|
|
||||||
| // Initialize the channel, NULL means using default options. | ||||||
| brpc::ChannelOptions options; | ||||||
| options.protocol = "fb_rpc"; | ||||||
| options.connection_type = ""; | ||||||
| options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100); | ||||||
| options.timeout_ms = FLAGS_timeout_ms; | ||||||
| options.max_retry = FLAGS_max_retry; | ||||||
| if (channel.Init(FLAGS_servers.c_str(), &options) != 0) { | ||||||
| LOG(ERROR) << "Fail to initialize channel"; | ||||||
| return -1; | ||||||
| } | ||||||
| if (FLAGS_attachment_size > 0) { | ||||||
| void* _attachment_addr = malloc(FLAGS_attachment_size); | ||||||
| if (!_attachment_addr) { | ||||||
| LOG(ERROR) << "Fail to alloc _attachment from system heap"; | ||||||
| return -1; | ||||||
| } | ||||||
| g_attachment.append(_attachment_addr, FLAGS_attachment_size); | ||||||
| free(_attachment_addr); | ||||||
| } | ||||||
| if (FLAGS_request_size < 0) { | ||||||
| LOG(ERROR) << "Bad request_size=" << FLAGS_request_size; | ||||||
| return -1; | ||||||
| } | ||||||
| g_request.resize(FLAGS_request_size, 'r'); | ||||||
|
|
||||||
| if (FLAGS_dummy_port >= 0) { | ||||||
| brpc::StartDummyServerAt(FLAGS_dummy_port); | ||||||
| } | ||||||
|
|
||||||
| std::vector<bthread_t> bids; | ||||||
| bids.resize(FLAGS_thread_num); | ||||||
| for (int i = 0; i < FLAGS_thread_num; ++i) { | ||||||
| if (bthread_start_background(&bids[i], NULL, sender, &channel) != 0) { | ||||||
| LOG(ERROR) << "Fail to create bthread"; | ||||||
| return -1; | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| while (!brpc::IsAskedToQuit()) { | ||||||
| sleep(1); | ||||||
| LOG(INFO) << "Sending EchoRequest at qps=" << (g_latency_recorder.qps(1) / 1000) | ||||||
| << "k latency=" << g_latency_recorder.latency(1) << "us" | ||||||
| << " msg latency=" << g_msg_recorder.latency(1) << "ns"; | ||||||
| } | ||||||
|
|
||||||
| LOG(INFO) << "EchoClient is going to quit"; | ||||||
|
||||||
| LOG(INFO) << "EchoClient is going to quit"; | |
| LOG(INFO) << "Client is going to quit"; |
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,74 @@ | ||||||||
| #include <gflags/gflags.h> | ||||||||
| #include <butil/logging.h> | ||||||||
| #include <brpc/server.h> | ||||||||
| #include "test.brpc.fb.h" | ||||||||
|
|
||||||||
| DEFINE_bool(echo_attachment, true, "Echo attachment as well"); | ||||||||
| DEFINE_int32(port, 8080, "TCP Port of this server"); | ||||||||
| DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " | ||||||||
| "read/write operations during the last `idle_timeout_s'"); | ||||||||
| DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel"); | ||||||||
| DEFINE_int32(internal_port, -1, "Only allow builtin services at this port"); | ||||||||
|
|
||||||||
| namespace test{ | ||||||||
| class BenchmarkServiceImpl : public BenchmarkService { | ||||||||
| public: | ||||||||
| BenchmarkServiceImpl() {} | ||||||||
| ~BenchmarkServiceImpl() {} | ||||||||
|
|
||||||||
| void Test(google::protobuf::RpcController* controller, | ||||||||
| const brpc::flatbuffers::Message* request_base, | ||||||||
| brpc::flatbuffers::Message* response, | ||||||||
| google::protobuf::Closure* done) { | ||||||||
| brpc::ClosureGuard done_guard(done); | ||||||||
| brpc::Controller* cntl = | ||||||||
| static_cast<brpc::Controller*>(controller); | ||||||||
| const test::BenchmarkRequest* request = request_base->GetRoot<test::BenchmarkRequest>(); | ||||||||
| // Set Response Message | ||||||||
| brpc::flatbuffers::MessageBuilder mb_; | ||||||||
| const char *req_str = request->message()->c_str(); | ||||||||
|
||||||||
| const char *req_str = request->message()->c_str(); | |
| const auto* msg = request->message(); | |
| const char* req_str = msg ? msg->c_str() : ""; |
Copilot
AI
Jan 31, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent error message: The error message says "Fail to start EchoServer" but this is BenchmarkServer, not EchoServer. The message should be updated to "Fail to start BenchmarkServer" or simply "Fail to start server" for consistency.
| LOG(ERROR) << "Fail to start EchoServer"; | |
| LOG(ERROR) << "Fail to start server"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent error message: The error message says "Sending EchoRequest" but this is a BenchmarkRequest, not EchoRequest. The message should be updated to "Sending BenchmarkRequest" or simply "Sending request" for consistency.