Redact Sensitive Data from Traces
PII Redaction during tracing to Arize
Example Google Colab
In some situations, you need to redact or remove sensitive, personally identifiable information (PII) from your traces. In such cases, a custom span processor that redacts PII from your spans during OTEL export can be very useful.
The goal of this documentation is to provide an example custom span processor that redacts PII from any span using regex patterns. Note that, even though this example uses regex patterns, the span processor can be customized with more advanced PII detection frameworks such as Microsoft Presidio. A Microsoft Presidio example can be found at the end of this documentation.
If you want to completely hide the inputs and outputs of your traces, check out the Mask Span Attributes docs.
Let's create an example custom span processor to detect and redact PII data. In OpenTelemetry, every span processor needs to have the following methods:
- on_start(span, parent_context): Handle span initialization
- on_end(span): Process completed spans
- shutdown(): Clean up resources
- force_flush(timeout_millis): Handle immediate processing needs
PII Redaction via Regex Patterns:
We'll use the on_end(span) method to redact PII from different data types by defining regex patterns. The example below supports the following types of PII:
Email addresses
Phone numbers (US format)
Social Security Numbers (SSN)
Credit card numbers
IP addresses
Dates of birth
Note that you can always pass in additional patterns of your choice to the custom span processor below!
pip install opentelemetry-sdk
pip install opentelemetry-exporter-otlp
import json
import re
from typing import Any, Dict, List, Optional

from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor
from opentelemetry.sdk.trace.export import SpanExporter
from opentelemetry.trace import Span
class PIIRedactingSpanProcessor(SpanProcessor):
    """Span processor that redacts regex-matched PII and then exports the span.

    On ``on_end`` the finished span is copied into a new ``ReadableSpan`` whose
    name, attributes and event names/attributes have had every match of the
    configured patterns replaced by ``[REDACTED_<PATTERN_NAME>]``. The copy is
    handed straight to the wrapped exporter; the original span is not modified.
    """

    # Metadata keys that are copied through without being scanned.
    _PASSTHROUGH_KEYS = frozenset(
        {'service.name', 'telemetry.sdk.name', 'telemetry.sdk.version'}
    )

    def __init__(self, exporter: SpanExporter, pii_patterns: Optional[Dict[str, str]] = None):
        """
        Initialize the PII redacting processor with an exporter and optional patterns.

        Args:
            exporter: The span exporter to use after PII redaction
            pii_patterns: Dictionary of pattern names and their regex patterns.
                Entries are merged over the defaults, so a name that matches a
                default pattern replaces it.
        """
        self._exporter = exporter
        self._default_patterns = {
            'email': r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
            'ip_address': r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b',
            'date_of_birth': r'\b\d{2}[-/]\d{2}[-/]\d{4}\b',
        }
        self._patterns = {**self._default_patterns, **(pii_patterns or {})}
        # Compile once up front; the patterns are applied to every string value.
        self._compiled_patterns = {
            name: re.compile(pattern) for name, pattern in self._patterns.items()
        }

    def _redact_string(self, value: str) -> str:
        """Return ``value`` with every pattern match replaced by its placeholder."""
        redacted = value
        # Patterns run in dict order, each one on the output of the previous.
        for pattern_name, pattern in self._compiled_patterns.items():
            redacted = pattern.sub(f'[REDACTED_{pattern_name.upper()}]', redacted)
        return redacted

    def _redact_value(self, value: Any) -> Any:
        """
        Redact PII from any value type.

        Strings holding a JSON object or array are parsed, redacted recursively
        and re-serialized. Every other string is scanned as plain text; this
        includes strings that happen to parse as a JSON scalar (for example a
        bare digit string such as a phone or card number), which must not skip
        the scan. Dicts, lists and tuples are rebuilt with redacted members
        (dict keys are left as-is). Numbers, booleans and ``None`` pass through
        unchanged. Anything else is converted with ``str()`` and scanned.
        """
        if isinstance(value, str):
            try:
                parsed = json.loads(value)
            except json.JSONDecodeError:
                # Not JSON at all: treat as a regular string.
                return self._redact_string(value)
            if isinstance(parsed, (dict, list)):
                return json.dumps(self._redact_value(parsed))
            # JSON scalar: scan the raw text rather than the parsed number.
            return self._redact_string(value)
        if isinstance(value, dict):
            return {k: self._redact_value(v) for k, v in value.items()}
        if isinstance(value, list):
            return [self._redact_value(item) for item in value]
        if isinstance(value, tuple):
            # Sequence-valued attributes may arrive as tuples; keep the type.
            return tuple(self._redact_value(item) for item in value)
        if isinstance(value, (int, float, bool, type(None))):
            return value
        # Convert any other types to string and redact
        return self._redact_string(str(value))

    def _redact_span_attributes(self, span: ReadableSpan) -> Dict[str, Any]:
        """
        Create a new dictionary of redacted span attributes.

        A value whose redaction raises is replaced by ``"[REDACTION_ERROR]"``
        so the raw value is never exported.
        """
        redacted_attributes = {}
        for key, value in (span.attributes or {}).items():
            # Skip certain metadata attributes that shouldn't contain PII
            if key in self._PASSTHROUGH_KEYS:
                redacted_attributes[key] = value
                continue
            try:
                redacted_attributes[key] = self._redact_value(value)
            except Exception as e:
                redacted_attributes[key] = "[REDACTION_ERROR]"
                print(f"Error redacting attribute {key}: {str(e)}")
        return redacted_attributes

    def _create_redacted_span(self, span: ReadableSpan) -> ReadableSpan:
        """
        Create a new span with redacted attributes instead of modifying the original.

        Context, parent, resource, links, kind, status, timestamps and
        instrumentation scope are carried over from ``span`` unchanged.
        """
        # Imported once per call instead of once per event.
        from opentelemetry.sdk.trace import Event

        redacted_attributes = self._redact_span_attributes(span)
        redacted_name = self._redact_string(span.name)

        # Rebuild each event with a redacted name and redacted attributes.
        redacted_events = []
        for event in span.events:
            redacted_event_attrs = {
                k: self._redact_value(v) for k, v in (event.attributes or {}).items()
            }
            redacted_events.append(
                Event(
                    name=self._redact_string(event.name),
                    attributes=redacted_event_attrs,
                    timestamp=event.timestamp,
                )
            )

        return ReadableSpan(
            name=redacted_name,
            context=span.get_span_context(),
            parent=span.parent,
            resource=span.resource,
            attributes=redacted_attributes,
            events=redacted_events,
            links=span.links,
            kind=span.kind,
            status=span.status,
            start_time=span.start_time,
            end_time=span.end_time,
            # instrumentation_info is the deprecated spelling of this argument.
            instrumentation_scope=span.instrumentation_scope,
        )

    def on_start(self, span: Span, parent_context: Optional[Any] = None):
        """Called when a span starts. Nothing to do until the span ends."""
        pass

    def on_end(self, span: ReadableSpan):
        """Called when a span ends. Creates a redacted copy and exports it."""
        redacted_span = self._create_redacted_span(span)
        self._exporter.export([redacted_span])

    def shutdown(self):
        """Shuts down the processor and exporter."""
        self._exporter.shutdown()

    def force_flush(self, timeout_millis: int = 30000):
        """Forces flush of pending spans and returns the exporter's result."""
        return self._exporter.force_flush(timeout_millis)
Once we create a custom span processor, we can simply initialize it and pass it to our tracer provider during our OTEL configurations:
# Set up the tracer provider with the PII processor
tracer_provider = trace_sdk.TracerProvider(
    resource=Resource(attributes=trace_attributes)
)

# Create the PII redacting processor with the OTLP exporter
pii_processor = PIIRedactingSpanProcessor(OTLPSpanExporter(endpoint))
tracer_provider.add_span_processor(pii_processor)
# NOTE: do not also register a BatchSpanProcessor (or any other exporting
# processor) for the same endpoint. The PII processor already exports each
# redacted span itself; a second processor would export the original,
# unredacted span as well.
trace_api.set_tracer_provider(tracer_provider=tracer_provider)

# To get your tracer
tracer = trace_api.get_tracer(__name__)

# Finish automatic instrumentation
OpenAIInstrumentor().instrument()
Once the processor is set up, any PII that matches the defined regex patterns is automatically redacted before export.
PII Redaction via Microsoft Presidio:
Some teams may prefer to use Microsoft Presidio instead of regex patterns to redact PII from traces. An example span processor that leverages Presidio is shown below.
from presidio_analyzer import AnalyzerEngine
from presidio_analyzer.nlp_engine import NlpEngineProvider
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
class PresidioRedactionSpanProcessor(SpanProcessor):
    """
    OpenTelemetry span processor that redacts PII data using Microsoft Presidio.

    On ``on_end`` the finished span is copied into a new ``ReadableSpan`` whose
    name, attributes and event names/attributes have been passed through the
    Presidio analyzer and anonymizer. The copy is handed straight to the
    wrapped exporter; the original span is not modified.
    """

    # Metadata keys that are copied through without being scanned.
    _PASSTHROUGH_KEYS = frozenset(
        {'service.name', 'telemetry.sdk.name', 'telemetry.sdk.version'}
    )

    def __init__(
        self,
        exporter: SpanExporter,
        entities: Optional[List[str]] = None,
        language: str = "en",
        model_name: str = "en_core_web_lg",
    ):
        """
        Initialize the PII redacting processor with Presidio and an exporter.

        Args:
            exporter: The span exporter to use after PII redaction
            entities: List of PII entity types to detect and redact.
                If None, uses a default set of common PII types.
            language: Language to use for NLP analysis
            model_name: spaCy model to load for ``language``. The default is
                an English model; pass a matching model for other languages.
        """
        self._exporter = exporter
        # Kept so analysis uses the same language the NLP engine was built for.
        self._language = language
        # Default supported entity types in Presidio
        self._default_entities = [
            "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "US_SSN",
            "CREDIT_CARD", "IP_ADDRESS", "DATE_TIME", "US_BANK_NUMBER",
            "US_DRIVER_LICENSE", "LOCATION", "NRP", "US_PASSPORT",
            "US_ITIN", "CRYPTO", "UK_NHS", "IBAN_CODE"
        ]
        self._entities = entities or self._default_entities
        # Set up Presidio engines with proper configuration
        nlp_configuration = {
            "nlp_engine_name": "spacy",
            "models": [
                {"lang_code": language, "model_name": model_name}
            ]
        }
        nlp_engine = NlpEngineProvider(nlp_configuration=nlp_configuration).create_engine()
        self._analyzer = AnalyzerEngine(
            nlp_engine=nlp_engine,
            supported_languages=[language],
        )
        self._anonymizer = AnonymizerEngine()
        # Default operator for anonymization (replacement with entity type)
        self._operators = {
            entity: OperatorConfig("replace", {"new_value": f"[REDACTED_{entity}]"})
            for entity in self._entities
        }

    def _redact_string(self, value: str) -> str:
        """Redact PII from any string value using Presidio.

        Empty or whitespace-only strings are returned untouched. If analysis
        or anonymization raises, ``"[REDACTION_ERROR]"`` is returned so the raw
        text is never exported.
        """
        if not value.strip():
            return value
        try:
            # Analyze the text for PII
            results = self._analyzer.analyze(
                text=value,
                entities=self._entities,
                language=self._language
            )
            if not results:
                return value
            # PII was found: anonymize it
            anonymized_text = self._anonymizer.anonymize(
                text=value,
                analyzer_results=results,
                operators=self._operators
            )
            return anonymized_text.text
        except Exception as e:
            print(f"Error redacting string: {str(e)}")
            return "[REDACTION_ERROR]"

    def _redact_value(self, value: Any) -> Any:
        """
        Redact PII from any value type.

        Strings holding a JSON object or array are parsed, redacted recursively
        and re-serialized. Every other string is scanned as plain text; this
        includes strings that happen to parse as a JSON scalar (for example a
        bare digit string), which must not skip the scan. Dicts, lists and
        tuples are rebuilt with redacted members (dict keys are left as-is).
        Numbers, booleans and ``None`` pass through unchanged. Anything else is
        converted with ``str()`` and scanned.
        """
        if isinstance(value, str):
            try:
                parsed = json.loads(value)
            except json.JSONDecodeError:
                # Not JSON at all: treat as a regular string.
                return self._redact_string(value)
            if isinstance(parsed, (dict, list)):
                return json.dumps(self._redact_value(parsed))
            # JSON scalar: scan the raw text rather than the parsed number.
            return self._redact_string(value)
        if isinstance(value, dict):
            return {k: self._redact_value(v) for k, v in value.items()}
        if isinstance(value, list):
            return [self._redact_value(item) for item in value]
        if isinstance(value, tuple):
            # Sequence-valued attributes may arrive as tuples; keep the type.
            return tuple(self._redact_value(item) for item in value)
        if isinstance(value, (int, float, bool, type(None))):
            return value
        # Convert any other types to string and redact
        return self._redact_string(str(value))

    def _redact_span_attributes(self, span: ReadableSpan) -> Dict[str, Any]:
        """
        Create a new dictionary of redacted span attributes.

        A value whose redaction raises is replaced by ``"[REDACTION_ERROR]"``.
        """
        redacted_attributes = {}
        for key, value in (span.attributes or {}).items():
            # Skip certain metadata attributes that shouldn't contain PII
            if key in self._PASSTHROUGH_KEYS:
                redacted_attributes[key] = value
                continue
            try:
                redacted_attributes[key] = self._redact_value(value)
            except Exception as e:
                redacted_attributes[key] = "[REDACTION_ERROR]"
                print(f"Error redacting attribute {key}: {str(e)}")
        return redacted_attributes

    def _create_redacted_span(self, span: ReadableSpan) -> ReadableSpan:
        """
        Create a new span with redacted attributes instead of modifying the original.

        Context, parent, resource, links, kind, status, timestamps and
        instrumentation scope are carried over from ``span`` unchanged.
        """
        # Event is not imported at module level, so bring it in here.
        from opentelemetry.sdk.trace import Event

        redacted_attributes = self._redact_span_attributes(span)
        redacted_name = self._redact_string(span.name)

        # Rebuild each event with a redacted name and redacted attributes.
        redacted_events = []
        for event in span.events:
            redacted_event_attrs = {
                k: self._redact_value(v) for k, v in (event.attributes or {}).items()
            }
            redacted_events.append(
                Event(
                    name=self._redact_string(event.name),
                    attributes=redacted_event_attrs,
                    timestamp=event.timestamp,
                )
            )

        return ReadableSpan(
            name=redacted_name,
            context=span.get_span_context(),
            parent=span.parent,
            resource=span.resource,
            attributes=redacted_attributes,
            events=redacted_events,
            links=span.links,
            kind=span.kind,
            status=span.status,
            start_time=span.start_time,
            end_time=span.end_time,
            instrumentation_scope=span.instrumentation_scope,
        )

    def on_start(self, span: Span, parent_context: Optional[Any] = None):
        """Called when a span starts. Nothing to do until the span ends."""
        pass

    def on_end(self, span: ReadableSpan):
        """Called when a span ends. Creates a redacted copy and exports it."""
        redacted_span = self._create_redacted_span(span)
        self._exporter.export([redacted_span])

    def shutdown(self):
        """Shuts down the processor and exporter."""
        self._exporter.shutdown()

    def force_flush(self, timeout_millis: int = 30000):
        """Forces flush of pending spans and returns the exporter's result."""
        return self._exporter.force_flush(timeout_millis)
Last updated
Was this helpful?