diff --git a/Makefile b/Makefile index 586eeea..5ce9611 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,20 @@ all: binaries -CFLAGS = -std=c99 -Wall -O3 -g -D_GNU_SOURCE -DNO_LIBNUMA +CFLAGS := -std=c99 -Wall -O3 -g -D_GNU_SOURCE -DNO_LIBNUMA + +HEADERS_DIR := usr/include + +ifdef WITH_TCPDEVMEM_CUDA + CFLAGS += -DWITH_TCPDEVMEM_CUDA -I $(HEADERS_DIR) +endif +ifdef WITH_TCPDEVMEM_UDMABUF + CFLAGS += -DWITH_TCPDEVMEM_UDMABUF -DNDEBUG=1 -static -I $(HEADERS_DIR) + LDFLAGS += -static +endif + +ifndef_any_of = $(filter undefined,$(foreach v,$(1),$(origin $(v)))) +ifdef_any_of = $(filter-out undefined,$(foreach v,$(1),$(origin $(v)))) lib := \ check_all_options.o \ @@ -48,6 +61,16 @@ lib := \ tcp_rr-objs := tcp_rr_main.o tcp_rr.o rr.o $(lib) tcp_stream-objs := tcp_stream_main.o tcp_stream.o stream.o $(lib) +ifdef WITH_TCPDEVMEM_CUDA + tcp_stream-objs += tcpdevmem_cuda.o +endif +ifdef WITH_TCPDEVMEM_UDMABUF + tcp_stream-objs += tcpdevmem_udmabuf.o +endif +ifneq ($(call ifdef_any_of,WITH_TCPDEVMEM_CUDA WITH_TCPDEVMEM_UDMABUF),) + tcp_stream-objs += tcpdevmem.o +endif + tcp_crr-objs := tcp_crr_main.o tcp_crr.o rr.o $(lib) @@ -63,11 +86,18 @@ psp_rr-objs := psp_rr_main.o psp_rr.o rr.o psp_lib.o $(lib) ext-libs := -lm -lrt -lpthread +tcpdevmem_cuda.o: tcpdevmem_cuda.cu + nvcc -arch=sm_90 -O3 -g -I $(HEADERS_DIR) -D_GNU_SOURCE -DNO_LIBNUMA -DWITH_TCPDEVMEM_CUDA -c -o $@ $^ + tcp_rr: $(tcp_rr-objs) $(CC) $(LDFLAGS) -o $@ $^ $(ext-libs) tcp_stream: $(tcp_stream-objs) +ifdef WITH_TCPDEVMEM_CUDA + g++ $(LDFLAGS) -o $@ $^ $(ext-libs) -lc -L/usr/local/cuda/lib64 -lcudart -lcuda +else $(CC) $(LDFLAGS) -o $@ $^ $(ext-libs) +endif tcp_crr: $(tcp_crr-objs) $(CC) $(LDFLAGS) -o $@ $^ $(ext-libs) diff --git a/README_tcpdevmem.md b/README_tcpdevmem.md new file mode 100644 index 0000000..1c33fe6 --- /dev/null +++ b/README_tcpdevmem.md @@ -0,0 +1,212 @@ +# Neper with TCPDevmem run instructions + +Table of Contents +- [Neper with TCPDevmem run 
instructions](#neper-with-tcpdevmem-run-instructions)
+  - [TCPDevmem UDMABUF: Compiling tcp\_stream](#tcpdevmem-udmabuf-compiling-tcp_stream)
+    - [Manually specifying kernel headers directory (i.e. NOT in `usr/include`)](#manually-specifying-kernel-headers-directory-ie-not-in-usrinclude)
+  - [Running tcp\_stream](#running-tcp_stream)
+    - [Added flags](#added-flags)
+    - [Running tcp\_stream via `multi_neper.py`](#running-tcp_stream-via-multi_neperpy)
+      - [Example of successful output](#example-of-successful-output)
+    - [Running tcp\_stream directly](#running-tcp_stream-directly)
+
+
+## TCPDevmem UDMABUF: Compiling tcp_stream
+
+**UDMABUF-capable tcp_stream can be built statically on a workstation.**
+
+Neper can be built statically on a host with UDMABUF header files.
+
+```
+# clone the Neper repository and checkout the tcpd branch
+git clone -b tcpd https://github.com/google/neper.git
+cd neper
+
+# copy kernel header files to Neper working directory
+# (assumed to be found in ~/kernel/usr/include)
+mkdir usr
+cp -r ~/kernel/usr/include/ ./usr/
+
+make tcp_stream WITH_TCPDEVMEM_UDMABUF=1
+
+# copy the binary to your hosts
+scp tcp_stream root@${HOST1}:~/
+scp multi_neper.py root@${HOST1}:~/
+
+scp tcp_stream root@${HOST2}:~/
+scp multi_neper.py root@${HOST2}:~/
+```
+
+### Manually specifying kernel headers directory (i.e. NOT in `usr/include`)
+
+Copying the header files is unnecessary if you override the `HEADERS_DIR` variable when running make. The default value for this variable is `usr/include`.
+
+```
+git clone -b tcpd https://github.com/google/neper.git
+cd neper
+
+make tcp_stream WITH_TCPDEVMEM_UDMABUF=1 HEADERS_DIR=~/kernel/usr/include
+```
+
+
+## Running tcp_stream
+
+
+### Added flags
+
+In general, these flags will be automatically populated by `multi_neper.py`.
+
+```
+--tcpd-validate # payload validation - must pass to both Tx/Rx if enabled
+--tcpd-rx-cpy # copies payload to another buffer (but doesn't validate)
+--tcpd-nic-pci-addr
+--tcpd-gpu-pci-addr
+--tcpd-phys-len # CUDA mode allows for a much larger value than UDMABUF mode
+--tcpd-src-ip
+--tcpd-dst-ip
+--tcpd-link-name
+--queue-start
+--queue-num
+```
+
+`--tcpd-validate`: Client populates the send buffer with [1,111] repeating, and Host verifies the repeating sequence.
+
+
+### Running tcp_stream via `multi_neper.py`
+
+`multi_neper.py` is a Python script that runs multiple tcp_stream instances in parallel, which is useful when running tcp_stream across multiple pairs of NICs.
+
+The script also calls ethtool commands on the receiver (host) before spawning tcp_streams, to set the receiver into a TCPDevmem-capable state.
+
+To view all of `multi_neper.py`’s accepted flags, run `multi_neper.py --help`.
+
+
+```
+# Rx (host)
+FLOWS=2
+BUF_SIZE=409600
+DEVS=eth1,eth2,eth3,eth4
+DSTS=192.168.1.26,192.168.2.26,192.168.3.26,192.168.4.26 # host IP addresses
+SRCS=192.168.1.23,192.168.2.23,192.168.3.23,192.168.4.23 # client IP addresses
+./multi_neper.py --hosts $DSTS \
+    --devices $DEVS --buffer-size $BUF_SIZE \
+    --flows $FLOWS --threads $FLOWS \
+    --src-ips $SRCS --log DEBUG \
+    --q-num $FLOWS --phys-len 2147483648 \
+    --mode cuda
+
+
+# Tx (client)
+FLOWS=2
+BUF_SIZE=409600
+DEVS=eth1,eth2,eth3,eth4
+DSTS=192.168.1.26,192.168.2.26,192.168.3.26,192.168.4.26
+SRCS=192.168.1.23,192.168.2.23,192.168.3.23,192.168.4.23
+./multi_neper.py --hosts $DSTS \
+    --devices $DEVS --buffer-size $BUF_SIZE \
+    --flows $FLOWS --threads $FLOWS \
+    --src-ips $SRCS --log DEBUG \
+    --q-num $FLOWS --phys-len 2147483648 \
+    --client \
+    --mode cuda
+```
+
+#### Example of successful output
+
+```
+DEBUG:root:minflt_end=6037
+DEBUG:root:majflt_start=0
+DEBUG:root:majflt_end=0
+DEBUG:root:nvcsw_start=653
+DEBUG:root:nvcsw_end=675141
+DEBUG:root:nivcsw_start=2
+DEBUG:root:nivcsw_end=1018
+DEBUG:root:num_samples=155 +DEBUG:root:time_end=613529.729042674 +DEBUG:root:correlation_coefficient=1.00 +DEBUG:root:throughput=193669.32 +DEBUG:root:throughput_units=Mbit/s +DEBUG:root:local_throughput=193669323769 +DEBUG:root:remote_throughput=0 +DEBUG:root: +[eth1] Throughput (Mb/s): 193551.94 +[eth2] Throughput (Mb/s): 193652.69 +[eth3] Throughput (Mb/s): 193640.21 +[eth4] Throughput (Mb/s): 193669.32 +``` + + + +### Running tcp_stream directly + +**If you’re running Neper outside of the container, make sure to run** + +``` +sudo -s +``` + +**before everything. `ethtool` commands and queue-binding is only available to superuser.** + +Before running tcp_stream, the ethtool commands that `multi_neper.py` runs should also be run: + +``` +# run as superuser, if running Neper as root +sudo -s + +res_link() { +ethtool --set-priv-flags $1 enable-strict-header-split on +ethtool --set-priv-flags $1 enable-strict-header-split off +ethtool --set-priv-flags $1 enable-header-split off +ethtool --set-rxfh-indir $1 equal 16 +ethtool -K $1 ntuple off +ethtool --set-priv-flags $1 enable-strict-header-split off +ethtool --set-priv-flags $1 enable-header-split off +ethtool -K $1 ntuple off +ethtool --set-priv-flags $1 enable-max-rx-buffer-size on +ethtool -K $1 ntuple on +} + +# call on each link you plan to run tcp_stream across +res_link eth1 +``` + + +You can then run `multi_neper.py` with the `--dry-run` flag, to see what tcp_stream commands the script would run: + + +``` +$ FLOWS=1 +$ BUF_SIZE=409600 +$ DEVS=eth1 +$ DSTS=192.168.1.26 +$ SRCS=192.168.1.23 +$ ./multi_neper.py --hosts $DSTS \ + --devices $DEVS --buffer-size $BUF_SIZE \ + --flows $FLOWS --threads $FLOWS \ + --src-ips $SRCS --log DEBUG \ + --q-num $FLOWS --phys-len 2147483648 \ + --client \ + --mode cuda \ + --dry-run + +DEBUG:root:running on ['eth1'] +DEBUG:root:('taskset --cpu-list 2-2 ./tcp_stream -T 1 -F 1 --port 12345 --source-port 12345 --control-port 12866 --buffer-size 409600 -l 10 --num-ports 1 
--tcpd-phys-len 2147483648 --tcpd-nic-pci-addr 0000:06:00.0 --tcpd-gpu-pci-addr 0000:04:00.0 -c -H 192.168.1.26', {'CUDA_VISIBLE_DEVICES': '0', ... +``` + +The script will print the tcp_stream command, as well as the environment variables. The only environment variable that matters is `CUDA_VISIBLE_DEVICES` if running in `cuda` mode, which tells tcp_stream which GPU it should allocate memory on. + +You can then reset the receiver, and copy/paste the command: + +``` +# on Rx (host) +res_link eth1 +./multi_neper.py --dry-run ${other_rx_args} + +CUDA_VISIBLE_DEVICES=0 ./tcp_stream # copy cmd from previous line + + +# on Tx (client) +./multi_neper.py --dry-run ${other_tx_args} + +CUDA_VISIBLE_DEVICES=0 ./tcp_stream # copy cmd from previous line +``` diff --git a/check_all_options.c b/check_all_options.c index cd9e0b1..0b6eb15 100644 --- a/check_all_options.c +++ b/check_all_options.c @@ -103,6 +103,34 @@ void check_options_tcp_rr(struct options *opts, struct callbacks *cb) void check_options_tcp_stream(struct options *opts, struct callbacks *cb) { +#ifdef WITH_TCPDEVMEM_CUDA + if (opts->tcpd_gpu_pci_addr) { + CHECK(cb, opts->tcpd_nic_pci_addr, + "Must provide NIC PCI address if GPU PCI address was provided."); + + if (opts->client) { + CHECK(cb, !opts->tcpd_rx_cpy, + "Copying CUDA buffer to userspace only allowed on hosts."); + } + } +#endif /* WITH_TCPDEVMEM_CUDA */ +#if defined(WITH_TCPDEVMEM_CUDA) || defined(WITH_TCPDEVMEM_UDMABUF) + if (opts->tcpd_nic_pci_addr) { + CHECK(cb, opts->tcpd_phys_len > 0, + "Must provide non-zero --tcpd-phys-len flag when running in devmem TCP mode."); + CHECK(cb, opts->num_flows == opts->num_threads, + "Thread/Flow count must be equal when running in devmem TCP mode."); + CHECK(cb, opts->num_flows == opts->num_ports, + "Number of ports should equal number of flows when running in devmem TCP mode."); + + if (!opts->client) { + CHECK(cb, opts->tcpd_src_ip, + "Must provide source IP address for devmem TCP host."); + CHECK(cb, 
opts->tcpd_dst_ip, + "Must provide destination IP address for devmem TCP host."); + } + } +#endif /* WITH_TCPDEVMEM_CUDA || WITH_TCPDEVMEM_UDMABUF */ } void check_options_udp_rr(struct options *opts, struct callbacks *cb) diff --git a/define_all_flags.c b/define_all_flags.c index 6ece34a..bfdce05 100644 --- a/define_all_flags.c +++ b/define_all_flags.c @@ -145,6 +145,18 @@ struct flags_parser *add_flags_tcp_stream(struct flags_parser *fp) DEFINE_FLAG(fp, bool, split_bidir , false, 0, "Bidirectional using separate tx/rx sockets"); DEFINE_FLAG(fp, bool, enable_tcp_maerts, false, 'M', "Enables TCP_MAERTS test (server writes and client reads). It overrides enable_read, and enable_write"); DEFINE_FLAG(fp, bool, async_connect, false, 0, "use non blocking connect"); +#if defined(WITH_TCPDEVMEM_CUDA) || defined(WITH_TCPDEVMEM_UDMABUF) + DEFINE_FLAG(fp, bool, tcpd_validate, false, 0, "Validates that received data is a repeating sequence of 1 to 111 inclusive"); + DEFINE_FLAG(fp, bool, tcpd_rx_cpy, false, 0, "After the CUDA buffer is filled to buffer_size, calls cudaMemcpy to a userspace buffer"); + DEFINE_FLAG(fp, const char *, tcpd_nic_pci_addr, 0, 0, "NIC PCI addr, e.x. 0000:06:00.0"); + DEFINE_FLAG(fp, const char *, tcpd_gpu_pci_addr, 0, 0, "GPU PCI addr, e.x. 
0000:04:00.0"); + DEFINE_FLAG(fp, unsigned long long, tcpd_phys_len, 0, 0, "Remote memory length for tcpdevmem"); + DEFINE_FLAG(fp, const char *, tcpd_src_ip, 0, 0, "Src ip address for tcpdevmem"); + DEFINE_FLAG(fp, const char *, tcpd_dst_ip, 0, 0, "Dst ip address for tcpdevmem"); + DEFINE_FLAG(fp, const char *, tcpd_link_name, "eth1", 0, "Link name to bind DMA buffer_pages for Rx"); + DEFINE_FLAG(fp, int, queue_start, 8, 0, "Queue to start flow-steering at"); + DEFINE_FLAG(fp, int, queue_num, 4, 0, "Number of queues to flow-steer to"); +#endif /* WITH_TCPDEVMEM_CUDA || WITH_TCPDEVMEM_UDMABUF */ /* Return the updated fp */ return (fp); diff --git a/flags.c b/flags.c index dd801d5..8d1f5d3 100644 --- a/flags.c +++ b/flags.c @@ -157,6 +157,8 @@ static void default_parser(const char *type, char *arg, void *out, *(unsigned long *)out = strtoul(arg, NULL, 0); else if (strcmp(type, "double") == 0) *(double *)out = atof(arg); + else if (strcmp(type, "unsigned long long") == 0) + *(unsigned long long *)out = strtoull(arg, NULL, 0); else LOG_ERROR(cb, "Unknown type `%s' for arg `%s'.", type, arg); } @@ -339,6 +341,8 @@ static void print_flag(const struct flag *flag, struct callbacks *cb) PRINT(cb, name, "%f", *(double *)var); else if (strcmp(type, "long long") == 0) PRINT(cb, name, "%lld", *(long long *)var); + else if (strcmp(type, "unsigned long long") == 0) + PRINT(cb, name, "%llu", *(unsigned long long *)var); else LOG_ERROR(cb, "Unknown type `%s' for variable %s", type, name); } diff --git a/flow.c b/flow.c index fb46e47..3abe80b 100644 --- a/flow.c +++ b/flow.c @@ -19,6 +19,12 @@ #include "socket.h" #include "thread.h" #include "stats.h" +#ifdef WITH_TCPDEVMEM_CUDA +#include "tcpdevmem_cuda.h" +#endif /* WITH_TCPDEVMEM_CUDA */ +#ifdef WITH_TCPDEVMEM_UDMABUF +#include "tcpdevmem_udmabuf.h" +#endif /* WITH_TCPDEVMEM_UDMABUF */ /* * We define the flow struct locally to this file to force outside users to go @@ -271,6 +277,16 @@ void flow_delete(struct flow *f) 
thread_clear_flow_or_die(f->f_thread, f); } +#ifdef WITH_TCPDEVMEM_CUDA + if (flow_thread(f)->opts->tcpd_gpu_pci_addr) + cuda_flow_cleanup(f->f_mbuf); +#endif /* WITH_TCPDEVMEM_CUDA */ +#ifdef WITH_TCPDEVMEM_UDMABUF + if (flow_thread(f)->opts->tcpd_nic_pci_addr + && !flow_thread(f)->opts->tcpd_gpu_pci_addr) + udmabuf_flow_cleanup(f->f_mbuf); +#endif /* WITH_TCPDEVMEM_UDMABUF */ + /* TODO: need to free the stat struct here for crr tests */ free(f->f_opaque); /* Right now the test is always false, but let's leave it in case diff --git a/lib.h b/lib.h index 2b616df..ea8c29b 100644 --- a/lib.h +++ b/lib.h @@ -108,6 +108,18 @@ struct options { bool async_connect; /* tcp_stream */ +#if defined(WITH_TCPDEVMEM_CUDA) || defined(WITH_TCPDEVMEM_UDMABUF) + bool tcpd_validate; + bool tcpd_rx_cpy; + const char *tcpd_nic_pci_addr; + const char *tcpd_gpu_pci_addr; + unsigned long long tcpd_phys_len; + const char *tcpd_src_ip; + const char *tcpd_dst_ip; + const char *tcpd_link_name; + int queue_start; + int queue_num; +#endif /* WITH_TCPDEVMEM_CUDA || WITH_TCPDEVMEM_UDMABUF */ bool split_bidir; /* implies enable_read, enable_write, split rx/tx */ bool enable_read; bool enable_write; diff --git a/multi_neper.py b/multi_neper.py new file mode 100755 index 0000000..7bc5fc4 --- /dev/null +++ b/multi_neper.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python3 + +import argparse, sys, os, subprocess +from logging import debug,info,warning,error,critical,basicConfig + +parser=argparse.ArgumentParser() + +link_to_gpu_pci_addr = { + "eth1": "0000:04:00.0", # GPU0 + "eth2": "0000:0a:00.0", # GPU2 + "eth3": "0000:84:00.0", # GPU4 + "eth4": "0000:8a:00.0" # GPU6 +} + +link_to_nic_pci_addr = { + "eth1": "0000:06:00.0", + "eth2": "0000:0c:00.0", + "eth3": "0000:86:00.0", + "eth4": "0000:8c:00.0" +} + +link_to_gpu_index = { + "eth1": "0", + "eth2": "2", + "eth3": "4", + "eth4": "6" +} + +# returns a 2-tuple of a Neper command and a dict of env vars +def build_neper_cmd(neper_dir: str, is_client: bool, 
dev: str, + threads: int, flows: int, + cpu_list, buffer_size: int, phys_len: int, + nic_pci: str, gpu_pci: str, + control_port, source_port, port, length, + src_ip, dst_ip, queue_start, queue_num, + tcpd_validate, tcpd_rx_cpy)->tuple: + + cmd = (f"taskset --cpu-list {cpu_list} {neper_dir}/tcp_stream" + f" -T {threads} -F {flows}" + f" --port {port} --source-port {source_port}" + f" --control-port {control_port}" + f" --buffer-size {buffer_size} " + f" -l {length}" + f" --num-ports {flows}") + + if phys_len: + cmd += f" --tcpd-phys-len {phys_len}" + if nic_pci: + cmd += f" --tcpd-nic-pci-addr {nic_pci}" + if gpu_pci: + cmd += f" --tcpd-gpu-pci-addr {gpu_pci}" + if tcpd_validate: + cmd += " --tcpd-validate" + + if is_client: + cmd += f" -c -H {dst_ip}" + else: + cmd = cmd + (f" --tcpd-link-name {dev}" + f" --tcpd-src-ip {src_ip}" + f" --tcpd-dst-ip {dst_ip}" + f" --queue-start {queue_start}" + f" --queue-num {queue_num}") + if tcpd_rx_cpy: + cmd += " --tcpd-rx-cpy" + + env = {"CUDA_VISIBLE_DEVICES": link_to_gpu_index[dev]} + env.update(os.environ.copy()) + + return (cmd, env) + +# returns a CPU range for taskset +# e.x. 
returns 4-7 provided 0, 4, 1 as arguments +def get_cpu_range(starting_cpu:int, interval: int, idx: int)->str: + cpu_start = idx * interval + starting_cpu + cpu_end = cpu_start + interval - 1 + return f"{cpu_start}-{cpu_end}" + +def run_cmds(cmds: list)->list: + sp_list = [] + for cmd, env in cmds: + popen = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env) + sp_list.append(popen) + + return sp_list + +def parse_subprocess_outputs(subprocesses): + output_dicts = [] + + for sp in subprocesses: + cur_hash = dict() + + sp.wait() + debug(sp.stderr.read()) + for line in sp.stdout.read().split("\n"): + debug(line) + stripped_line = line.strip() + if "=" in stripped_line: + parsed_line = stripped_line.split("=") + cur_hash[parsed_line[0]] = parsed_line[1] + if cur_hash: + output_dicts.append(cur_hash) + + return output_dicts + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + + parser.add_argument("--neper-dir", help="directory containing Neper binaries", default=".") + parser.add_argument("--threads", help="number of threads per Neper instance", default="4", type=int) + parser.add_argument("--flows", help="number of flows per Neper instance", default="4", type=int) + parser.add_argument("--source-port", default="12345", type=int) + parser.add_argument("--port", default="12345", type=int) + parser.add_argument("--control-port", default="12866", type=int) + parser.add_argument("--devices", help="comma-delimited list of links to run Neper on, i.e. 
eth1,eth2,eth3", + default="eth1") + parser.add_argument("--phys-len", default=4294967296) + parser.add_argument("--buffer-size", default=4096*120) + + parser.add_argument("-c", "--client", action="store_true") + parser.add_argument("--src-ips", required="--client" not in sys.argv and "-c" not in sys.argv, + help="required for Host to install/remove flow-steering rules, comma-delimited list of client IP addresses") + parser.add_argument("-H", "--hosts", required=True, + help="comma-delimited list of host IP addresses") + + parser.add_argument("--q-start", default="8", help="starting queue for flow-steering rules", type=int) + parser.add_argument("--q-num", default="4", help=("number of queues for flow-steering rules" + " (i.e. if q-start=8 and q-num=4, 2" + " flow-steering rules each will be" + " installed for queues [8-11])"), + type=int) + parser.add_argument("--tcpd-validate", action="store_true") + parser.add_argument("--tcpd-rx-cpy", action="store_true") + + parser.add_argument("--dry-run", default=False, action="store_true") + + parser.add_argument("-l", "--length", default=10) + parser.add_argument("--log", default="WARNING") + parser.add_argument("-m", "--mode", default="cuda", help="cuda|udmabuf|default") + + args = parser.parse_args() + + basicConfig(level=args.log.upper()) + + devices = args.devices.split(",") + hosts = args.hosts.split(",") + src_ips = args.src_ips.split(",") + + dev_to_rule = dict() + # setup flow_steering rules + if not args.client: + info("setting up flow-steering rules") + # src_ips = args.src_ips.split(",") + + cmds = [] + debug(f"running on {devices}") + is_client = args.client + + for i, dev in enumerate(devices): + nic_pci, gpu_pci = None, None + + if args.mode.lower() in ["cuda", "udmabuf"]: + nic_pci = link_to_nic_pci_addr[dev] + if args.mode.lower() == "cuda": + gpu_pci = link_to_gpu_pci_addr[dev] + + # increment control port by 1, and src/dst ports by flow_count + # for each additional link we're running Neper on + ctrl_port 
= args.control_port + i + src_port = i * args.flows + args.source_port + dst_port = i * args.flows + args.port + + src_ip, dst_ip = src_ips[i], hosts[i] + + # TODO should CPU range be configurable by the user? + cpu_range = get_cpu_range(2 + (52 if i >= 2 else 0), args.threads, i) + + cmd_env = build_neper_cmd(args.neper_dir, is_client, dev, + args.threads, args.flows, cpu_range, args.buffer_size, + args.phys_len, nic_pci, gpu_pci, + ctrl_port, src_port, dst_port, args.length, src_ip, dst_ip, + args.q_start, args.q_num, args.tcpd_validate, args.tcpd_rx_cpy) + + cmds.append(cmd_env) + + for cmd in cmds: + debug(cmd) + + if not args.dry_run: + sp_list = run_cmds(cmds) + debug("parsing subprocesses outputs") + for dev, i in zip(devices, parse_subprocess_outputs(sp_list)): + if not args.client: + try: + print(f"[{dev}] Throughput (Mb/s): {i['throughput']}") + except KeyError: + print(f"[{dev}] Throughput (Mb/s): NA") diff --git a/socket.c b/socket.c index 4fd19c4..7732c52 100644 --- a/socket.c +++ b/socket.c @@ -18,6 +18,15 @@ #include "flow.h" #include "socket.h" #include "thread.h" +#ifdef WITH_TCPDEVMEM_CUDA +#include "tcpdevmem_cuda.h" +#endif +#ifdef WITH_TCPDEVMEM_UDMABUF +#include "tcpdevmem_udmabuf.h" +#endif +#if defined(WITH_TCPDEVMEM_CUDA) || defined(WITH_TCPDEVMEM_UDMABUF) +#include "tcpdevmem.h" +#endif #ifndef NO_LIBNUMA #include "third_party/libnuma/numa.h" @@ -63,6 +72,24 @@ static void socket_init_not_established(struct thread *t, int s) if (err) PLOG_ERROR(t->cb, "setsockopt(SO_LINGER)"); } +#ifdef WITH_TCPDEVMEM_CUDA + if (!t->f_mbuf && opts->tcpd_gpu_pci_addr) { + if (tcpd_cuda_setup_alloc(t->opts, &t->f_mbuf, t)) + LOG_FATAL(t->cb, "%s: failed to setup devmem CUDA socket", + __func__); + } +#endif /* WITH_TCPDEVMEM_CUDA */ +#ifdef WITH_TCPDEVMEM_UDMABUF + if (!t->f_mbuf && opts->tcpd_nic_pci_addr) { + if (udmabuf_setup_alloc(t->opts, &t->f_mbuf, t)) + LOG_FATAL(t->cb, "%s: failed to setup devmem UDMABUF socket", + __func__); + } +#endif /* 
WITH_TCPDEVMEM_UDMABUF */ +#if defined(WITH_TCPDEVMEM_CUDA) || defined(WITH_TCPDEVMEM_UDMABUF) + if (opts->tcpd_nic_pci_addr) + tcpd_setup_socket(t, s); +#endif /* WITH_TCPDEVMEM_CUDA || WITH_TCPDEVMEM_UDMABUF */ } /* @@ -242,6 +269,24 @@ void socket_listen(struct thread *t) struct addrinfo *ai = getaddrinfo_or_die(opts->host, opts->port, &hints, cb); int port = atoi(opts->port); +#if defined(WITH_TCPDEVMEM_CUDA) || defined(WITH_TCPDEVMEM_UDMABUF) + /* TCPDevmem: + * Since each thread has a dma buffer, and + * flow-steering rules are required, threads, TCP connections, and + * dma buffers need to be 1:1:1. + * + * We enforce that by co-opting the num_ports option. + * + * thread/flow 0 will listen on port x, and use thread_0's buf + * thread_1/flow_1 listen on x+1 -> thread_1->f_mbuf + * etc... + */ + if (opts->tcpd_nic_pci_addr) { + port += t->index; + reset_port(ai, port, cb); + } +#endif /* WITH_TCPDEVMEM_CUDA || WITH_TCPDEVMEM_UDMABUF */ + int i, n, s; struct flow_create_args args = { @@ -257,6 +302,17 @@ void socket_listen(struct thread *t) switch (ai->ai_socktype) { case SOCK_STREAM: n = opts->num_ports ? opts->num_ports : 1; +#if defined(WITH_TCPDEVMEM_CUDA) || defined(WITH_TCPDEVMEM_UDMABUF) + /* TCP Devmem: + * See TCP Devmem comment above^ + * + * We are co-opting the num_ports option, so each thread/flow + * listens on a port that's 1 larger than the previous thread's + * port. 
+ */ + if (opts->tcpd_nic_pci_addr) + n = 1; +#endif /* WITH_TCPDEVMEM_CUDA || WITH_TCPDEVMEM_UDMABUF */ for (i = 0; i < n; i++) { s = socket_bind_listener(t, ai); socket_init_not_established(t, s); diff --git a/stream.c b/stream.c index 3ecf96b..f5eaaa1 100644 --- a/stream.c +++ b/stream.c @@ -24,11 +24,32 @@ #include "socket.h" #include "stats.h" #include "thread.h" +#ifdef WITH_TCPDEVMEM_CUDA +#include "tcpdevmem_cuda.h" +#endif +#ifdef WITH_TCPDEVMEM_UDMABUF +#include "tcpdevmem_udmabuf.h" +#endif static void *stream_alloc(struct thread *t) { const struct options *opts = t->opts; +#ifdef WITH_TCPDEVMEM_CUDA + if (!t->f_mbuf && t->opts->tcpd_gpu_pci_addr) { + if (tcpd_cuda_setup_alloc(t->opts, &t->f_mbuf, t)) + LOG_FATAL(t->cb, "%s: failed to setup devmem CUDA socket", + __func__); + } +#endif /* WITH_TCPDEVMEM_CUDA */ +#ifdef WITH_TCPDEVMEM_UDMABUF + if (!t->f_mbuf && t->opts->tcpd_nic_pci_addr) { + if (udmabuf_setup_alloc(t->opts, &t->f_mbuf, t)) + LOG_FATAL(t->cb, "%s: failed to setup devmem UDMABUF socket", + __func__); + } +#endif /* WITH_TCPDEVMEM_UDMABUF */ + if (!t->f_mbuf) { t->f_mbuf = malloc_or_die(opts->buffer_size, t->cb); if (opts->enable_write) @@ -91,6 +112,21 @@ void stream_handler(struct flow *f, uint32_t events) if (events & EPOLLIN) do { do { +#ifdef WITH_TCPDEVMEM_CUDA + if (t->opts->tcpd_gpu_pci_addr) + n = tcpd_recv(fd, mbuf, + opts->buffer_size, + opts->recv_flags, + t); + else +#endif /* WITH_TCPDEVMEM_CUDA */ +#ifdef WITH_TCPDEVMEM_UDMABUF + if (t->opts->tcpd_nic_pci_addr) + n = udmabuf_recv(fd, mbuf, + opts->buffer_size, + t); + else +#endif /* WITH_TCPDEVMEM_UDMABUF */ n = recv(fd, mbuf, opts->buffer_size, opts->recv_flags); } while(n == -1 && errno == EINTR); @@ -110,6 +146,22 @@ void stream_handler(struct flow *f, uint32_t events) if (events & EPOLLOUT) do { +#ifdef WITH_TCPDEVMEM_CUDA + if (t->opts->tcpd_gpu_pci_addr) { + n = tcpd_send(fd, mbuf, + opts->buffer_size, + opts->send_flags, + t); + } else +#endif /* WITH_TCPDEVMEM_CUDA 
*/ +#ifdef WITH_TCPDEVMEM_UDMABUF + if (t->opts->tcpd_nic_pci_addr) { + n = udmabuf_send(fd, mbuf, + opts->buffer_size, + opts->send_flags, + t); + } else +#endif /* WITH_TCPDEVMEM_UDMABUF */ n = send(fd, mbuf, opts->buffer_size, opts->send_flags); t->io_stats.tx_ops++; t->io_stats.tx_bytes += n > 0 ? n : 0; diff --git a/tcpdevmem.c b/tcpdevmem.c new file mode 100644 index 0000000..cb86795 --- /dev/null +++ b/tcpdevmem.c @@ -0,0 +1,113 @@ +#include +#include +#include + +#include "flow.h" +#include "lib.h" +#include "logging.h" +#include "tcpdevmem_cuda.h" +#include "tcpdevmem.h" +#include "thread.h" + +#define TEST_PREFIX "ncdevmem" +#define RETURN_IF_NON_ZERO(cmd) \ + ret = (cmd); \ + if (ret) return ret + +int driver_reset(const struct options *opts) { + char driver_reset_cmd[512]; + int ret = 0; + + sprintf(driver_reset_cmd, "ethtool --set-priv-flags %s enable-strict-header-split on", opts->tcpd_link_name); + RETURN_IF_NON_ZERO(system(driver_reset_cmd)); + + sprintf(driver_reset_cmd, "ethtool --set-priv-flags %s enable-strict-header-split off", opts->tcpd_link_name); + RETURN_IF_NON_ZERO(system(driver_reset_cmd)); + + sprintf(driver_reset_cmd, "ethtool --set-priv-flags %s enable-header-split off", opts->tcpd_link_name); + RETURN_IF_NON_ZERO(system(driver_reset_cmd)); + + sprintf(driver_reset_cmd, "ethtool --set-rxfh-indir %s equal 16", opts->tcpd_link_name); + RETURN_IF_NON_ZERO(system(driver_reset_cmd)); + + sprintf(driver_reset_cmd, "ethtool -K %s ntuple off", opts->tcpd_link_name); + RETURN_IF_NON_ZERO(system(driver_reset_cmd)); + + sprintf(driver_reset_cmd, "ethtool --set-priv-flags %s enable-strict-header-split off", opts->tcpd_link_name); + RETURN_IF_NON_ZERO(system(driver_reset_cmd)); + + sprintf(driver_reset_cmd, "ethtool --set-priv-flags %s enable-header-split off", opts->tcpd_link_name); + RETURN_IF_NON_ZERO(system(driver_reset_cmd)); + + sprintf(driver_reset_cmd, "ethtool -K %s ntuple off", opts->tcpd_link_name); + 
RETURN_IF_NON_ZERO(system(driver_reset_cmd)); + + sprintf(driver_reset_cmd, "ethtool --set-priv-flags %s enable-max-rx-buffer-size on", opts->tcpd_link_name); + RETURN_IF_NON_ZERO(system(driver_reset_cmd)); + + sprintf(driver_reset_cmd, "ethtool -K %s ntuple on", opts->tcpd_link_name); + RETURN_IF_NON_ZERO(system(driver_reset_cmd)); + + return ret; +} + +int install_flow_steering(const struct options *opts, intptr_t buf, + struct thread *t) +{ + int q_start = opts->queue_start; + int q_num = opts->queue_num; + int ret; + + int num_queues = q_start + (t->index % q_num); + LOG_INFO(t->cb, "Bind to queue %i\n", num_queues); + struct dma_buf_pages_bind_rx_queue bind_cmd; + + strcpy(bind_cmd.ifname, opts->tcpd_link_name); + bind_cmd.rxq_idx = num_queues; + + ret = ioctl(buf, DMA_BUF_PAGES_BIND_RX, &bind_cmd); + if (ret < 0) + LOG_FATAL(t->cb, "FAIL, bind fail queue=%d", num_queues); + + /* using t->index below requires 1 thread listening to 1 port + * (see relevant comments in socket.c) + */ + int src_port = t->index + opts->source_port; + int dst_port = t->index + atoi(opts->port); + + char flow_steer_cmd[512]; + sprintf(flow_steer_cmd, + "ethtool -N %s flow-type tcp4 src-ip %s dst-ip %s src-port %i dst-port %i queue %i", + opts->tcpd_link_name, opts->tcpd_src_ip, opts->tcpd_dst_ip, + src_port, dst_port, num_queues); + RETURN_IF_NON_ZERO(system(flow_steer_cmd)); + + // only running the below ethtool commands after last thread/flow is setup + if (t->index == opts->num_flows - 1) + { + char ethtool_cmd[512]; + sprintf(ethtool_cmd, "ethtool --set-priv-flags %s enable-strict-header-split on", opts->tcpd_link_name); + RETURN_IF_NON_ZERO(system(ethtool_cmd)); + + sprintf(ethtool_cmd, "ethtool --set-priv-flags %s enable-header-split on", opts->tcpd_link_name); + RETURN_IF_NON_ZERO(system(ethtool_cmd)); + + sprintf(ethtool_cmd, "ethtool --set-rxfh-indir %s equal 8", opts->tcpd_link_name); + RETURN_IF_NON_ZERO(system(ethtool_cmd)); + + LOG_INFO(t->cb, "ethtool cmds returned %i, 
sleeping 1...\n", ret); + sleep(1); + } + return ret; +} + +int tcpd_setup_socket(struct thread *t, int socket) +{ + const int one = 1; + if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) || + setsockopt(socket, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) || + setsockopt(socket, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one))) + PLOG_FATAL(t->cb, "tcpd_setup_socket"); + + return 0; +} diff --git a/tcpdevmem.h b/tcpdevmem.h new file mode 100644 index 0000000..937aa26 --- /dev/null +++ b/tcpdevmem.h @@ -0,0 +1,15 @@ +#include "thread.h" +#include "lib.h" + +#define PAGE_SHIFT (12) +#define PAGE_SIZE (1 << PAGE_SHIFT) + +#ifndef MSG_SOCK_DEVMEM +#define MSG_SOCK_DEVMEM 0x2000000 /* don't copy devmem pages but return + * them as cmsg instead */ +#endif + +int driver_reset(const struct options *opts); +int install_flow_steering(const struct options *opts, intptr_t buf, + struct thread *t); +int tcpd_setup_socket(struct thread *t, int socket); diff --git a/tcpdevmem_cuda.cu b/tcpdevmem_cuda.cu new file mode 100644 index 0000000..3aaf964 --- /dev/null +++ b/tcpdevmem_cuda.cu @@ -0,0 +1,450 @@ +#define __iovec_defined 1 + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#if __cplusplus +extern "C" { +#endif + +#include "common.h" +#include "tcpdevmem_cuda.h" +#include "tcpdevmem.h" +#include "logging.h" +#include "flow.h" +#include "thread.h" + +#if __cplusplus +} +#endif + +#define LAST_PRIME 111 + +#define MIN_RX_BUFFER_TOTAL_SIZE (1 << 28) +#define GPUMEM_ALIGNMENT (1UL << 21) +#define GPUMEM_MINSZ 0x400000 + +#define multiplier (1 << 16) + +#define TEST_PREFIX "ncdevmem" +#define NUM_PAGES 16000 + +struct TcpdRxBlock { + uint64_t gpu_offset; + size_t size; + uint64_t paddr; +}; + +/* Fills buf of size n with a repeating sequence of 1 to 111 inclusive + */ +void fill_tx_buffer(void *buf, size_t n) { +#define 
BUFSIZE 3996 + unsigned char src_buf[BUFSIZE]; + int ptr = 0, i = 0; + + while (i < BUFSIZE) { + src_buf[i] = (i % LAST_PRIME) + 1; + i++; + } + + while (ptr*BUFSIZE + BUFSIZE < n) { + cudaMemcpy((char *)buf + ptr*BUFSIZE, &src_buf, BUFSIZE, cudaMemcpyHostToDevice); + ptr++; + } + + i = ptr*BUFSIZE; + while (i < n) { + cudaMemset((char *)buf + i, (i % LAST_PRIME) + 1, 1); + i++; + } +} + +__global__ void scatter_copy_kernel(long3* scatter_list, uint8_t* dst, + uint8_t* src) { + int block_idx = blockIdx.x; + long3 blk = scatter_list[block_idx]; + long dst_off = blk.x; + long src_off = blk.y; + long sz = blk.z; + + int thread_sz = sz / blockDim.x; + int rem = sz % blockDim.x; + bool extra = (threadIdx.x < rem); + int thread_offset = sz / blockDim.x * threadIdx.x; + thread_offset += (extra) ? threadIdx.x : rem; + + for (int i = 0; i < thread_sz; i++) { + dst[dst_off + thread_offset + i] = src[src_off + thread_offset + i]; + } + if (extra) { + dst[dst_off + thread_offset + thread_sz] = + src[src_off + thread_offset + thread_sz]; + } +} + +void gather_rx_data(struct tcpdevmem_cuda_mbuf *tmbuf) { + int ret; + void *gpu_scatter_list_ = tmbuf->gpu_scatter_list_; + std::vector *scattered_data_ = (std::vector *)tmbuf->scattered_data_; + void *gpu_rx_mem_ = tmbuf->gpu_rx_mem_; + void *rx_buff_ = tmbuf->gpu_gen_mem_; + + ret = cudaMemcpyAsync(gpu_scatter_list_, + scattered_data_->data(), + scattered_data_->size() * sizeof(long3), + cudaMemcpyHostToDevice); + if (ret) + return; + + scatter_copy_kernel<<size(), 256, 0>>>( + (long3*)gpu_scatter_list_, (uint8_t*)gpu_rx_mem_, (uint8_t*)rx_buff_); +} + +int get_gpumem_dmabuf_pages_fd(const std::string& gpu_pci_addr, + const std::string& nic_pci_addr, void* gpu_mem, + size_t gpu_mem_sz, int* dma_buf_fd, bool is_client, + struct thread *t) { + int err, ret; + + cuMemGetHandleForAddressRange((void*)dma_buf_fd, (CUdeviceptr)gpu_mem, + gpu_mem_sz, CU_MEM_RANGE_HANDLE_TYPE_DMA_BUF_FD, + 0); + + if (*dma_buf_fd < 0) + PLOG_FATAL(t->cb, 
"cuMemGetHandleForAddressRange"); + + LOG_INFO(t->cb, "Registered dmabuf region 0x%p of %lu Bytes\n", + gpu_mem, gpu_mem_sz); + + struct dma_buf_create_pages_info frags_create_info; + frags_create_info.dma_buf_fd = *dma_buf_fd; + frags_create_info.create_page_pool = is_client ? 0 : 1; + + uint16_t pci_bdf[3]; + ret = sscanf(nic_pci_addr.c_str(), "0000:%hx:%hx.%hx", &pci_bdf[0], + &pci_bdf[1], &pci_bdf[2]); + frags_create_info.pci_bdf[0] = pci_bdf[0]; + frags_create_info.pci_bdf[1] = pci_bdf[1]; + frags_create_info.pci_bdf[2] = pci_bdf[2]; + if (ret != 3) { + err = -EINVAL; + goto err_close_dmabuf; + } + + ret = ioctl(*dma_buf_fd, DMA_BUF_CREATE_PAGES, &frags_create_info); + if (ret < 0) { + PLOG_ERROR(t->cb, "get dma_buf frags"); + err = -EIO; + goto err_close_dmabuf; + } + return ret; + +err_close_dmabuf: + close(*dma_buf_fd); + return err; +} + +int tcpd_cuda_setup_alloc(const struct options *opts, void **f_mbuf, struct thread *t) +{ + bool is_client = opts->client; + void *gpu_gen_mem_; + int gpu_mem_fd_; + int dma_buf_fd_; + struct tcpdevmem_cuda_mbuf *tmbuf; + const char *gpu_pci_addr = opts->tcpd_gpu_pci_addr; + const char *nic_pci_addr = opts->tcpd_nic_pci_addr; + size_t alloc_size = opts->tcpd_phys_len; + + tmbuf = + (struct tcpdevmem_cuda_mbuf *)calloc(1, sizeof(struct tcpdevmem_cuda_mbuf)); + if (!tmbuf) { + exit(EXIT_FAILURE); + } + + if (alloc_size % GPUMEM_ALIGNMENT != 0) { + alloc_size += GPUMEM_ALIGNMENT - (alloc_size % GPUMEM_ALIGNMENT); + } + + cudaMalloc(&gpu_gen_mem_, alloc_size); + if (is_client && opts->tcpd_validate) { + fill_tx_buffer(gpu_gen_mem_, alloc_size); + cudaDeviceSynchronize(); + } + unsigned int flag = 1; + cuPointerSetAttribute(&flag, + CU_POINTER_ATTRIBUTE_SYNC_MEMOPS, + (CUdeviceptr)gpu_gen_mem_); + + gpu_mem_fd_ = get_gpumem_dmabuf_pages_fd(gpu_pci_addr, nic_pci_addr, + gpu_gen_mem_, alloc_size, + &dma_buf_fd_, is_client, t); + + if (gpu_mem_fd_ < 0) + LOG_FATAL(t->cb, "get_gpumem_dmabuf_pages_fd"); + + if (!is_client) + 
install_flow_steering(opts, gpu_mem_fd_, t); + + *f_mbuf = tmbuf; + tmbuf->gpu_mem_fd_ = gpu_mem_fd_; + tmbuf->dma_buf_fd_ = dma_buf_fd_; + tmbuf->gpu_gen_mem_ = gpu_gen_mem_; + tmbuf->cpy_buffer = malloc(opts->buffer_size); + tmbuf->vectors = new std::vector(); + tmbuf->tokens = new std::vector(); + tmbuf->bytes_received = 0; + tmbuf->bytes_sent = 0; + + cudaMalloc(&tmbuf->gpu_rx_mem_, opts->buffer_size); + cudaMalloc(&tmbuf->gpu_scatter_list_, opts->buffer_size); + tmbuf->rx_blks_ = new std::vector(); + tmbuf->scattered_data_ = new std::vector(); + return 0; +} + +int tcpd_send(int socket, void *buf, size_t n, int flags, struct thread *t) { + int gpu_mem_fd_; + struct iovec iov; + struct msghdr msg; + struct cmsghdr *cmsg; + char offsetbuf[CMSG_SPACE(sizeof(uint32_t) * 2)]; + struct tcpdevmem_cuda_mbuf *tmbuf; + + if (!buf) return -1; + + tmbuf = (struct tcpdevmem_cuda_mbuf *)buf; + gpu_mem_fd_ = tmbuf->gpu_mem_fd_; + + memset(&msg, 0, sizeof(msg)); + + iov.iov_base = NULL; + iov.iov_len = n - tmbuf->bytes_sent; + + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + msg.msg_control = offsetbuf; + msg.msg_controllen = sizeof(offsetbuf); + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_DEVMEM_OFFSET; + cmsg->cmsg_len = CMSG_LEN(sizeof(int) * 2); + *((int*)CMSG_DATA(cmsg)) = gpu_mem_fd_; + ((int *)CMSG_DATA(cmsg))[1] = (int)tmbuf->bytes_sent; + + ssize_t bytes_sent = sendmsg(socket, &msg, MSG_ZEROCOPY | MSG_DONTWAIT); + if (bytes_sent < 0 && errno != EWOULDBLOCK && errno != EAGAIN) + PLOG_FATAL(t->cb, "sendmsg"); + + if (bytes_sent == 0) + PLOG_FATAL(t->cb, "sendmsg sent 0 bytes"); + + tmbuf->bytes_sent += bytes_sent; + if (tmbuf->bytes_sent == n) + tmbuf->bytes_sent = 0; + + return bytes_sent; +} + +int tcpd_recv(int socket, void *f_mbuf, size_t n, int flags, struct thread *t) { + struct iovec iov; + struct msghdr msg_local; + struct msghdr *msg; + struct tcpdevmem_cuda_mbuf *tmbuf; + int ret, client_fd; + int buffer_size = n; + 
size_t total_received = 0; + unsigned char *cpy_buffer; + const struct options *opts = t->opts; + std::vector *vectors; + std::vector *tokens; + std::vector *rx_blks_; + std::vector *scattered_data_; + + if (!f_mbuf) return -1; + + tmbuf = (struct tcpdevmem_cuda_mbuf *)f_mbuf; + cpy_buffer = (unsigned char *)tmbuf->cpy_buffer; + vectors = (std::vector *)tmbuf->vectors; + tokens = (std::vector *)tmbuf->tokens; + rx_blks_ = (std::vector *)tmbuf->rx_blks_; + scattered_data_ = (std::vector *)tmbuf->scattered_data_; + + client_fd = socket; + + char buf_dummy[n]; + char offsetbuf[CMSG_SPACE(sizeof(iov) * 10000)]; + msg = &msg_local; + + memset(msg, 0, sizeof(struct msghdr)); + + iov.iov_base = buf_dummy; + iov.iov_len = n - tmbuf->bytes_received; + msg->msg_iov = &iov; + msg->msg_iovlen = 1; + + msg->msg_control = offsetbuf; + msg->msg_controllen = sizeof(offsetbuf); + + rx_blks_->clear(); + + ssize_t received = recvmsg(socket, msg, MSG_SOCK_DEVMEM | MSG_DONTWAIT); + if (received < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + LOG_ERROR(t->cb, "%s %d: recvmsg returned < 0\n", __func__, __LINE__); + return -1; + } else if (received < 0) { + LOG_ERROR(t->cb, "%s %d\n", __func__, __LINE__); + return -1; + } else if (received == 0) { + LOG_ERROR(t->cb, "Client exited\n"); + return -1; + } + + if (msg->msg_flags & MSG_CTRUNC) { + LOG_ERROR(t->cb, "fatal, cmsg truncated, current msg_controllen"); + } + + struct cmsghdr *cm = NULL; + struct devmemvec *devmemvec = NULL; + for (cm = CMSG_FIRSTHDR(msg); cm; cm = CMSG_NXTHDR(msg, cm)) { + if (cm->cmsg_level != SOL_SOCKET || + (cm->cmsg_type != SCM_DEVMEM_OFFSET && + cm->cmsg_type != SCM_DEVMEM_HEADER)) { + continue; + } + + devmemvec = (struct devmemvec *)CMSG_DATA(cm); + + if (cm->cmsg_type == SCM_DEVMEM_HEADER) { + // TODO: process data copied from skb's linear + // buffer. + fprintf(stderr, "\n\nSCM_DEVMEM_HEADER. 
devmemvec->frag_size=%u\n", + devmemvec->frag_size); + exit(1); + } + + struct devmemtoken token = { devmemvec->frag_token, 1 }; + struct TcpdRxBlock blk; + + if (devmemvec->frag_size > PAGE_SIZE) + continue; + + blk.gpu_offset = (uint64_t)devmemvec->frag_offset; + blk.size = devmemvec->frag_size; + rx_blks_->emplace_back(blk); + + total_received += devmemvec->frag_size; + + vectors->emplace_back(*devmemvec); + tokens->push_back(token); + } + + size_t dst_offset = tmbuf->bytes_received; + for (int i = 0; i < rx_blks_->size(); i++) { + struct TcpdRxBlock blk = rx_blks_->at(i); + size_t off = (size_t)blk.gpu_offset; + scattered_data_->emplace_back( + make_long3((long)dst_offset, (long)off, (long)blk.size)); + + dst_offset += blk.size; + } + tmbuf->bytes_received += received; + + /* Once we've received fragments totaling buffer_size, we can copy from the + * CUDA buffer to a user-space buffer, and free the fragments in the CUDA + * buffer. + */ + if (tmbuf->bytes_received == buffer_size) { + if (opts->tcpd_rx_cpy) { + gather_rx_data(tmbuf); + cudaDeviceSynchronize(); + } + /* There is a performance impact when we cudaMemcpy from the CUDA buffer to + * the userspace buffer, so it's gated by a flag + */ + if (opts->tcpd_validate) { + for (int idx = 0; idx < vectors->size(); idx++) { + struct devmemvec vec = (*vectors)[idx]; + struct devmemtoken token = (*tokens)[idx]; + + /* copy each fragment to the cpy_buffer in order, i.e. + * 1st fragment will occuply bytes [0-4095], 2nd fragment will + * occupy bytes [4096-8191], etc. 
+ */ + cudaMemcpy(cpy_buffer + (vec.frag_token - 1) * PAGE_SIZE, + (char *)tmbuf->gpu_gen_mem_ + vec.frag_offset, + vec.frag_size, + cudaMemcpyDeviceToHost); + } + + /* Ensure the sequence is what we expect: + * a repeating sequence of 1 to LAST_PRIME inclusive + */ + cudaDeviceSynchronize(); + int i = 0; + int expected_val; + while (i < buffer_size) { + expected_val = (i % LAST_PRIME) + 1; + if (cpy_buffer[i] != expected_val) { + LOG_WARN(t->cb, + "Thread %i - incorrect byte %i, expected %i, got %i", + t->index, + i, + expected_val, + cpy_buffer[i]); + break; + } + i++; + } + } + + ret = setsockopt(client_fd, SOL_SOCKET, + SO_DEVMEM_DONTNEED, tokens->data(), + tokens->size() * sizeof(devmemtoken)); + if (ret) + PLOG_FATAL(t->cb, "setsockopt DONTNEED failed"); + + vectors->clear(); + tokens->clear(); + rx_blks_->clear(); + scattered_data_->clear(); + tmbuf->bytes_received = 0; + } + return total_received; +} + +int cuda_flow_cleanup(void *f_mbuf) { + struct tcpdevmem_cuda_mbuf *t_mbuf = (struct tcpdevmem_cuda_mbuf *)f_mbuf; + close(t_mbuf->gpu_mem_fd_); + close(t_mbuf->dma_buf_fd_); + cudaFree(t_mbuf->gpu_gen_mem_); + free(t_mbuf->cpy_buffer); + free(t_mbuf->tokens); + free(t_mbuf->vectors); + + cudaFree(t_mbuf->gpu_rx_mem_); + cudaFree(t_mbuf->gpu_scatter_list_); + free(t_mbuf->rx_blks_); + free(t_mbuf->scattered_data_); + return 0; +} diff --git a/tcpdevmem_cuda.h b/tcpdevmem_cuda.h new file mode 100644 index 0000000..bdf12b3 --- /dev/null +++ b/tcpdevmem_cuda.h @@ -0,0 +1,41 @@ +#ifndef THIRD_PARTY_NEPER_DEVMEM_H_ +#define THIRD_PARTY_NEPER_DEVMEM_H_ + +#if __cplusplus +extern "C" { +#endif + +#include + +#include +#include + +#include "common.h" +#include "flags.h" +#include "lib.h" + +struct tcpdevmem_cuda_mbuf { + int gpu_mem_fd_; + int dma_buf_fd_; + void *gpu_gen_mem_; + void *gpu_rx_mem_; + void *gpu_scatter_list_; + void *scattered_data_; + void *rx_blks_; + void *cpy_buffer; + size_t bytes_received; + size_t bytes_sent; + void *tokens; + void *vectors; 
+}; + +int tcpd_cuda_setup_alloc(const struct options *opts, void **f_mbuf, struct thread *t); +int cuda_flow_cleanup(void *f_mbuf); +int tcpd_send(int socket, void *buf, size_t n, int flags, struct thread *t); +int tcpd_recv(int fd, void *f_mbuf, size_t n, int flags, struct thread *t); + +#if __cplusplus +} +#endif + +#endif // THIRD_PARTY_NEPER_DEVMEM_H_ diff --git a/tcpdevmem_udmabuf.c b/tcpdevmem_udmabuf.c new file mode 100644 index 0000000..9cbb444 --- /dev/null +++ b/tcpdevmem_udmabuf.c @@ -0,0 +1,267 @@ +#define __iovec_defined 1 + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "flow.h" +#include "lib.h" +#include "logging.h" +#include "tcpdevmem.h" +#include "tcpdevmem_udmabuf.h" +#include "thread.h" + +int udmabuf_setup_alloc(const struct options *opts, void **f_mbuf, struct thread *t) +{ + bool is_client = opts->client; + int devfd; + int memfd; + int buf; + int buf_pages; + int ret; + size_t size = opts->tcpd_phys_len; + + struct tcpdevmem_udmabuf_mbuf *tmbuf; + struct dma_buf_create_pages_info pages_create_info; + struct udmabuf_create create; + + if (f_mbuf == NULL) + return ENOMEM; + + if (*f_mbuf) + return 0; + + tmbuf = (struct tcpdevmem_udmabuf_mbuf *)calloc(1, sizeof(struct tcpdevmem_udmabuf_mbuf)); + if (!tmbuf) + LOG_FATAL(t->cb, "calloc udmabuf"); + + devfd = open("/dev/udmabuf", O_RDWR); + if (devfd < 0) + LOG_FATAL(t->cb, "[skip,no-udmabuf: Unable to access DMA buffer device file]"); + + memfd = memfd_create("udmabuf-test", MFD_ALLOW_SEALING); + if (memfd < 0) + LOG_FATAL(t->cb, "[skip,no-memfd]"); + + ret = fcntl(memfd, F_ADD_SEALS, F_SEAL_SHRINK); + if (ret < 0) + LOG_FATAL(t->cb, "[skip,fcntl-add-seals]"); + + ret = ftruncate(memfd, size); + if (ret == -1) + LOG_FATAL(t->cb, "[FAIL,memfd-truncate]"); + + memset(&create, 0, sizeof(create)); + + create.memfd = memfd; + create.offset = 0; + create.size = size; + LOG_INFO(t->cb, "udmabuf size=%lu", size); + buf = ioctl(devfd, 
UDMABUF_CREATE, &create); + if (buf < 0) + LOG_FATAL(t->cb, "[FAIL, create udmabuf]"); + + pages_create_info.dma_buf_fd = buf; + pages_create_info.create_page_pool = is_client ? 0 : 1; + + ret = sscanf(opts->tcpd_nic_pci_addr, "0000:%llx:%llx.%llx", + &pages_create_info.pci_bdf[0], + &pages_create_info.pci_bdf[1], + &pages_create_info.pci_bdf[2]); + + if (ret != 3) + LOG_FATAL(t->cb, "[FAIL, parse fail]"); + + buf_pages = ioctl(buf, DMA_BUF_CREATE_PAGES, &pages_create_info); + if (buf_pages < 0) + PLOG_FATAL(t->cb, "ioctl DMA_BUF_CREATE_PAGES: [FAIL, create pages fail]"); + + if (!is_client) + install_flow_steering(opts, buf_pages, t); + + struct dma_buf_sync sync = {0}; + sync.flags = DMA_BUF_SYNC_WRITE | DMA_BUF_SYNC_START; + ioctl(buf, DMA_BUF_IOCTL_SYNC, &sync); + + *f_mbuf = tmbuf; + + tmbuf->devfd = devfd; + tmbuf->memfd = memfd; + tmbuf->buf = buf; + tmbuf->buf_pages = buf_pages; + tmbuf->bytes_sent = 0; + return 0; +} + +int udmabuf_send(int socket, void *f_mbuf, size_t n, int flags, struct thread *t) +{ + int buf_pages, buf; + struct iovec iov; + struct msghdr *msg; + struct cmsghdr *cmsg; + char buf_dummy[n]; + char offsetbuf[CMSG_SPACE(sizeof(uint32_t) * 2)]; + struct tcpdevmem_udmabuf_mbuf *tmbuf; + + if (!f_mbuf) + return -1; + + tmbuf = (struct tcpdevmem_udmabuf_mbuf *)f_mbuf; + buf_pages = tmbuf->buf_pages; + buf = tmbuf->buf; + msg = &tmbuf->msg; + + struct dma_buf_sync sync = {0}; + sync.flags = DMA_BUF_SYNC_WRITE | DMA_BUF_SYNC_START; + ioctl(buf, DMA_BUF_IOCTL_SYNC, &sync); + + char *buf_mem = NULL; + buf_mem = (char *)mmap(NULL, n, PROT_READ | PROT_WRITE, MAP_SHARED, buf, 0); + if (buf_mem == MAP_FAILED) + PLOG_FATAL(t->cb, "mmap()"); + + memcpy(buf_mem, buf_dummy, n); + + sync.flags = DMA_BUF_SYNC_WRITE | DMA_BUF_SYNC_END; + ioctl(buf, DMA_BUF_IOCTL_SYNC, &sync); + + munmap(buf_mem, n); + + memset(msg, 0, sizeof(struct msghdr)); + + iov.iov_base = buf_dummy; + iov.iov_len = n - tmbuf->bytes_sent; + + msg->msg_iov = &iov; + msg->msg_iovlen = 1; 
+ + msg->msg_control = offsetbuf; + msg->msg_controllen = sizeof(offsetbuf); + + cmsg = CMSG_FIRSTHDR(msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_DEVMEM_OFFSET; + cmsg->cmsg_len = CMSG_LEN(sizeof(int) * 2); + *((int *)CMSG_DATA(cmsg)) = buf_pages; + ((int *)CMSG_DATA(cmsg))[1] = (int)tmbuf->bytes_sent; + + ssize_t bytes_sent = sendmsg(socket, msg, MSG_ZEROCOPY); + if (bytes_sent < 0 && errno != EWOULDBLOCK && errno != EAGAIN) + PLOG_FATAL(t->cb, "sendmsg"); + + if (bytes_sent == 0) + PLOG_FATAL(t->cb, "sendmsg sent 0 bytes"); + + tmbuf->bytes_sent += bytes_sent; + if (tmbuf->bytes_sent == n) + tmbuf->bytes_sent = 0; + + return bytes_sent; +} + +int udmabuf_recv(int socket, void *f_mbuf, size_t n, struct thread *t) +{ + struct tcpdevmem_udmabuf_mbuf *tmbuf = (struct tcpdevmem_udmabuf_mbuf *)f_mbuf; + bool is_devmem = false; + size_t total_received = 0; + size_t page_aligned_frags = 0; + size_t non_page_aligned_frags = 0; + unsigned long flow_steering_flakes = 0; + + char iobuf[819200]; + char ctrl_data[sizeof(int) * 20000]; + + struct msghdr msg = {0}; + struct iovec iov = {.iov_base = iobuf, + .iov_len = sizeof(iobuf)}; + + if (!f_mbuf) + return -1; + + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = ctrl_data; + msg.msg_controllen = sizeof(ctrl_data); + ssize_t ret = recvmsg(socket, &msg, MSG_SOCK_DEVMEM); + if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + return -1; + } + if (ret < 0) + PLOG_FATAL(t->cb, "recvmsg:"); + + if (ret == 0) { + LOG_ERROR(t->cb, "client exited"); + return -1; + } + + struct cmsghdr *cm = NULL; + struct devmemvec *devmemvec = NULL; + for (cm = CMSG_FIRSTHDR(&msg); cm; cm = CMSG_NXTHDR(&msg, cm)) { + if (cm->cmsg_level != SOL_SOCKET || + (cm->cmsg_type != SCM_DEVMEM_OFFSET && + cm->cmsg_type != SCM_DEVMEM_HEADER)) { + LOG_ERROR(t->cb, "found weird cmsg"); + continue; + } + is_devmem = true; + + devmemvec = (struct devmemvec *)CMSG_DATA(cm); + + if (cm->cmsg_type == SCM_DEVMEM_HEADER) + // 
TODO: process data copied from skb's linear + // buffer. + LOG_FATAL(t->cb, + "SCM_DEVMEM_HEADER. devmemvec->frag_size=%u", + devmemvec->frag_size); + + struct devmemtoken token = {devmemvec->frag_token, 1}; + + total_received += devmemvec->frag_size; + + if (devmemvec->frag_size % PAGE_SIZE) + non_page_aligned_frags++; + else + page_aligned_frags++; + + struct dma_buf_sync sync = {0}; + sync.flags = DMA_BUF_SYNC_READ | DMA_BUF_SYNC_START; + ioctl(tmbuf->buf, DMA_BUF_IOCTL_SYNC, &sync); + + sync.flags = DMA_BUF_SYNC_READ | DMA_BUF_SYNC_END; + ioctl(tmbuf->buf, DMA_BUF_IOCTL_SYNC, &sync); + + ret = setsockopt(socket, SOL_SOCKET, + SO_DEVMEM_DONTNEED, &token, + sizeof(token)); + if (ret) + PLOG_FATAL(t->cb, "DONTNEED failed"); + } + + if (!is_devmem) { + flow_steering_flakes++; + is_devmem = false; + total_received += ret; + } + if (flow_steering_flakes) + LOG_WARN(t->cb, "total_received=%lu flow_steering_flakes=%lu", + total_received, flow_steering_flakes); + + return total_received; +} + +void udmabuf_flow_cleanup(void *f_mbuf) { + struct tcpdevmem_udmabuf_mbuf *t_mbuf = (struct tcpdevmem_udmabuf_mbuf *)f_mbuf; + + close(t_mbuf->buf_pages); + close(t_mbuf->buf); + close(t_mbuf->memfd); + close(t_mbuf->devfd); +} diff --git a/tcpdevmem_udmabuf.h b/tcpdevmem_udmabuf.h new file mode 100644 index 0000000..391caf1 --- /dev/null +++ b/tcpdevmem_udmabuf.h @@ -0,0 +1,38 @@ +#ifndef THIRD_PARTY_NEPER_DEVMEM_UDMABUF_H_ +#define THIRD_PARTY_NEPER_DEVMEM_UDMABUF_H_ + +#if __cplusplus +extern "C" { +#endif + +#include + +#include "common.h" +#include "flags.h" +#include "lib.h" + +#define UDMABUF_CREATE _IOW('u', 0x42, struct udmabuf_create) + +struct tcpdevmem_udmabuf_mbuf { + struct msghdr msg; + int dmabuf_fd; + int pages_fd; + + int devfd; + int memfd; + int buf; + int buf_pages; + size_t bytes_sent; +}; + +int udmabuf_setup_alloc(const struct options *opts, void **f_mbuf, + struct thread *t); +int udmabuf_send(int socket, void *f_mbuf, size_t n, int flags, struct thread 
*t); +int udmabuf_recv(int socket, void *f_mbuf, size_t n, struct thread *t); +void udmabuf_flow_cleanup(void *f_mbuf); + +#if __cplusplus +} +#endif + +#endif // THIRD_PARTY_NEPER_DEVMEM_UDMABUF_H_ diff --git a/thread.c b/thread.c index dc490a9..9420b6a 100644 --- a/thread.c +++ b/thread.c @@ -29,6 +29,9 @@ #include "rusage.h" #include "snaps.h" #include "stats.h" +#if defined(WITH_TCPDEVMEM_CUDA) || defined(WITH_TCPDEVMEM_UDMABUF) +#include "tcpdevmem.h" +#endif /* WITH_TCPDEVMEM_CUDA || WITH_TCPDEVMEM_UDMABUF */ #include "thread.h" #ifndef NO_LIBNUMA @@ -360,6 +363,14 @@ void start_worker_threads(struct options *opts, struct callbacks *cb, allowed_cores = get_cpuset(cpuset, cb); LOG_INFO(cb, "Number of allowed_cores = %d", allowed_cores); +#if defined(WITH_TCPDEVMEM_CUDA) || defined(WITH_TCPDEVMEM_UDMABUF) + /* perform driver reset (on host) in anticipation of TCPDEVMEM run */ + if (opts->tcpd_nic_pci_addr && !opts->client) { + if (driver_reset(opts)) + LOG_FATAL(cb, "TCPDEVMEM driver reset failed"); + } +#endif /* WITH_TCPDEVMEM_CUDA || WITH_TCPDEVMEM_UDMABUF */ + for (i = 0; i < opts->num_threads; i++) { t[i].index = i; t[i].fn = fn;