This repository was archived by the owner on Nov 26, 2021. It is now read-only.
26 changes: 26 additions & 0 deletions tests/test_base.py
@@ -19,6 +19,8 @@
import unittest
import utils_tests
import trappy
import tempfile
import numpy as np
from trappy.base import trace_parser_explode_array

sys.path.append(os.path.join(utils_tests.TESTS_DIRECTORY, "..", "trappy"))
@@ -238,3 +240,27 @@ def test_equals_in_field_value(self):
self.assertListEqual(df["my_field"].tolist(),
["foo", "foo=bar", "foo=bar=baz", 1,
"1=2", "1=foo", "1foo=2"])

    def test_merge_dfs(self):
        """merge_dfs() should fill each primary row with the most recent
        secondary data for the same pivot value"""
trace_file = tempfile.mktemp(dir="/tmp", suffix=".txt")
lines = [
" adbd-5709 [007] 2943.184105: sched_contrib_scale_f: cpu=7 cpu_scale_factor=1\n"
" adbd-5709 [007] 2943.184105: sched_load_avg_cpu: cpu=7 util_avg=825\n"
" ->transport-5713 [006] 2943.184106: sched_load_avg_cpu: cpu=6 util_avg=292\n"
" ->transport-5713 [006] 2943.184107: sched_contrib_scale_f: cpu=6 cpu_scale_factor=2\n"
" adbd-5709 [007] 2943.184108: sched_load_avg_cpu: cpu=7 util_avg=850\n"
" adbd-5709 [007] 2943.184109: sched_contrib_scale_f: cpu=7 cpu_scale_factor=3\n"
" adbd-5709 [007] 2943.184110: sched_load_avg_cpu: cpu=6 util_avg=315\n"
]
with open(trace_file, 'w') as fh:
for line in lines:
fh.write(line)

trace = trappy.ftrace.FTrace(trace_file, events=['sched_contrib_scale_f', 'sched_load_avg_cpu'],
normalize_time=False)

df1 = trace.sched_load_avg_cpu.data_frame[['cpu', 'util_avg', '__line']]
df2 = trace.sched_contrib_scale_f.data_frame[['cpu', 'cpu_scale_factor', '__line']]
df3 = trappy.utils.merge_dfs(df1, df2, 'cpu')
cpu_scale_list = ["NaN" if np.isnan(x) else x for x in df3["cpu_scale_factor"].tolist()]
self.assertListEqual(cpu_scale_list, [1.0, "NaN", 1.0, 2.0])
15 changes: 13 additions & 2 deletions trappy/ftrace.py
@@ -413,7 +413,7 @@ def get_all_freqs_data(self, map_label):

return ret

-    def apply_callbacks(self, fn_map):
+    def apply_callbacks(self, fn_map, *kwarg):
"""
Apply callback functions to trace events in chronological order.

@@ -429,6 +429,12 @@ def apply_callbacks(self, fn_map):
"sched_switch": callback_fn1,
"sched_wakeup": callback_fn2
})

        :param fn_map: A dict mapping event names to callback functions
        :type fn_map: dict

        :param kwarg: Optional extra arguments, collected into a tuple and
            passed as a second parameter to each callback
        :type kwarg: tuple
        """
dfs = {event: getattr(self, event).data_frame for event in fn_map.keys()}
events = [event for event in fn_map.keys() if not dfs[event].empty]
@@ -455,7 +461,12 @@ def getLine(event):
event_dict = {
col: event_tuple[idx] for col, idx in col_idxs[event_name].iteritems()
}
-            fn_map[event_name](event_dict)
+
+            if kwarg:
+                fn_map[event_name](event_dict, kwarg)
+            else:
+                fn_map[event_name](event_dict)
+
event_row = next(iters[event_name], None)
if event_row:
next_rows[event_name] = event_row
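With the new signature, any extra positional arguments to apply_callbacks() are gathered into the kwarg tuple and handed to every callback as a second parameter. Below is a minimal sketch of threading shared state through the callbacks; the trace file name, the event choice, and the accumulator dict are illustrative, not part of this change:

    import trappy

    trace = trappy.ftrace.FTrace("trace.txt", events=["sched_load_avg_cpu"],
                                 normalize_time=False)

    # Mutable state shared by every callback invocation
    accumulator = {"max_util": 0}

    def track_max_util(event_dict, args):
        # args is the tuple collected by *kwarg; here it holds the
        # accumulator dict as its only element
        state = args[0]
        state["max_util"] = max(state["max_util"], event_dict["util_avg"])

    trace.apply_callbacks({"sched_load_avg_cpu": track_max_util}, accumulator)
    print(accumulator["max_util"])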
107 changes: 107 additions & 0 deletions trappy/utils.py
@@ -13,6 +13,9 @@
# limitations under the License.
#

import pandas as pd
import numpy as np

"""Generic functions that can be used in multiple places in trappy
"""

@@ -102,3 +105,107 @@ def handle_duplicate_index(data,
dup_index_left += 1

return data.reindex(new_index)

# Iterate fast over all rows in a data frame and apply fn
def apply_callback(df, fn, *kwargs):
"""
A generic API to apply a function onto a data frame and optionally pass
it an argument (kwargs). This is faster than `DataFrame.apply` method.

:param df: DataFrame to apply fn onto
:type df: :mod:`pandas.DataFrame`

:param fn: Function that is applied onto the DataFrame
:type fn: function

:param kwargs: Optional argument to pass to the callback
:type kwargs: dict
"""
iters = df.itertuples()
    event_tuple = next(iters, None)

    # Column names beginning with an underscore are not valid namedtuple
    # field names, so itertuples() does not preserve them; access fields
    # by position through a name-to-index mapping instead
col_idxs = { name: idx for idx, name in enumerate(['Time'] + df.columns.tolist()) }

    while event_tuple is not None:
        event_dict = { col: event_tuple[idx] for col, idx in col_idxs.iteritems() }

if kwargs:
fn(event_dict, kwargs)
else:
fn(event_dict)

event_tuple = next(iters, None)

def merge_dfs(pr_df, sec_df, pivot):
"""
    Merge information from a secondary DF into a primary DF. For each
    primary row, the merge picks up the latest preceding secondary row (in
    `__line` order) that shares the same pivot value. The returned DF has
    the same number of rows as the primary DF.

    For example, say the following is the primary DF:
cpu util_avg __line
Time
2943.184105 7 825 1
2943.184106 6 292 2
2943.184108 7 850 4
2943.184110 6 315 6

And the following is a secondary DF:
cpu cpu_scale_factor __line
Time
2943.184105 7 1 0
2943.184107 6 2 3
2943.184109 7 3 5

And, pivot = 'cpu'. Then, the returned DF will look like:
__line cpu cpu_scale_factor util_avg
Time
2943.184105 1 7 1.0 825.0
2943.184106 2 6 NaN 292.0
2943.184108 4 7 1.0 850.0
2943.184110 6 6 2.0 315.0

:param pr_df: Primary data frame to merge into.
:type pr_df: :mod:`pandas.DataFrame`

    :param sec_df: Secondary data frame as source of data.
    :type sec_df: :mod:`pandas.DataFrame`

    :param pivot: Name of the column used to match rows across the two
        frames.
    :type pivot: str
    """

# Keep track of last secondary event
pivot_map = {}

    # A list accumulating dicts with merged data
merged_data = []
def df_fn(data):
# Store the latest secondary info
if data['Time'][0] == 'secondary':
pivot_map[data[pivot]] = data
# Get rid of primary/secondary labels
data['Time'] = data['Time'][1]
return

        # Propagate latest secondary info
        for key, value in data.iteritems():
            if key == pivot:
                continue
            # Fast check for NaN (faster than np.isnan or a try/except)
            if value != value and data[pivot] in pivot_map:
                data[key] = pivot_map[data[pivot]][key]

# Get rid of primary/secondary labels
data['Time'] = data['Time'][1]
merged_data.append(data)

df = pd.concat([pr_df, sec_df], keys=['primary', 'secondary']).sort_values(by='__line')
apply_callback(df, df_fn)
merged_df = pd.DataFrame.from_dict(merged_data)
merged_df.set_index('Time', inplace=True)

return merged_df
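For reference, here is a standalone sketch of merge_dfs() on hand-built frames that mirror the docstring example, with no trace parsing involved; since merge_dfs() drives apply_callback() internally, it exercises both helpers. The frame contents are copied from the example above:

    import pandas as pd
    from trappy.utils import merge_dfs

    pr_df = pd.DataFrame(
        {"cpu": [7, 6, 7, 6], "util_avg": [825, 292, 850, 315],
         "__line": [1, 2, 4, 6]},
        index=pd.Index([2943.184105, 2943.184106, 2943.184108, 2943.184110],
                       name="Time"))

    sec_df = pd.DataFrame(
        {"cpu": [7, 6, 7], "cpu_scale_factor": [1, 2, 3],
         "__line": [0, 3, 5]},
        index=pd.Index([2943.184105, 2943.184107, 2943.184109], name="Time"))

    # The second row keeps cpu_scale_factor=NaN: no secondary event for
    # cpu 6 has been seen by __line 2
    print(merge_dfs(pr_df, sec_df, "cpu"))

Note that the frames are interleaved on the __line column rather than on the timestamp index: the first primary and secondary rows share a timestamp, and only the line number preserves their original trace order.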