Compliance Platforms Integration
This guide explains how to integrate CSE into compliance management platforms, enabling automatic control mapping and unified reporting across multiple frameworks.
Integration Benefits
Compliance platforms that consume CSE data can offer their users:
- Automatic control mapping: Findings automatically map to relevant framework controls without manual tagging
- Multi-framework views: The same finding shows its impact across HIPAA, SOC 2, ISO 27001, and other frameworks
- Cross-tool aggregation: Findings from different scanners correlate via shared signal IDs
- Reduced configuration: Pre-built mappings eliminate per-customer framework setup
Architecture Overview
Implementation Steps
1. Ingest CSE Findings
Parse incoming findings and validate against the CSE schema:
import json
from jsonschema import validate, ValidationError
class CSEFindingIngester:
def __init__(self, schema_path: str):
with open(schema_path) as f:
self.schema = json.load(f)
def ingest(self, finding_data: dict) -> dict:
"""Validate and normalize a CSE finding."""
try:
validate(instance=finding_data, schema=self.schema)
except ValidationError as e:
raise ValueError(f"Invalid CSE finding: {e.message}")
return {
"id": finding_data["id"],
"signal_id": finding_data["signal_id"],
"observed_at": finding_data["observed_at"],
"status": finding_data["status"],
"resource": self._extract_resource(finding_data["artifact"]),
"evidence": finding_data.get("evidence", {}),
"source_tool": finding_data.get("source", {}).get("tool"),
"raw_finding": finding_data
}
def _extract_resource(self, artifact: dict) -> dict:
"""Normalize resource information."""
return {
"type": artifact["type"],
"id": artifact.get("resource_id"),
"provider": artifact.get("provider"),
"region": artifact.get("region")
}2. Resolve Signal Mappings
Fetch control mappings from the CSE Registry API:
import requests
from functools import lru_cache
class CSEMappingResolver:
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.cseregistry.org/v1"
@lru_cache(maxsize=2000)
def get_mappings(self, signal_id: str) -> list:
"""Fetch and cache control mappings for a signal."""
response = requests.get(
f"{self.base_url}/signals/{signal_id}/mappings",
headers={"Authorization": f"Bearer {self.api_key}"}
)
if response.status_code == 404:
return []
response.raise_for_status()
return response.json()["data"]["mappings"]
@lru_cache(maxsize=2000)
def get_signal(self, signal_id: str) -> dict:
"""Fetch signal details."""
response = requests.get(
f"{self.base_url}/signals/{signal_id}",
headers={"Authorization": f"Bearer {self.api_key}"}
)
response.raise_for_status()
return response.json()["data"]
def resolve_finding(self, finding: dict) -> dict:
"""Enrich a finding with signal details and mappings."""
signal_id = finding["signal_id"]
signal = self.get_signal(signal_id)
mappings = self.get_mappings(signal_id)
return {
**finding,
"signal": signal,
"control_mappings": mappings,
"affected_frameworks": list(set(m["framework"] for m in mappings))
}3. Map to Framework Controls
Build a unified view showing which controls are affected across frameworks:
class ControlMapper:
def __init__(self, resolver: CSEMappingResolver):
self.resolver = resolver
def map_findings_to_controls(self, findings: list) -> dict:
"""
Map a list of findings to framework controls.
Returns a structure grouped by framework and control.
"""
control_map = {}
for finding in findings:
enriched = self.resolver.resolve_finding(finding)
for mapping in enriched["control_mappings"]:
framework = mapping["framework"]
control_id = mapping["control_id"]
key = f"{framework}:{control_id}"
if key not in control_map:
control_map[key] = {
"framework": framework,
"control_id": control_id,
"control_title": mapping["control_title"],
"findings": [],
"signals": set()
}
control_map[key]["findings"].append({
"finding_id": finding["id"],
"signal_id": finding["signal_id"],
"relationship": mapping["relationship"],
"severity": enriched["signal"]["severity"],
"resource": finding.get("resource")
})
control_map[key]["signals"].add(finding["signal_id"])
# Convert sets to lists for serialization
for control in control_map.values():
control["signals"] = list(control["signals"])
control["finding_count"] = len(control["findings"])
return control_map
def generate_framework_report(
self,
findings: list,
framework: str
) -> dict:
"""Generate a compliance report for a specific framework."""
control_map = self.map_findings_to_controls(findings)
framework_controls = {
k: v for k, v in control_map.items()
if v["framework"] == framework
}
return {
"framework": framework,
"generated_at": datetime.utcnow().isoformat(),
"total_findings": len(findings),
"affected_controls": len(framework_controls),
"controls": list(framework_controls.values())
}4. Aggregate Across Tools
Deduplicate and correlate findings from multiple security tools:
class FindingAggregator:
def aggregate_by_resource(self, findings: list) -> dict:
"""
Group findings by resource, correlating across tools.
Same resource + same signal = same issue.
"""
resource_map = {}
for finding in findings:
resource_id = finding.get("resource", {}).get("id")
signal_id = finding["signal_id"]
key = f"{resource_id}:{signal_id}"
if key not in resource_map:
resource_map[key] = {
"resource_id": resource_id,
"signal_id": signal_id,
"first_seen": finding["observed_at"],
"last_seen": finding["observed_at"],
"sources": [],
"status": "open"
}
# Track which tools found this issue
resource_map[key]["sources"].append({
"tool": finding.get("source_tool"),
"finding_id": finding["id"],
"observed_at": finding["observed_at"]
})
# Update timestamps
if finding["observed_at"] < resource_map[key]["first_seen"]:
resource_map[key]["first_seen"] = finding["observed_at"]
if finding["observed_at"] > resource_map[key]["last_seen"]:
resource_map[key]["last_seen"] = finding["observed_at"]
return resource_mapCaching Strategy
Signal definitions and mappings rarely change. Cache them aggressively:
import redis
import json
from datetime import timedelta
class CachedCSEResolver:
def __init__(self, api_key: str, redis_client: redis.Redis):
self.api_key = api_key
self.redis = redis_client
self.ttl = timedelta(hours=24)
def get_signal(self, signal_id: str) -> dict:
cache_key = f"cse:signal:{signal_id}"
# Try cache first
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
# Fetch from API
signal = self._fetch_signal(signal_id)
# Cache for 24 hours
self.redis.setex(
cache_key,
self.ttl,
json.dumps(signal)
)
return signal
def get_mappings(self, signal_id: str) -> list:
cache_key = f"cse:mappings:{signal_id}"
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
mappings = self._fetch_mappings(signal_id)
self.redis.setex(
cache_key,
self.ttl,
json.dumps(mappings)
)
return mappings
def warm_cache(self, signal_ids: list):
"""Pre-populate cache for known signals."""
for signal_id in signal_ids:
self.get_signal(signal_id)
self.get_mappings(signal_id)UI Integration
Display CSE data in your compliance dashboard:
Finding Detail View
// React component example
function FindingDetail({ finding, signal, mappings }) {
return (
<div>
<h2>{signal.title}</h2>
<Badge severity={signal.severity}>{signal.severity}</Badge>
<section>
<h3>Description</h3>
<p>{signal.description}</p>
</section>
<section>
<h3>Affected Controls</h3>
<table>
<thead>
<tr>
<th>Framework</th>
<th>Control</th>
<th>Relationship</th>
</tr>
</thead>
<tbody>
{mappings.map(m => (
<tr key={`${m.framework}-${m.control_id}`}>
<td>{m.framework}</td>
<td>{m.control_id}: {m.control_title}</td>
<td><RelationshipBadge type={m.relationship} /></td>
</tr>
))}
</tbody>
</table>
</section>
<section>
<h3>Resource</h3>
<ResourceCard resource={finding.artifact} />
</section>
</div>
);
}Best Practices
- Cache signal data: Mappings change infrequently; cache for 24+ hours
- Handle missing mappings: Not all signals have mappings to all frameworks
- Show relationship types: Primary vs supporting mappings have different compliance implications
- Deduplicate cross-tool: Same resource + same signal = same issue
- Track source tools: Show users which tools found each issue