Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ Cargo.lock
#Cargo.lock

.idea

.devcontainer/
14 changes: 11 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,27 @@ backtrace = { version = "0.3" }
once_cell = "1.9"
libc = "^0.2.66"
log = "0.4"
nix = { version = "0.26", default-features = false, features = ["signal", "fs"] }
nix = { version = "0.26", default-features = false, features = [
"signal",
"fs",
] }
parking_lot = "0.12"
tempfile = "3.1"
thiserror = "1.0"
findshlibs = "0.10"
cfg-if = "1.0"
smallvec = "1.7"

inferno = { version = "0.11", default-features = false, features = ["nameattr"], optional = true }
inferno = { version = "0.11", default-features = false, features = [
"nameattr",
], optional = true }
prost = { version = "0.11", optional = true }
prost-derive = { version = "0.11", optional = true }
protobuf = { version = "2.0", optional = true }
criterion = {version = "0.4", optional = true}
criterion = { version = "0.4", optional = true }

[target.'cfg(target_os = "linux")'.dependencies]
errno = { version = "0.2.8" }

[dependencies.symbolic-demangle]
version = "10.1"
Expand Down
154 changes: 133 additions & 21 deletions src/profiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ use std::convert::TryInto;
use std::os::raw::c_int;
use std::time::SystemTime;

#[cfg(not(target_os = "linux"))]
use nix::sys::signal;
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use smallvec::SmallVec;

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[cfg(any(
target_arch = "x86_64",
target_arch = "aarch64",
Expand All @@ -32,7 +36,7 @@ pub struct Profiler {
pub(crate) data: Collector<UnresolvedFrames>,
sample_counter: i32,

running: bool,
running: Arc<AtomicBool>,

#[cfg(all(any(
target_arch = "x86_64",
Expand Down Expand Up @@ -137,10 +141,19 @@ impl ProfilerGuardBuilder {
}

match profiler.start() {
Ok(()) => Ok(ProfilerGuard::<'static> {
profiler: &PROFILER,
timer: Some(Timer::new(self.frequency)),
}),
Ok(()) => {
#[cfg(target_os = "linux")]
{
let running_arc = profiler.running.clone();
std::thread::spawn(move || {
fire_rt_signals(running_arc, self.frequency as u64)
});
}
Ok(ProfilerGuard::<'static> {
profiler: &PROFILER,
timer: Some(Timer::new(self.frequency)),
})
}
Err(err) => Err(err),
}
}
Expand Down Expand Up @@ -176,6 +189,7 @@ impl ProfilerGuard<'_> {

impl<'a> Drop for ProfilerGuard<'a> {
fn drop(&mut self) {
#[cfg(not(target_os = "linux"))]
drop(self.timer.take());

match self.profiler.write().as_mut() {
Expand Down Expand Up @@ -368,7 +382,7 @@ impl Profiler {
Ok(Profiler {
data: Collector::new()?,
sample_counter: 0,
running: false,
running: Arc::new(AtomicBool::new(false)),

#[cfg(all(any(
target_arch = "x86_64",
Expand Down Expand Up @@ -399,11 +413,11 @@ impl Profiler {
impl Profiler {
pub fn start(&mut self) -> Result<()> {
log::info!("starting cpu profiler");
if self.running {
if self.running.load(Ordering::SeqCst) {
Err(Error::Running)
} else {
self.register_signal_handler()?;
self.running = true;
self.running.store(true, Ordering::SeqCst);

Ok(())
}
Expand All @@ -412,14 +426,14 @@ impl Profiler {
fn init(&mut self) -> Result<()> {
self.sample_counter = 0;
self.data = Collector::new()?;
self.running = false;
self.running.store(false, Ordering::SeqCst);

Ok(())
}

pub fn stop(&mut self) -> Result<()> {
log::info!("stopping cpu profiler");
if self.running {
if self.running.load(Ordering::SeqCst) {
self.unregister_signal_handler()?;
self.init()?;

Expand All @@ -430,22 +444,52 @@ impl Profiler {
}

fn register_signal_handler(&self) -> Result<()> {
let handler = signal::SigHandler::SigAction(perf_signal_handler);
let sigaction = signal::SigAction::new(
handler,
// SA_RESTART will only restart a syscall when it's safe to do so,
// e.g. when it's a blocking read(2) or write(2). See man 7 signal.
signal::SaFlags::SA_SIGINFO | signal::SaFlags::SA_RESTART,
signal::SigSet::empty(),
);
unsafe { signal::sigaction(signal::SIGPROF, &sigaction) }?;
#[cfg(target_os = "linux")]
{
use std::mem::MaybeUninit;
let mut sigaction: libc::sigaction = unsafe { MaybeUninit::zeroed().assume_init() };
sigaction.sa_sigaction = perf_signal_handler as usize;
sigaction.sa_flags = libc::SA_RESTART | libc::SA_SIGINFO;

unsafe {
let res = libc::sigaction(
libc::SIGRTMIN(),
&sigaction as *const _,
std::ptr::null::<libc::sigaction>() as *mut libc::sigaction,
);
if res == -1 {
return Err(Error::NixError(nix::errno::from_i32(errno::errno().0)));
}
};
}

#[cfg(not(target_os = "linux"))]
{
let handler = signal::SigHandler::SigAction(perf_signal_handler);
let sigaction = signal::SigAction::new(
handler,
// SA_RESTART will only restart a syscall when it's safe to do so,
// e.g. when it's a blocking read(2) or write(2). See man 7 signal.
signal::SaFlags::SA_SIGINFO | signal::SaFlags::SA_RESTART,
signal::SigSet::empty(),
);
unsafe { signal::sigaction(signal::SIGPROF, &sigaction) }?;
}

Ok(())
}

fn unregister_signal_handler(&self) -> Result<()> {
let handler = signal::SigHandler::SigIgn;
unsafe { signal::signal(signal::SIGPROF, handler) }?;
#[cfg(target_os = "linux")]
unsafe {
let res = libc::signal(libc::SIGRTMIN(), libc::SIG_IGN);
if res == libc::SIG_ERR {
return Err(Error::NixError(nix::errno::from_i32(errno::errno().0)));
}
};

#[cfg(not(target_os = "linux"))]
unsafe { signal::signal(signal::SIGPROF, signal::SigHandler::SigIgn) }?;

Ok(())
}
Expand All @@ -465,6 +509,74 @@ impl Profiler {
}
}

// The kernel thread ID (the value returned by a call to gettid(2))
// is not the same thing as the thread ID returned by pthread_self().
//
// Lists the kernel thread IDs of process `pid_t` by reading the entry
// names of /proc/{pid}/task. Entries that cannot be read, or whose name
// is not a decimal number, are skipped.
//
// If the directory itself cannot be read (the process is gone, /proc is
// not mounted, permission denied, ...) an empty list is returned instead
// of panicking, so a transient failure cannot kill the signalling thread.
#[cfg(target_os = "linux")]
fn get_tids(pid_t: u32) -> Vec<u32> {
    let entries = match std::fs::read_dir(format!("/proc/{}/task", pid_t)) {
        Ok(entries) => entries,
        Err(_) => return Vec::new(),
    };

    entries
        .filter_map(|entry| entry.ok()?.file_name().to_str()?.parse::<u32>().ok())
        .collect()
}

// Body of the signalling thread used on Linux instead of an interval timer.
//
// While `running` is true, repeatedly lists the kernel threads of the
// current process and sends SIGRTMIN to each of them except the thread
// executing this function, sleeping one sampling interval after every
// signal sent.
//
// `frequency` is the number of signals to send per second. A frequency of 0
// is treated as 1 to avoid a division by zero, and the interval is never
// shorter than one microsecond.
#[cfg(target_os = "linux")]
fn fire_rt_signals(running: Arc<AtomicBool>, frequency: u64) {
    use std::{thread, time::Duration};

    let pid = nix::unistd::Pid::this().as_raw() as u32;
    let prof_tid = unsafe { libc::syscall(libc::SYS_gettid) as u32 };

    // Sleep for the whole interval. Reducing it modulo one second would turn
    // a 1 Hz frequency into a zero-length sleep, i.e. a signal storm.
    let interval = Duration::from_micros((1_000_000 / frequency.max(1)).max(1));

    while running.load(Ordering::SeqCst) {
        let mut signalled = false;

        for tid in get_tids(pid) {
            // skip the thread running this function
            if tid == prof_tid {
                continue;
            }
            // Stop promptly instead of finishing a pass over every thread.
            if !running.load(Ordering::SeqCst) {
                return;
            }
            // Best effort: the thread may have exited since it was listed,
            // in which case the syscall fails and the result is ignored.
            let _ = signal_thread(pid, tid, libc::SIGRTMIN());
            signalled = true;
            thread::sleep(interval);
        }

        // Nothing to signal on this pass: still wait one interval so the
        // outer loop does not spin at full speed.
        if !signalled {
            thread::sleep(interval);
        }
    }
}

// Queues signal @signum for the single thread @tid of the thread group
// (process) @pid.
// Returns 0 on success. On failure returns -1 and errno is set accordingly
// (see man rt_tgsigqueueinfo).
#[cfg(target_os = "linux")]
fn signal_thread(pid: u32, tid: u32, signum: i32) -> isize {
    // SAFETY: siginfo_t is a plain C struct; the all-zero bit pattern is a
    // valid value for it.
    let mut info: libc::siginfo_t = unsafe { std::mem::zeroed() };
    info.si_signo = signum;
    // NOTE(review): on Linux -2 is SI_TIMER while SI_QUEUE is -1; confirm
    // which code is intended here.
    info.si_code = -2;

    // Why rt_tgsigqueueinfo:
    // - sigqueue() targets a process, and the kernel may deliver the signal
    //   to any thread of that process, so a precise thread cannot be chosen.
    // - pthread_sigqueue() expects a pthread_t, while the IDs collected here
    //   are kernel thread IDs (tid).
    // rt_tgsigqueueinfo sends the signal and its data to exactly the thread
    // identified by the pair (tgid, tid).
    //
    // see: https://man7.org/linux/man-pages/man2/rt_sigqueueinfo.2.html
    let info_ptr = &info as *const libc::siginfo_t;

    // SAFETY: `info` is initialized and outlives the call; the kernel only
    // reads from the pointer.
    let ret = unsafe {
        libc::syscall(
            libc::SYS_rt_tgsigqueueinfo,
            pid as usize,
            tid as usize,
            signum as usize,
            info_ptr as usize,
        )
    };

    ret as isize
}

#[cfg(test)]
#[cfg(target_os = "linux")]
mod tests {
Expand Down
46 changes: 30 additions & 16 deletions src/timer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::os::raw::c_int;
#[cfg(not(target_os = "linux"))]
use std::ptr::null_mut;
use std::time::{Duration, Instant, SystemTime};

Expand All @@ -18,10 +19,12 @@ struct Itimerval {
pub it_value: Timeval,
}

#[cfg(not(target_os = "linux"))]
extern "C" {
fn setitimer(which: c_int, new_value: *mut Itimerval, old_value: *mut Itimerval) -> c_int;
}

#[cfg(not(target_os = "linux"))]
const ITIMER_PROF: c_int = 2;

pub struct Timer {
Expand All @@ -32,23 +35,29 @@ pub struct Timer {

impl Timer {
pub fn new(frequency: c_int) -> Timer {
let interval = 1e6 as i64 / i64::from(frequency);
let it_interval = Timeval {
tv_sec: interval / 1e6 as i64,
tv_usec: interval % 1e6 as i64,
};
let it_value = it_interval.clone();
// we need to set a timer only for non-linux OS
// since for linux we'll have an external thread
// sending real-time signals to the existing threads
#[cfg(not(target_os = "linux"))]
{
let interval = 1e6 as i64 / i64::from(frequency);
let it_interval = Timeval {
tv_sec: interval / 1e6 as i64,
tv_usec: interval % 1e6 as i64,
};
let it_value = it_interval.clone();

unsafe {
setitimer(
ITIMER_PROF,
&mut Itimerval {
it_interval,
it_value,
},
null_mut(),
)
};
unsafe {
setitimer(
ITIMER_PROF,
&mut Itimerval {
it_interval,
it_value,
},
null_mut(),
)
};
}

Timer {
frequency,
Expand All @@ -68,13 +77,18 @@ impl Timer {
}
}

#[cfg(not(target_os = "linux"))]
impl Drop for Timer {
fn drop(&mut self) {
let it_interval = Timeval {
tv_sec: 0,
tv_usec: 0,
};
let it_value = it_interval.clone();

// the timer was set only for non-linux OS
// as a consequence, we'll only need to set
// it back to 0 in that case
unsafe {
setitimer(
ITIMER_PROF,
Expand Down