diff --git a/src/oc_stat.erl b/src/oc_stat.erl index 03f709d..b8c736f 100644 --- a/src/oc_stat.erl +++ b/src/oc_stat.erl @@ -35,8 +35,8 @@ -define(RECORD(Tags, MeasureName, Value), begin - Module = oc_stat_measure:measure_module(MeasureName), - Module:record(Tags, Value), + Module = oc_stat_measure:record_module(MeasureName), + Module:record(MeasureName, Tags, Value), ok end). diff --git a/src/oc_stat_collector.erl b/src/oc_stat_collector.erl new file mode 100644 index 0000000..ed35a2f --- /dev/null +++ b/src/oc_stat_collector.erl @@ -0,0 +1,42 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2018, OpenCensus Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc Worker process for sending recorded stats. +%% @end +%%%----------------------------------------------------------------------- +-module(oc_stat_collector). + +-export([start_link/0]). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2]). + +start_link() -> + gen_server:start_link(?MODULE, [], []). + +init([]) -> + {ok, #{}}. + +handle_call(_, _, State) -> + {noreply, State}. + +handle_cast(_, State) -> + {noreply, State}. + +handle_info({record, MeasureName, Tags, Value}, State) -> + Module = oc_stat_measure:measure_module(MeasureName), + Module:record(MeasureName, Tags, Value), + {noreply, State}. diff --git a/src/oc_stat_collectors.erl b/src/oc_stat_collectors.erl new file mode 100644 index 0000000..ab50a9f --- /dev/null +++ b/src/oc_stat_collectors.erl @@ -0,0 +1,86 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2018, OpenCensus Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc Starts and monitors stat collecting worker processes. +%% @end +%%%----------------------------------------------------------------------- +-module(oc_stat_collectors). + +-behaviour(gen_server). + +-export([start_link/0, + record/3]). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2]). + +-record(state, {workers :: [], + num_workers :: integer()}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +record(MeasureName, Tags, Value) -> + Workers = persistent_term:get(?MODULE), + element(erlang:system_info(scheduler_id), Workers) ! {record, MeasureName, Tags, Value}. + +init([]) -> + erlang:process_flag(trap_exit, true), + NumSchedulers = erlang:system_info(schedulers), + Workers = + lists:map(fun(_) -> + {ok, Pid} = oc_stat_collector:start_link(), + Pid + end, lists:seq(1, NumSchedulers)), + persistent_term:put(?MODULE, list_to_tuple(Workers)), + {ok, #state{workers=Workers, + num_workers=NumSchedulers}}. + +handle_call(_, _From, State) -> + {noreply, State}. + +handle_cast(_, State) -> + {noreply, State}. + +handle_info({'EXIT', FromPid, _Reason}, State=#state{workers=Workers}) -> + %% hopefully this goes behind any other immediate EXITs in the mailbox + %% so we only update once if more than 1 crash at the same time + self() ! update_workers, + {noreply, State#state{workers=lists:delete(FromPid, Workers)}}; +handle_info(update_workers, State=#state{workers=Workers, + num_workers=NumWorkers}) -> + flush_update_workers(), + case NumWorkers - length(Workers) of + N when N > 0 -> + Workers1 = + lists:map(fun(_) -> + {ok, Pid} = oc_stat_collector:start_link(), + Pid + end, lists:seq(1, N)), + persistent_term:put(?MODULE, list_to_tuple(Workers1++Workers)), + {noreply, State#state{workers=Workers1}}; + _ -> + {noreply, State} + end. + +flush_update_workers() -> + receive + update_workers -> + flush_update_workers() + after + 0 -> + ok + end. diff --git a/src/oc_stat_measure.erl b/src/oc_stat_measure.erl index a30e543..ad03e49 100644 --- a/src/oc_stat_measure.erl +++ b/src/oc_stat_measure.erl @@ -38,6 +38,7 @@ %% codegen -export([measure_module/1, + record_module/1, module_name/1, maybe_module_name/1, regen_record/2, @@ -56,10 +57,11 @@ unit/0, measure/0]). --record(measure, {name :: name(), - module :: module(), - description :: description(), - unit :: unit()}). +-record(measure, {name :: name(), + module :: module(), + record_module :: module(), + description :: description(), + unit :: unit()}). -type name() :: atom() | binary() | string(). -type description() :: binary() | string(). @@ -79,9 +81,11 @@ %% @end -spec new(name(), description(), unit()) -> oc_stat_view:measure(). new(Name, Description, Unit) -> + RecordModule = application:get_env(opencensus, stat_record_module, oc_stat_measure:module_name(Name)), gen_server:call(oc_stat, {measure_register, #measure{name=Name, module=oc_stat_measure:module_name(Name), + record_module=RecordModule, description=Description, unit=Unit}}). %% @doc @@ -163,6 +167,13 @@ measure_module(Name) -> _ -> erlang:error({unknown_measure, Name}) end. +record_module(Name) -> + case ets:lookup(?MEASURES_TABLE, Name) of + [#measure{record_module=Module}] -> + Module; + _ -> erlang:error({unknown_measure, Name}) + end. + %% @private -spec module_name(name()) -> module(). module_name(Name) -> @@ -208,11 +219,11 @@ regen_module(ModuleName, RecordBody, Subs) -> 1}}, {attribute, 1, module, ModuleName}, {attribute, 1, export, - [{record, 2}]}, + [{record, 3}]}, {attribute, 1, export, [{subs, 0}]}, - {function, 1, record, 2, - [{clause, 1, [{var, 1, 'ContextTags'}, {var, 1, 'Value'}], [], + {function, 1, record, 3, + [{clause, 1, [{var, 1, '_MeasureName'}, {var, 1, 'ContextTags'}, {var, 1, 'Value'}], [], RecordBody ++ [{atom, 1, ok}] }]}, {function, 1, subs, 0, diff --git a/src/opencensus_sup.erl b/src/opencensus_sup.erl index 4879373..0112568 100644 --- a/src/opencensus_sup.erl +++ b/src/opencensus_sup.erl @@ -32,6 +32,13 @@ start_link() -> init([]) -> ok = oc_sampler:init(application:get_env(opencensus, sampler, {oc_sampler_always, []})), + StatCollectors = #{id => oc_stat_collectors, + start => {oc_stat_collectors, start_link, []}, + restart => permanent, + shutdown => 1000, + type => worker, + modules => [oc_stat_collectors]}, + Reporter = #{id => oc_reporter, start => {oc_reporter, start_link, []}, restart => permanent, @@ -60,6 +67,14 @@ init([]) -> type => worker, modules => [oc_server]}, + Children = + case application:get_env(opencensus, stat_record_module, oc_stat_collectors) of + oc_stat_collectors -> + [Reporter, Exporter, ViewServer, TraceServer, StatCollectors]; + _ -> + [Reporter, Exporter, ViewServer, TraceServer] + end, + {ok, {#{strategy => one_for_one, intensity => 1, - period => 5}, [Reporter, Exporter, ViewServer, TraceServer]}}. + period => 5}, Children}}.