use std::{path::{Path, PathBuf}, fs::{File, self, OpenOptions}, io::{BufWriter, Write, self, Read, BufRead}, net::Ipv4Addr, process::{Command, self, Stdio}, time::{Instant, Duration}, thread}; use json::JsonValue; use log::info; use rand::prelude::*; use anyhow::{Context, ensure, anyhow}; mod args; mod error; pub use crate::run::args::*; pub use crate::run::error::*; pub use crate::run::args::TestType::*; pub use crate::run::args::FilterType::*; pub const BENCH_BASE_PATH: &str = "./bench"; pub const BENCH_DATA_PATH: &str = "data"; pub const BENCH_BIN_PATH: &str = "bin"; pub const BENCH_LOG_PATH: &str = "log"; const XDP_LOAD_TIMEOUT_SECS: u64 = 5; const PRIVILEGE_RUNNER: [&str;1] = ["sudo"]; fn log(log_path: &Path, reader: &mut R, name: &str) -> anyhow::Result<()> where R: ?Sized, R: Read { fs::create_dir_all(log_path)?; let path = log_path.join(format!("{}.log", name)); let mut file = OpenOptions::new().append(true).create(true).open(&path) .context(format!("Failed to create logfile {}", path.to_str().unwrap()))?; io::copy(reader, &mut file) .context(format!("Failed to write to logfile {}", path.to_str().unwrap()))?; Ok(()) } fn log_both(log_path: &Path, stderr: &mut R1, stdout: &mut R2, name: &str) -> anyhow::Result<()> where R1: ?Sized, R2: ?Sized, R1: Read, R2: Read { let stderr_name = format!("{}.stderr", name); log(log_path, stderr, &stderr_name.as_str())?; let stdout_name = format!("{}.stdout", name); log(log_path, stdout, &stdout_name.as_str())?; Ok(()) } pub fn run() -> Result<(), anyhow::Error> { clean().context("Cleaning bench folder")?; let cores: u32 = 4; let seed: u64 = 0x1337133713371337; let scan_sizes: Vec = vec![8, 16];//, 24];//,32]; // TODO 8 only for test purposes // let scan_sizes: Vec = vec![24]; // let hit_rates: Vec = vec![0.001, 0.0032,0.01,0.032,0.1]; let hit_rates: Vec = vec![0.02]; // let false_positive_rates: Vec = vec![Baseline, EmptyFilter,Normal(0.1),Normal(0.01),Normal(0.001),Normal(0.0001)]; let false_positive_rates: Vec = vec![Baseline, Normal(0.001), BpfStats(0.001)]; let filter_types: Vec = vec![Bitmap, Bloom]; let baseline_filter_types: Vec = vec![Bitmap]; // let scan_rates: Vec = vec![316_000, 562_000, 1_000_000, 1_780_000, 3_160_000]; let scan_rates: Vec = vec![500000, 629463, 792447, 997631, 1255943, 1581139, 1990536, 2505936, 3154787, 3971641, 5000000]; for scan_size in &scan_sizes { for hit_rate in &hit_rates { let data_args = DataArgs::from(seed, *scan_size, *hit_rate); if data_args.entries == 0 { info!("Skipping {}; no entries", data_args); continue; } info!("Building IP file for {}", data_args); let (ip_file_path, subnet) = build_ip_file(data_args) .context(format!("Building ip file for {}", data_args))?; info!("subnet for {} is {}", data_args, subnet); for test_type in &false_positive_rates { let filter_types = match test_type { Normal(_) | BpfStats(_) => { &filter_types }, Baseline => { &baseline_filter_types } }; for filter_type in filter_types { let bloom_args = FilterArgs::from(data_args, *test_type, *filter_type); info!("Building binaries for {} {}", data_args, bloom_args); build_binaries(data_args, bloom_args) .context(format!("Failed to build binaries for {} {}", data_args, bloom_args))?; let filter_path = match test_type { Normal(_) | BpfStats(_) => { info!("Building filter for {} {}", data_args, bloom_args); let filter_path = build_filter(data_args, bloom_args, *filter_type, ip_file_path.as_path()) .context(format!("Failed to build filter for {} {}", data_args, bloom_args))?; Some(filter_path) }, Baseline => { None } }; for scan_rate in &scan_rates { let scan_args = ScanArgs::new(*scan_rate); let args = BenchArgs {data_args, bloom_filter_args: bloom_args, scan_args}; let run_output = (|| { fs::create_dir_all(args.wd_path()) .context("Failed to create wd") .map_err(|e| (None, e))?; let (handle, stderr_handle, stdout_handle) = match test_type { Normal(_) | BpfStats(_) => { info!("Loading XDP program for {}", args); let (handle, stderr_handle, stdout_handle) = load_xdp(args, filter_path.clone().unwrap().as_path()) .map_err(|(h, e)| (h, e.context(format!("Loading XDP program for {}", args))))?; (Some(handle), Some(stderr_handle), Some(stdout_handle)) }, Baseline => { info!("Not loading XDP program for {}", args); (None, None, None) } }; if let BpfStats(_) = test_type { info!("Enabling bpf_stats"); if let Err(e) = set_bpf_stats(args, true) { return Err((handle, e)); } } info!("Running zmap for {}", args); let zmap_result = run_zmap(args, subnet) .context(format!("Running zmap for {}", args)); if let Err(e) = zmap_result { return Err((handle, e)); } let zmap_output = zmap_result.unwrap(); let bpf_stats = match test_type { BpfStats(_) => { info!("Disabling and collecting bpf_stats"); if let Err(e) = set_bpf_stats(args, false) { return Err((handle, e)); } let bpf_stats_result = read_bpf_stats(args) .context(format!("Failed to read bpf stats for {}", args)); if let Err(e) = bpf_stats_result { return Err((handle, e)); } let bpf_stats = bpf_stats_result.unwrap(); Some(bpf_stats) } _ => { None } }; let responder_output = match test_type { BpfStats(_) | Normal(_) => { info!("Telling 'responder' to unload XDP"); let responder_output = unload_xdp(args, handle.unwrap()) .map_err(|(h, e)| (h, e.context(format!("Could not successfully unload XDP program for {}", args))))?; Some(responder_output) } Baseline => { None } }; Ok((zmap_output, responder_output, bpf_stats, stderr_handle, stdout_handle)) })(); let (zmap_output, _responder_output, bpf_stats, responder_stderr_handle, responder_stdout_handle) = run_output.map_err(|(handle, e)| { if let Some(mut handle) = handle { let kill_result = handle.kill(); if let Err(kill_e) = kill_result { return anyhow!(kill_e) .context(e) .context(format!("Failed to kill responder process for {}; Killed because of reason below", args)); } } e })?; if let Some(responder_stderr_handle) = responder_stderr_handle { responder_stderr_handle.join() .map_err(|_| anyhow!("stderr thread panicked")) .and(responder_stdout_handle.unwrap().join() .map_err(|_| anyhow!("stdout thread panicked"))) .and_then(|res| res) .context(format!("Error occured in a log thread for {}", args))?; } File::create(args.wd_path().join("zmap_stats.txt")) .context(format!("Failed to create zmap_stats.txt file for {}", args))? .write_all(&zmap_output.stderr.as_slice()) .context(format!("Failed to write to zmap_stats.txt file for {}", args))?; if let Some(bpf_stats) = bpf_stats { File::create(args.wd_path().join("bpf_stats.json")) .context(format!("Failed to create bpf_stats.json file for {}", args))? .write_all(format!("{{\"run_count\": {}, \"run_time\": {}, \"mem_lock\": {}}}", bpf_stats.0, bpf_stats.1, bpf_stats.2).as_bytes()) .context(format!("Failed to write to bpf_stats.json file for {}", args))?; } } } } } } Ok(()) } fn clean() -> anyhow::Result<()> { let path = PathBuf::from(BENCH_BASE_PATH); if (&path).exists() { fs::remove_dir_all(&path).context(format!("Failed to clean path: {:?}", &path))?; } Ok(()) } fn next_ip(rng: &mut SmallRng, mask: u32) -> u32 { loop { let ip = rng.next_u32() & mask; if ip & 0xff000000 != 0x7f000000 { // can not have ips in 127.0.0.0/8 break ip; } } } fn build_ip_file(data_args: DataArgs) -> anyhow::Result<(PathBuf, Ipv4Addr)> { let mut path = PathBuf::new(); path.push(BENCH_BASE_PATH); path.push(BENCH_DATA_PATH); path.push(data_args.rel_path()); path.push("ips.txt"); fs::create_dir_all(path.parent().unwrap())?; let ip_file = File::create(&path)?; let mut writer = BufWriter::new(ip_file); let mut rng = SmallRng::seed_from_u64(data_args.seed); let lower_subnet_mask = ((1u64 << (data_args.scan_subnet_size)) - 1u64) as u32; let upper_subnet_mask = u32::MAX - lower_subnet_mask; let subnet = next_ip(&mut rng, upper_subnet_mask); for _ in 0..data_args.entries { let ip = subnet | next_ip(&mut rng, lower_subnet_mask); writer.write(Ipv4Addr::from(ip).to_string().as_bytes())?; writer.write(b"\n")?; } Ok((path, Ipv4Addr::from(subnet))) } fn build_binaries(data_args: DataArgs, bloom_args: FilterArgs) -> anyhow::Result<()> { let bin_path = BenchArgs::bin_bin_path(data_args, bloom_args); fs::create_dir_all(&bin_path).context("Failed to create bench dir")?; let output = Command::new("cargo") .args([ "xtask", "build-artifacts", "--output-folder", bin_path.to_str().unwrap() ]) .env("BLOOMFILTER_ADDRESS_BITS", bloom_args.address_bits.to_string()) .env("BLOOMFILTER_ADDRESS_BITS_CHUNK", bloom_args.chunk_address_bits.to_string()) .env("BLOOMFILTER_HASH_COUNT", bloom_args.hash_count.unwrap_or(0).to_string()) .stdin(Stdio::null()) .stderr(Stdio::piped()) .stdout(Stdio::null()) .output() .context("Failed to run cargo xtask build-artifacts")?; let log_path = BenchArgs::bin_log_path(data_args, bloom_args); log_both(&log_path, &mut output.stderr.as_slice(), &mut output.stdout.as_slice(), "build-artifacts")?; ensure!(output.status.success(), CommandError::new(output, log_path)); Ok(()) } fn build_filter(data_args: DataArgs, bloom_args: FilterArgs, filter_type: FilterType, ip_file_path: &Path) -> anyhow::Result { let filter_file = match filter_type { Bloom => "ips.bfb", Bitmap => "ips.fb", }; let path = BenchArgs::bin_wd_path(data_args, bloom_args).join(filter_file); fs::create_dir_all(path.parent().unwrap()).context("Failed to create bench dir")?; let filter_type_string = filter_type.to_string().to_lowercase(); let output = Command::new("/usr/bin/time") .args([ // time args "-o", BenchArgs::bin_wd_path(data_args, bloom_args).join("filter_extern_time.json").to_str().unwrap(), "--format", "{\"clock\": %e, \"cpu_p\": \"%P\", \"kernel_s\": %S, \"user_s\": %U}", // actual command BenchArgs::bin_bin_path(data_args, bloom_args).join("tools/build_filter").to_str().unwrap(), "--force", "--timing-path", BenchArgs::bin_wd_path(data_args, bloom_args).join("filter_intern_time.json").to_str().unwrap(), filter_type_string.as_str(), ip_file_path.to_str().unwrap(), path.to_str().unwrap() ]) .env("RUST_LOG", "info") .stdin(Stdio::null()) .stderr(Stdio::piped()) .stdout(Stdio::piped()) .output() .context("Failed to run build_filter binary")?; let log_path = BenchArgs::bin_log_path(data_args, bloom_args); log_both(&log_path, &mut output.stderr.as_slice(), &mut output.stdout.as_slice(), "build-filter")?; ensure!(output.status.success(), CommandError::new(output, log_path)); Ok(path) } type LogJoinHandle = thread::JoinHandle>; fn load_xdp(bench_args: BenchArgs, filter_path: &Path) -> Result<(process::Child, LogJoinHandle, LogJoinHandle), (Option, anyhow::Error)> { let responder_path = bench_args.bin_path().join("responder"); let target= bench_args.bin_path().join("ebpf"); let fd_info_out_path = bench_args.wd_path().join("responder_info.json"); let filter_type = bench_args.bloom_filter_args.filter_type.to_string().to_lowercase(); let mut args = Vec::from(PRIVILEGE_RUNNER); args.extend_from_slice(&[ responder_path.to_str().unwrap(), "--filter-path", filter_path.to_str().unwrap(), "--target", target.to_str().unwrap(), "--fd-info-out-path", fd_info_out_path.to_str().unwrap(), "--filter-type", filter_type.as_str(), ]); println!("{:?}", args); let mut handle = Command::new(args.remove(0)) .args(args) .env("RUST_LOG", "info") .stdin(Stdio::piped()) .stderr(Stdio::piped()) .stdout(Stdio::piped()) .spawn() .context("Failed to run responder to load XDP") .map_err(|e| (None, e))?; let mut stderr = handle.stderr.take().unwrap(); let mut stdout = handle.stdout.take().unwrap(); let stderr_handle = thread::spawn(move || log(&bench_args.log_path(), &mut stderr, "responder.stderr")); let stdout_handle = thread::spawn(move || log(&bench_args.log_path(), &mut stdout, "responder.stdout")); if let Err(e) = try_wait_xdp_loaded(bench_args, &mut handle) { return Err((Some(handle), e)); } return Ok((handle, stderr_handle, stdout_handle)); } fn try_wait_xdp_loaded(_bench_args: BenchArgs, handle: &mut process::Child) -> anyhow::Result<()> { let start = Instant::now(); let mut last_ip_link = None; while start.elapsed().as_secs() < XDP_LOAD_TIMEOUT_SECS { if let Some(_) = handle.try_wait()? { return Err(anyhow!("Responder exited too early")); } let output = Command::new("ip") .args(["link", "show", "lo"]) .output() .context("Failed to run 'ip link show lo'")?; let ip_link_info = String::from_utf8(output.stdout)?; last_ip_link = Some(ip_link_info.clone()); if let Some(l) = ip_link_info.lines().skip(2).next() { if let Some(id) = l.strip_prefix(" prog/xdp id") { info!("XDP loaded; id:{}", id); return Ok(()) } } thread::sleep(Duration::from_millis(100)); } Err(anyhow!( "XDP program did not load within timeout ({}); last ip link show lo info: {}", XDP_LOAD_TIMEOUT_SECS, last_ip_link.unwrap_or(String::from("no ip link info")) )) } fn unload_xdp(bench_args: BenchArgs,mut handle: process::Child) -> Result, anyhow::Error)> { let result = handle.stdin.take().unwrap().write(&[b'\n']); if let Err(e) = result { return Err((Some(handle), anyhow!(e))); } let output = handle.wait_with_output().map_err(|e| (None, anyhow!(e)))?; if !output.status.success() { return Err((None, anyhow!(CommandError::new(output, bench_args.log_path())))); } return Ok(output); } fn set_bpf_stats(bench_args: BenchArgs, enabled: bool) -> anyhow::Result<()> { let setting = format!("kernel.bpf_stats_enabled={}", if enabled {1} else {0}); let mut args = Vec::from(PRIVILEGE_RUNNER); args.extend_from_slice(&[ "sysctl", "-w", setting.as_str() ]); let output = Command::new(args.remove(0)) .args(args) .stdin(Stdio::null()) .stderr(Stdio::piped()) .stdout(Stdio::piped()) .output() .context("Failed to run sysctl")?; let name = format!("sysctl_{}", if enabled {"enable"} else {"disable"}); log_both(&bench_args.log_path(), &mut output.stderr.as_slice(), &mut output.stdout.as_slice(), name.as_str())?; ensure!(output.status.success(), CommandError::new(output, bench_args.log_path())); Ok(()) } fn read_bpf_stats(bench_args: BenchArgs) -> anyhow::Result<(u128, u128, u128)> { // TODO Also collect memlock let mut info = vec![]; File::open(bench_args.wd_path().join("responder_info.json")) .context("Failed to open responder_info.json file")? .read_to_end(&mut info)?; let info = json::parse(String::from_utf8(info)?.as_str())?; if let JsonValue::Object(o) = info { let fd = o.get("fd").ok_or(anyhow!("No key fd found in responder_info.json file"))?.as_u64().unwrap(); let pid = o.get("pid").ok_or(anyhow!("No key pid found in responder_info.json file"))?.as_u64().unwrap(); let mut path = PathBuf::from("/proc"); path.push(pid.to_string()); path.push("fdinfo"); path.push(fd.to_string()); let mut args = Vec::from(PRIVILEGE_RUNNER); args.extend_from_slice(&[ "cat", path.to_str().unwrap() ]); let output = Command::new(args.remove(0)) .args(args) .stdin(Stdio::null()) .stderr(Stdio::piped()) .stdout(Stdio::piped()) .output() .context("Failed to read fd info from /proc/[pid]/fdinfo/[fd]")?; log_both(&bench_args.log_path(), &mut output.stderr.as_slice(), &mut output.stdout.as_slice(), "procfs")?; ensure!(output.status.success(), CommandError::new(output, bench_args.log_path())); let mut run_time: Option = None; let mut run_count: Option = None; let mut mem_lock: Option = None; for line in output.stdout.lines() { let line = line?; if let Some(run_time_str) = line.as_str().strip_prefix("run_time_ns:") { run_time = Some(run_time_str.trim().parse()?); } else if let Some(run_count_str) = line.as_str().strip_prefix("run_cnt:") { run_count = Some(run_count_str.trim().parse()?); } else if let Some(mem_lock_str) = line.as_str().strip_prefix("memlock:") { mem_lock = Some(mem_lock_str.trim().parse()?); } } return match (run_count, run_time, mem_lock) { (None, _, _) => Err(anyhow!("Could not read run_cnt from fdinfo file")), (_, None, _) => Err(anyhow!("Could not read run_time_ns from fdinfo file")), (_, _, None) => Err(anyhow!("Could not read mem_lock from fdinfo file")), (Some(run_count), Some(run_time), Some(mem_lock)) => Ok((run_count, run_time, mem_lock)) } } else { return Err(anyhow!("Could not read json object from responder_info.json file")); } } fn run_zmap(bench_args: BenchArgs, subnet: Ipv4Addr) -> anyhow::Result { let subnet_string = format!("{}/{}",subnet, 32 - bench_args.data_args.scan_subnet_size); let output_file = bench_args.wd_path().join("zmap_out_ips.txt"); let rate_string = bench_args.scan_args.rate.to_string(); let interface = match bench_args.bloom_filter_args.test_type { Baseline => "dummyif", _ => "lo", }; let seed = bench_args.data_args.seed.to_string(); let mut args = Vec::from(PRIVILEGE_RUNNER); args.extend_from_slice(&[ "zmap", subnet_string.as_str(), "--target-port=80", "--interface", interface, "--gateway-mac=00:00:00:00:00:00", "--output-file", output_file.to_str().unwrap(), "--rate", rate_string.as_str(), "--sender-threads=7", "--cooldown-time=1", "--seed", seed.as_str(), "--blacklist-file=blocklist", "--max-sendto-failures=-1" ]); let output = Command::new(args.remove(0)) .args(args) .stdin(Stdio::null()) .stderr(Stdio::piped()) .stdout(Stdio::piped()) .output() .context("Failed to run zmap")?; log_both(&bench_args.log_path(), &mut output.stderr.as_slice(), &mut output.stdout.as_slice(), "zmap")?; ensure!(output.status.success(), CommandError::new(output, bench_args.log_path())); return Ok(output); }