Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.

Commit 4f4f020

Browse files
authored
Implement TelemetryProcessors for Azure exporters (#851)
1 parent 4d1d617 commit 4f4f020

File tree

6 files changed

+306
-6
lines changed

6 files changed

+306
-6
lines changed

contrib/opencensus-ext-azure/README.rst

Lines changed: 136 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ This example shows how to send a warning level log to Azure Monitor.
3737
logger.addHandler(AzureLogHandler(connection_string='InstrumentationKey=<your-instrumentation_key-here>'))
3838
logger.warning('Hello, World!')
3939
40+
Correlation
41+
###########
4042

4143
You can enrich the logs with trace IDs and span IDs by using the `logging integration <../opencensus-ext-logging>`_.
4244

@@ -73,6 +75,9 @@ You can enrich the logs with trace IDs and span IDs by using the `logging integr
7375
logger.warning('In the span')
7476
logger.warning('After the span')
7577
78+
Custom Properties
79+
#################
80+
7681
You can also add custom properties to your log messages in the *extra* keyword argument using the custom_dimensions field.
7782

7883
WARNING: For this feature to work, you need to pass a dictionary to the custom_dimensions field. If you pass arguments of any other type, the logger will ignore them.
@@ -89,6 +94,34 @@ WARNING: For this feature to work, you need to pass a dictionary to the custom_d
8994
properties = {'custom_dimensions': {'key_1': 'value_1', 'key_2': 'value_2'}}
9095
logger.warning('action', extra=properties)
9196
97+
Modifying Logs
98+
##############
99+
100+
* You can pass a callback function to the exporter to process telemetry before it is exported.
101+
* Your callback function can return `False` if you do not want this envelope exported.
102+
* Your callback function must accept an [envelope](https://github.com/census-instrumentation/opencensus-python/blob/master/contrib/opencensus-ext-azure/opencensus/ext/azure/common/protocol.py#L86) data type as its parameter.
103+
* You can see the schema for Azure Monitor data types in the envelopes [here](https://github.com/census-instrumentation/opencensus-python/blob/master/contrib/opencensus-ext-azure/opencensus/ext/azure/common/protocol.py).
104+
* The `AzureLogHandler` handles `ExceptionData` and `MessageData` data types.
105+
106+
.. code:: python
107+
108+
import logging
109+
110+
from opencensus.ext.azure.log_exporter import AzureLogHandler
111+
112+
logger = logging.getLogger(__name__)
113+
114+
# Callback function to append '_hello' to each log message telemetry
115+
def callback_function(envelope):
116+
envelope.data.baseData.message += '_hello'
117+
return True
118+
119+
handler = AzureLogHandler(connection_string='InstrumentationKey=<your-instrumentation_key-here>')
120+
handler.add_telemetry_processor(callback_function)
121+
logger.addHandler(handler)
122+
logger.warning('Hello, World!')
123+
124+
92125
Metrics
93126
~~~~~~~
94127

@@ -145,6 +178,9 @@ The **Azure Monitor Metrics Exporter** allows you to export metrics to `Azure Mo
145178
if __name__ == "__main__":
146179
main()
147180
181+
Standard Metrics
182+
################
183+
148184
The exporter also includes a set of standard metrics that are exported to Azure Monitor by default.
149185

150186
.. code:: python
@@ -179,6 +215,67 @@ Below is a list of standard metrics that are currently available:
179215
- Process CPU Usage (percentage)
180216
- Process Private Bytes (bytes)
181217

218+
Modifying Metrics
219+
#################
220+
221+
* You can pass a callback function to the exporter to process telemetry before it is exported.
222+
* Your callback function can return `False` if you do not want this envelope exported.
223+
* Your callback function must accept an [envelope](https://github.com/census-instrumentation/opencensus-python/blob/master/contrib/opencensus-ext-azure/opencensus/ext/azure/common/protocol.py#L86) data type as its parameter.
224+
* You can see the schema for Azure Monitor data types in the envelopes [here](https://github.com/census-instrumentation/opencensus-python/blob/master/contrib/opencensus-ext-azure/opencensus/ext/azure/common/protocol.py).
225+
* The `MetricsExporter` handles `MetricData` data types.
226+
227+
.. code:: python
228+
229+
import time
230+
231+
from opencensus.ext.azure import metrics_exporter
232+
from opencensus.stats import aggregation as aggregation_module
233+
from opencensus.stats import measure as measure_module
234+
from opencensus.stats import stats as stats_module
235+
from opencensus.stats import view as view_module
236+
from opencensus.tags import tag_map as tag_map_module
237+
238+
stats = stats_module.stats
239+
view_manager = stats.view_manager
240+
stats_recorder = stats.stats_recorder
241+
242+
CARROTS_MEASURE = measure_module.MeasureInt("carrots",
243+
"number of carrots",
244+
"carrots")
245+
CARROTS_VIEW = view_module.View("carrots_view",
246+
"number of carrots",
247+
[],
248+
CARROTS_MEASURE,
249+
aggregation_module.CountAggregation())
250+
251+
# Callback function to only export the metric if value is greater than 0
252+
def callback_function(envelope):
253+
return envelope.data.baseData.metrics[0].value > 0
254+
255+
def main():
256+
# Enable metrics
257+
# Set the interval in seconds in which you want to send metrics
258+
exporter = metrics_exporter.new_metrics_exporter(connection_string='InstrumentationKey=<your-instrumentation-key-here>')
259+
exporter.add_telemetry_processor(callback_function)
260+
view_manager.register_exporter(exporter)
261+
262+
view_manager.register_view(CARROTS_VIEW)
263+
mmap = stats_recorder.new_measurement_map()
264+
tmap = tag_map_module.TagMap()
265+
266+
mmap.measure_int_put(CARROTS_MEASURE, 1000)
267+
mmap.record(tmap)
268+
# Default export interval is every 15.0s
269+
# Your application should run for at least this amount
270+
# of time so the exporter will meet this interval
271+
# Sleep can fulfill this
272+
time.sleep(60)
273+
274+
print("Done recording metrics")
275+
276+
if __name__ == "__main__":
277+
main()
278+
182279
Trace
183280
~~~~~
184281

@@ -197,13 +294,18 @@ This example shows how to send a span "hello" to Azure Monitor.
197294
from opencensus.trace.tracer import Tracer
198295
199296
tracer = Tracer(
200-
exporter=AzureExporter(connection_string='InstrumentationKey=<your-instrumentation-key-here>'),
297+
exporter=AzureExporter(
298+
connection_string='InstrumentationKey=<your-instrumentation-key-here>'
299+
),
201300
sampler=ProbabilitySampler(1.0)
202301
)
203302
204303
with tracer.span(name='hello'):
205304
print('Hello, World!')
206305
306+
Integrations
307+
############
308+
207309
OpenCensus also supports several `integrations <https://github.com/census-instrumentation/opencensus-python#integration>`_ which allows OpenCensus to integrate with third party libraries.
208310

209311
This example shows how to integrate with the `requests <https://2.python-requests.org/en/master/>`_ library.
@@ -225,14 +327,45 @@ This example shows how to integrate with the `requests <https://2.python-request
225327
config_integration.trace_integrations(['requests'])
226328
tracer = Tracer(
227329
exporter=AzureExporter(
228-
# TODO: replace this with your own instrumentation key.
229-
instrumentation_key='00000000-0000-0000-0000-000000000000',
330+
connection_string='InstrumentationKey=<your-instrumentation-key-here>',
230331
),
231332
sampler=ProbabilitySampler(1.0),
232333
)
233334
with tracer.span(name='parent'):
234335
response = requests.get(url='https://www.wikipedia.org/wiki/Rabbit')
235336
337+
Modifying Traces
338+
################
339+
340+
* You can pass a callback function to the exporter to process telemetry before it is exported.
341+
* Your callback function can return `False` if you do not want this envelope exported.
342+
* Your callback function must accept an [envelope](https://github.com/census-instrumentation/opencensus-python/blob/master/contrib/opencensus-ext-azure/opencensus/ext/azure/common/protocol.py#L86) data type as its parameter.
343+
* You can see the schema for Azure Monitor data types in the envelopes [here](https://github.com/census-instrumentation/opencensus-python/blob/master/contrib/opencensus-ext-azure/opencensus/ext/azure/common/protocol.py).
344+
* The `AzureExporter` handles `Data` data types.
345+
346+
.. code:: python
347+
348+
import requests
349+
350+
from opencensus.ext.azure.trace_exporter import AzureExporter
351+
from opencensus.trace import config_integration
352+
from opencensus.trace.samplers import ProbabilitySampler
353+
from opencensus.trace.tracer import Tracer
354+
355+
config_integration.trace_integrations(['requests'])
356+
357+
# Callback function to add os_type: linux to span properties
358+
def callback_function(envelope):
359+
envelope.data.baseData.properties['os_type'] = 'linux'
360+
return True
361+
362+
exporter = AzureExporter(
363+
connection_string='InstrumentationKey=<your-instrumentation-key-here>'
364+
)
365+
exporter.add_telemetry_processor(callback_function)
366+
tracer = Tracer(exporter=exporter, sampler=ProbabilitySampler(1.0))
367+
with tracer.span(name='parent'):
368+
response = requests.get(url='https://www.wikipedia.org/wiki/Rabbit')
236369
237370
References
238371
----------
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Copyright 2019, OpenCensus Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
17+
logger = logging.getLogger(__name__)
18+
19+
20+
class ProcessorMixin(object):
21+
"""ProcessorMixin adds the ability to process telemetry processors
22+
23+
Telemetry processors are functions that are called before exporting of
24+
telemetry to possibly modify the envelope contents.
25+
"""
26+
27+
def add_telemetry_processor(self, processor):
28+
"""Adds telemetry processor to the collection. Telemetry processors
29+
will be called one by one before telemetry item is pushed for sending
30+
and in the order they were added.
31+
32+
:param processor: The processor to add.
33+
"""
34+
self._telemetry_processors.append(processor)
35+
36+
def clear_telemetry_processors(self):
37+
"""Removes all telemetry processors"""
38+
self._telemetry_processors = []
39+
40+
def apply_telemetry_processors(self, envelopes):
41+
"""Applies all telemetry processors in the order they were added.
42+
43+
This function will return the list of envelopes to be exported after
44+
each processor has been run sequentially. Individual processors can
45+
throw exceptions and fail, but the applying of all telemetry processors
46+
will proceed (not fast fail). Processors also return True if envelope
47+
should be included for exporting, False otherwise.
48+
49+
:param envelopes: The envelopes to apply each processor to.
50+
"""
51+
filtered_envelopes = []
52+
for envelope in envelopes:
53+
accepted = True
54+
for processor in self._telemetry_processors:
55+
try:
56+
if processor(envelope) is False:
57+
accepted = False
58+
break
59+
except Exception as ex:
60+
logger.warning('Telemetry processor failed with: %s.', ex)
61+
if accepted:
62+
filtered_envelopes.append(envelope)
63+
return filtered_envelopes

contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from opencensus.common.schedule import Queue, QueueEvent, QueueExitEvent
2222
from opencensus.ext.azure.common import Options, utils
23+
from opencensus.ext.azure.common.processor import ProcessorMixin
2324
from opencensus.ext.azure.common.protocol import (
2425
Data,
2526
Envelope,
@@ -118,7 +119,7 @@ def filter(self, record):
118119
return random.random() < self.probability
119120

120121

121-
class AzureLogHandler(TransportMixin, BaseLogHandler):
122+
class AzureLogHandler(TransportMixin, ProcessorMixin, BaseLogHandler):
122123
"""Handler for logging to Microsoft Azure Monitor.
123124
124125
:param options: Options for the log handler.
@@ -137,6 +138,7 @@ def __init__(self, **options):
137138
maintenance_period=self.options.storage_maintenance_period,
138139
retention_period=self.options.storage_retention_period,
139140
)
141+
self._telemetry_processors = []
140142
super(AzureLogHandler, self).__init__()
141143
self.addFilter(SamplingFilter(self.options.logging_sampling_rate))
142144

@@ -148,6 +150,7 @@ def _export(self, batch, event=None): # pragma: NO COVER
148150
try:
149151
if batch:
150152
envelopes = [self.log_record_to_envelope(x) for x in batch]
153+
envelopes = self.apply_telemetry_processors(envelopes)
151154
result = self._transmit(envelopes)
152155
if result > 0:
153156
self.storage.put(envelopes, result)

contrib/opencensus-ext-azure/opencensus/ext/azure/metrics_exporter/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from opencensus.common import utils as common_utils
2121
from opencensus.ext.azure.common import Options, utils
22+
from opencensus.ext.azure.common.processor import ProcessorMixin
2223
from opencensus.ext.azure.common.protocol import (
2324
Data,
2425
DataPoint,
@@ -35,7 +36,7 @@
3536
logger = logging.getLogger(__name__)
3637

3738

38-
class MetricsExporter(object):
39+
class MetricsExporter(ProcessorMixin):
3940
"""Metrics exporter for Microsoft Azure Monitor."""
4041

4142
def __init__(self, options=None):
@@ -46,6 +47,8 @@ def __init__(self, options=None):
4647
if self.options.max_batch_size <= 0:
4748
raise ValueError('Max batch size must be at least 1.')
4849
self.max_batch_size = self.options.max_batch_size
50+
self._telemetry_processors = []
51+
super(MetricsExporter, self).__init__()
4952

5053
def export_metrics(self, metrics):
5154
if metrics:
@@ -75,6 +78,7 @@ def export_metrics(self, metrics):
7578
batched_envelopes = list(common_utils.window(
7679
envelopes, self.max_batch_size))
7780
for batch in batched_envelopes:
81+
batch = self.apply_telemetry_processors(batch)
7882
self._transmit_without_retry(batch)
7983

8084
def create_data_points(self, time_series, metric_descriptor):

contrib/opencensus-ext-azure/opencensus/ext/azure/trace_exporter/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from opencensus.common.schedule import QueueExitEvent
1818
from opencensus.ext.azure.common import Options, utils
1919
from opencensus.ext.azure.common.exporter import BaseExporter
20+
from opencensus.ext.azure.common.processor import ProcessorMixin
2021
from opencensus.ext.azure.common.protocol import (
2122
Data,
2223
Envelope,
@@ -37,7 +38,7 @@
3738
__all__ = ['AzureExporter']
3839

3940

40-
class AzureExporter(TransportMixin, BaseExporter):
41+
class AzureExporter(BaseExporter, ProcessorMixin, TransportMixin):
4142
"""An exporter that sends traces to Microsoft Azure Monitor.
4243
4344
:param options: Options for the exporter.
@@ -52,6 +53,7 @@ def __init__(self, **options):
5253
maintenance_period=self.options.storage_maintenance_period,
5354
retention_period=self.options.storage_retention_period,
5455
)
56+
self._telemetry_processors = []
5557
super(AzureExporter, self).__init__(**options)
5658

5759
def span_data_to_envelope(self, sd):
@@ -152,6 +154,7 @@ def emit(self, batch, event=None):
152154
try:
153155
if batch:
154156
envelopes = [self.span_data_to_envelope(sd) for sd in batch]
157+
envelopes = self.apply_telemetry_processors(envelopes)
155158
result = self._transmit(envelopes)
156159
if result > 0:
157160
self.storage.put(envelopes, result)

0 commit comments

Comments
 (0)