GRC Systems Integration

This guide covers integration patterns for enterprise GRC platforms, enabling automated evidence collection, control testing, and compliance reporting using CSE signals.

GRC Integration Use Cases

  • Automated control testing: Map security tool findings to control objectives automatically
  • Evidence collection: Use CSE findings as audit evidence with full traceability
  • Continuous compliance: Track control status in real-time based on finding streams
  • Cross-framework reporting: Single finding data source for multi-framework compliance

Control Objective Mapping

Map CSE signals to your GRC platform's control library. The CSE mapping dataset provides pre-built relationships to standard framework controls.

class GRCControlMapper:
    def __init__(self, cse_client, grc_client):
        self.cse = cse_client
        self.grc = grc_client

    def sync_control_mappings(self, framework: str):
        """
        Sync CSE mappings to GRC control library.
        Run periodically to keep mappings current.
        """
        # Get all signals with mappings to this framework
        signals = self.cse.list_signals(per_page=100)

        for signal in signals["data"]:
            mappings = self.cse.get_mappings(signal["id"])

            for mapping in mappings:
                if mapping["framework"] != framework:
                    continue

                # Find corresponding control in GRC
                grc_control = self.grc.find_control(
                    framework=framework,
                    control_id=mapping["control_id"]
                )

                if grc_control:
                    # Link CSE signal to GRC control
                    self.grc.add_signal_mapping(
                        control_id=grc_control["id"],
                        signal_id=signal["id"],
                        relationship=mapping["relationship"],
                        rationale=mapping["rationale"]
                    )

    def get_control_status(self, control_id: str) -> dict:
        """
        Calculate control status based on linked CSE findings.
        """
        mappings = self.grc.get_signal_mappings(control_id)
        findings = []

        for mapping in mappings:
            signal_findings = self.grc.get_findings_by_signal(
                mapping["signal_id"]
            )
            findings.extend(signal_findings)

        # Calculate status
        open_findings = [f for f in findings if f["status"] == "open"]
        critical_high = [f for f in open_findings
                        if f["severity"] in ["critical", "high"]]

        if critical_high:
            status = "failing"
        elif open_findings:
            status = "at_risk"
        else:
            status = "passing"

        return {
            "control_id": control_id,
            "status": status,
            "total_findings": len(findings),
            "open_findings": len(open_findings),
            "critical_high_findings": len(critical_high)
        }

Evidence Collection

CSE findings and artifacts serve as audit evidence. Include full traceability from finding to source.

class EvidenceCollector:
    def __init__(self, cse_client, grc_client):
        self.cse = cse_client
        self.grc = grc_client

    def collect_evidence_for_control(
        self,
        control_id: str,
        period_start: str,
        period_end: str
    ) -> dict:
        """
        Collect CSE-based evidence for a control assessment.
        """
        mappings = self.grc.get_signal_mappings(control_id)
        evidence_items = []

        for mapping in mappings:
            findings = self.grc.get_findings_by_signal(
                signal_id=mapping["signal_id"],
                start_date=period_start,
                end_date=period_end
            )

            for finding in findings:
                evidence_items.append({
                    "type": "cse_finding",
                    "finding_id": finding["id"],
                    "signal_id": finding["signal_id"],
                    "observed_at": finding["observed_at"],
                    "status": finding["status"],
                    "resource": finding["artifact"],
                    "evidence_data": finding.get("evidence", {}),
                    "source_tool": finding.get("source", {}).get("tool"),
                    "relationship": mapping["relationship"]
                })

        return {
            "control_id": control_id,
            "period": {"start": period_start, "end": period_end},
            "evidence_count": len(evidence_items),
            "evidence_items": evidence_items,
            "generated_at": datetime.utcnow().isoformat()
        }

    def generate_audit_package(
        self,
        framework: str,
        period_start: str,
        period_end: str
    ) -> dict:
        """
        Generate complete audit evidence package for a framework.
        """
        controls = self.grc.get_controls_by_framework(framework)
        package = {
            "framework": framework,
            "period": {"start": period_start, "end": period_end},
            "generated_at": datetime.utcnow().isoformat(),
            "controls": []
        }

        for control in controls:
            evidence = self.collect_evidence_for_control(
                control["id"],
                period_start,
                period_end
            )

            status = self.calculate_control_status(evidence)

            package["controls"].append({
                "control_id": control["control_id"],
                "control_title": control["title"],
                "status": status,
                "evidence": evidence
            })

        return package

Continuous Compliance

Process findings in real-time to maintain current control status:

class ContinuousComplianceProcessor:
    def __init__(self, grc_client, control_mapper):
        self.grc = grc_client
        self.mapper = control_mapper

    def process_finding(self, finding: dict):
        """
        Process incoming CSE finding and update control status.
        Called via webhook or message queue.
        """
        signal_id = finding["signal_id"]

        # Get all controls affected by this signal
        affected_controls = self.grc.get_controls_by_signal(signal_id)

        for control in affected_controls:
            # Store the finding
            self.grc.store_finding(finding)

            # Recalculate control status
            status = self.mapper.get_control_status(control["id"])

            # Update control in GRC
            self.grc.update_control_status(
                control_id=control["id"],
                status=status["status"],
                last_tested=datetime.utcnow().isoformat(),
                open_findings=status["open_findings"]
            )

            # Check for status change
            if status["status"] != control["previous_status"]:
                self.notify_status_change(control, status)

    def notify_status_change(self, control: dict, new_status: dict):
        """Send notification when control status changes."""
        if new_status["status"] == "failing":
            self.grc.create_issue(
                title=f"Control {control['control_id']} now failing",
                severity="high",
                control_id=control["id"],
                details=new_status
            )

Relationship Types in GRC

CSE mapping relationships have specific implications for control testing:

RelationshipGRC InterpretationStatus Impact
primaryFinding directly indicates control failureOpen finding = control failing
supportingFinding provides additional evidenceContributes to control assessment
partialFinding addresses part of the controlMultiple partials may be needed
def calculate_control_status(evidence: dict) -> str:
    """
    Calculate control status with relationship weighting.
    """
    primary_findings = [
        e for e in evidence["evidence_items"]
        if e["relationship"] == "primary" and e["status"] == "open"
    ]

    # Any open primary finding = control failing
    if primary_findings:
        return "failing"

    supporting_findings = [
        e for e in evidence["evidence_items"]
        if e["relationship"] == "supporting" and e["status"] == "open"
    ]

    # Many supporting findings = at risk
    if len(supporting_findings) > 3:
        return "at_risk"

    # Check for partial coverage
    partial_findings = [
        e for e in evidence["evidence_items"]
        if e["relationship"] == "partial"
    ]

    if partial_findings:
        open_partials = [f for f in partial_findings if f["status"] == "open"]
        if open_partials:
            return "partial_failing"

    return "passing"

Reporting Integration

class ComplianceReporter:
    def generate_executive_summary(
        self,
        framework: str,
        period: dict
    ) -> dict:
        """
        Generate executive summary for compliance reporting.
        """
        controls = self.grc.get_controls_by_framework(framework)

        summary = {
            "framework": framework,
            "period": period,
            "total_controls": len(controls),
            "passing": 0,
            "at_risk": 0,
            "failing": 0,
            "top_issues": []
        }

        issue_counts = {}

        for control in controls:
            status = control["status"]
            summary[status] = summary.get(status, 0) + 1

            if status == "failing":
                findings = self.grc.get_open_findings_for_control(
                    control["id"]
                )
                for f in findings:
                    signal = f["signal_id"]
                    issue_counts[signal] = issue_counts.get(signal, 0) + 1

        # Top issues by occurrence
        sorted_issues = sorted(
            issue_counts.items(),
            key=lambda x: x[1],
            reverse=True
        )[:10]

        for signal_id, count in sorted_issues:
            signal = self.cse.get_signal(signal_id)
            summary["top_issues"].append({
                "signal_id": signal_id,
                "title": signal["title"],
                "severity": signal["severity"],
                "occurrence_count": count
            })

        summary["compliance_score"] = (
            summary["passing"] / summary["total_controls"] * 100
        )

        return summary

Best Practices

  • Sync mappings regularly: Run mapping sync weekly to catch new signals
  • Weight by relationship: Primary findings should have stronger impact than supporting
  • Track evidence lineage: Maintain full chain from finding to source tool
  • Set assessment periods: Align evidence collection with audit periods
  • Handle exceptions: Allow risk acceptance for findings that don't apply

Next Steps