Security Tools Integration

This guide explains how to integrate CSE into security scanners, vulnerability assessment tools, and detection systems. By emitting CSE-compatible findings, your tools become interoperable with the broader compliance ecosystem.

Why Integrate with CSE?

Tools that emit CSE signals provide immediate value to users:

  • Automatic framework mapping: Findings map to HIPAA, SOC 2, ISO 27001, and other frameworks without additional configuration
  • Cross-tool correlation: Findings from your tool correlate with findings from other CSE-compatible tools
  • Compliance reporting: Users can generate unified compliance reports across their security stack
  • Reduced integration effort: GRC platforms that support CSE can consume your output without custom parsers

Integration Levels

CSE supports three levels of integration, allowing you to adopt incrementally:

LevelDescriptionEffort
Level 1: ReferenceInclude CSE signal IDs in your existing output formatLow
Level 2: Finding FormatEmit findings in the standard CSE finding formatMedium
Level 3: Full SupportInclude artifacts and pass full schema validationHigher

Level 1: Signal Reference

The simplest integration: add a CSE signal ID to your existing finding format. This enables downstream systems to look up framework mappings.

// Your existing finding format
{
  "rule_id": "S3-001",
  "message": "S3 bucket without encryption",
  "resource": "arn:aws:s3:::my-bucket",
  "severity": "HIGH",

  // Add CSE reference
  "cse_signal_id": "CSE-HIPAA-TECH-ENCRYPT-REST-001"
}

Signal Mapping

Create a mapping between your internal rule IDs and CSE signals:

signal_mappings.yaml
# signal_mappings.yaml
mappings:
  - rule_id: "S3-001"
    cse_signal_id: "CSE-HIPAA-TECH-ENCRYPT-REST-001"

  - rule_id: "SG-SSH-001"
    cse_signal_id: "CSE-CMMC-COMMS-UNRESTRICTED-SSH-001"

  - rule_id: "IAM-MFA-001"
    cse_signal_id: "CSE-CMMC-ACCESS-MFA-001"

  # Rules without CSE mapping
  - rule_id: "CUSTOM-001"
    cse_signal_id: null
import yaml

class SignalMapper:
    def __init__(self, mapping_file):
        with open(mapping_file) as f:
            data = yaml.safe_load(f)
        self.mappings = {m['rule_id']: m['cse_signal_id'] for m in data['mappings']}

    def get_cse_signal(self, rule_id):
        return self.mappings.get(rule_id)

# In your scanner
mapper = SignalMapper('signal_mappings.yaml')

def emit_finding(rule_id, resource, message):
    finding = {
        'rule_id': rule_id,
        'resource': resource,
        'message': message,
        'cse_signal_id': mapper.get_cse_signal(rule_id)
    }
    return finding

Level 2: Finding Format

Emit findings in the standard CSE format for full interoperability:

import json
from datetime import datetime
from typing import Optional, Dict, Any
import hashlib

class CSEFindingEmitter:
    def __init__(self, tool_name: str, tool_version: str):
        self.tool_name = tool_name
        self.tool_version = tool_version
        self.scan_id = self._generate_scan_id()

    def _generate_scan_id(self) -> str:
        timestamp = datetime.utcnow().isoformat()
        return f"scan-{hashlib.sha256(timestamp.encode()).hexdigest()[:12]}"

    def _generate_finding_id(self, signal_id: str, resource_id: str) -> str:
        key = f"{signal_id}:{resource_id}:{self.scan_id}"
        return f"f-{hashlib.sha256(key.encode()).hexdigest()[:16]}"

    def emit_finding(
        self,
        signal_id: str,
        artifact: Dict[str, Any],
        evidence: Dict[str, Any],
        severity_override: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Emit a CSE-compatible finding."""

        finding = {
            "id": self._generate_finding_id(signal_id, artifact.get("resource_id", "")),
            "signal_id": signal_id,
            "observed_at": datetime.utcnow().isoformat() + "Z",
            "status": "open",
            "artifact": artifact,
            "evidence": evidence,
            "source": {
                "tool": self.tool_name,
                "tool_version": self.tool_version,
                "scan_id": self.scan_id
            }
        }

        if severity_override:
            finding["severity_override"] = severity_override

        if metadata:
            finding["metadata"] = metadata

        return finding

    def emit_findings_batch(self, findings: list) -> Dict[str, Any]:
        """Emit a batch of findings in CSE format."""
        return {
            "schema_version": "1.0.0",
            "generated_at": datetime.utcnow().isoformat() + "Z",
            "source": {
                "tool": self.tool_name,
                "tool_version": self.tool_version,
                "scan_id": self.scan_id
            },
            "findings": findings
        }


# Usage in your scanner
emitter = CSEFindingEmitter("acme-cloud-scanner", "2.1.0")

# When you detect a condition
finding = emitter.emit_finding(
    signal_id="CSE-HIPAA-TECH-ENCRYPT-REST-001",
    artifact={
        "type": "cloud_resource",
        "provider": "aws",
        "service": "s3",
        "resource_id": "arn:aws:s3:::patient-data-bucket",
        "region": "us-east-1"
    },
    evidence={
        "encryption_enabled": False,
        "bucket_name": "patient-data-bucket",
        "versioning_enabled": True
    },
    metadata={
        "account_id": "123456789012",
        "environment": "production"
    }
)

print(json.dumps(finding, indent=2))

Output Example

{
  "id": "f-a1b2c3d4e5f67890",
  "signal_id": "CSE-HIPAA-TECH-ENCRYPT-REST-001",
  "observed_at": "2024-12-28T15:30:00Z",
  "status": "open",
  "artifact": {
    "type": "cloud_resource",
    "provider": "aws",
    "service": "s3",
    "resource_id": "arn:aws:s3:::patient-data-bucket",
    "region": "us-east-1"
  },
  "evidence": {
    "encryption_enabled": false,
    "bucket_name": "patient-data-bucket",
    "versioning_enabled": true
  },
  "source": {
    "tool": "acme-cloud-scanner",
    "tool_version": "2.1.0",
    "scan_id": "scan-abc123def456"
  },
  "metadata": {
    "account_id": "123456789012",
    "environment": "production"
  }
}

Level 3: Full Support

For complete CSE compliance, include artifacts and validate against the JSON schema:

import json
import hashlib
from jsonschema import validate

# Load CSE schema
with open('schemas/finding.schema.json') as f:
    FINDING_SCHEMA = json.load(f)

class FullCSEEmitter(CSEFindingEmitter):
    def emit_finding_with_artifact(
        self,
        signal_id: str,
        artifact: dict,
        evidence: dict,
        raw_artifact_data: dict
    ) -> tuple:
        """Emit a finding with a linked artifact."""

        finding = self.emit_finding(signal_id, artifact, evidence)

        # Create linked artifact
        artifact_record = {
            "id": f"art-{finding['id'][2:]}",
            "finding_id": finding["id"],
            "type": "api_response",
            "captured_at": finding["observed_at"],
            "source": {
                "service": artifact.get("service"),
                "resource_id": artifact.get("resource_id")
            },
            "content": raw_artifact_data,
            "integrity": {
                "hash_sha256": hashlib.sha256(
                    json.dumps(raw_artifact_data, sort_keys=True).encode()
                ).hexdigest()
            }
        }

        # Validate against schema
        validate(instance=finding, schema=FINDING_SCHEMA)

        return finding, artifact_record

Signal Discovery

Before mapping your rules to CSE signals, discover relevant signals:

# Search for encryption-related signals
curl "https://api.cseregistry.org/v1/search?q=encryption" \
  -H "Authorization: Bearer YOUR_API_KEY"

# List all signals in a specific category
curl "https://api.cseregistry.org/v1/signals?category=ACCESS" \
  -H "Authorization: Bearer YOUR_API_KEY"

# Get signal details to verify it matches your detection
curl "https://api.cseregistry.org/v1/signals/CSE-HIPAA-TECH-ENCRYPT-REST-001" \
  -H "Authorization: Bearer YOUR_API_KEY"

Best Practices

1. Map to the Most Specific Signal

Choose the signal that most precisely matches what your tool detects:

# Good: Specific signal for S3 encryption at rest
"CSE-HIPAA-TECH-ENCRYPT-REST-001"

# Less ideal: Generic encryption signal
"CSE-GEN-TECH-ENCRYPTION-001"

2. Include Sufficient Evidence

Provide enough evidence for users to understand and reproduce the finding:

# Good: Detailed evidence
"evidence": {
  "encryption_enabled": false,
  "encryption_type": null,
  "kms_key_id": null,
  "bucket_name": "my-bucket",
  "checked_properties": ["ServerSideEncryptionConfiguration"]
}

# Less ideal: Minimal evidence
"evidence": {
  "compliant": false
}

3. Use Consistent Resource Identifiers

Use the cloud provider's canonical identifier format:

# AWS: Use ARN format
"resource_id": "arn:aws:s3:::my-bucket"

# Azure: Use full resource ID
"resource_id": "/subscriptions/.../resourceGroups/.../providers/..."

# GCP: Use full resource name
"resource_id": "projects/my-project/buckets/my-bucket"

4. Handle Missing Mappings

Not all detections will map to CSE signals. Handle this gracefully:

def emit_finding(rule_id, ...):
    signal_id = mapper.get_cse_signal(rule_id)

    if signal_id:
        # Emit CSE-compatible finding
        return emit_cse_finding(signal_id, ...)
    else:
        # Emit in your native format
        return emit_native_finding(rule_id, ...)

Testing Your Integration

# Install validation tools
npm install -g ajv-cli

# Download CSE schemas
curl -O https://raw.githubusercontent.com/cse-registry/cse-registry/main/schemas/finding.schema.json

# Validate your output
ajv validate -s finding.schema.json -d my-findings.json

# Output on success
my-findings.json valid

Next Steps