API Examples
Practical code examples for integrating with the CSE Registry API in various programming languages.
Python
Basic Usage
import requests
API_KEY = "your_api_key"
BASE_URL = "https://api.cseregistry.org/v1"
headers = {
"Authorization": f"Bearer {API_KEY}"
}
# List all high-severity HIPAA signals
response = requests.get(
f"{BASE_URL}/signals",
headers=headers,
params={
"domain": "HIPAA",
"severity": "high",
"per_page": 50
}
)
data = response.json()
for signal in data["data"]:
    print(f"{signal['id']}: {signal['title']}")

With Error Handling and Retry
import requests
import time
from typing import Optional
class CSEClient:
    """Small client for the CSE Registry API with rate-limit retries.

    All calls go through ``_request``, which retries HTTP 429 responses with
    exponential backoff and raises for any other error status.
    """

    # Seconds to wait for a response when the caller passes no ``timeout``.
    DEFAULT_TIMEOUT = 30

    def __init__(self, api_key: str):
        """Create a session that sends ``api_key`` as a bearer token."""
        self.api_key = api_key
        self.base_url = "https://api.cseregistry.org/v1"
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}"
        })

    @staticmethod
    def _retry_after_seconds(value: Optional[str], default: int = 60) -> int:
        """Convert a ``Retry-After`` header value into a non-negative delay.

        The header may carry either a number of seconds or an HTTP-date.
        A missing or unparseable value yields ``default``.
        """
        # Local imports keep this block self-contained within the file.
        from datetime import datetime, timezone
        from email.utils import parsedate_to_datetime

        if value is None:
            return default
        try:
            return max(int(value), 0)
        except ValueError:
            pass  # Not an integer; try the HTTP-date form next.
        try:
            when = parsedate_to_datetime(value)
        except (TypeError, ValueError):
            return default
        if when.tzinfo is None:
            when = when.replace(tzinfo=timezone.utc)
        remaining = (when - datetime.now(timezone.utc)).total_seconds()
        return max(int(remaining), 0)

    def _request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Send a request and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            endpoint: Path appended to ``base_url``.
            **kwargs: Passed through to ``Session.request``.

        Raises:
            Exception: If every attempt was answered with HTTP 429.
            Whatever ``response.raise_for_status()`` raises for other errors.
        """
        url = f"{self.base_url}{endpoint}"
        # Never block forever on an unresponsive server.
        kwargs.setdefault("timeout", self.DEFAULT_TIMEOUT)
        max_retries = 3
        for attempt in range(max_retries):
            response = self.session.request(method, url, **kwargs)
            if response.status_code != 429:
                response.raise_for_status()
                return response.json()
            if attempt == max_retries - 1:
                # No further attempt follows, so waiting would be wasted time.
                break
            retry_after = self._retry_after_seconds(
                response.headers.get("Retry-After")
            )
            # Exponential backoff, capped at five minutes.
            wait_time = min(retry_after * (2 ** attempt), 300)
            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)
        raise Exception("Max retries exceeded")

    def get_signal(self, signal_id: str) -> dict:
        """Fetch a specific signal by ID."""
        return self._request("GET", f"/signals/{signal_id}")["data"]

    def list_signals(
        self,
        domain: Optional[str] = None,
        category: Optional[str] = None,
        severity: Optional[str] = None,
        page: int = 1,
        per_page: int = 20
    ) -> dict:
        """List signals with optional filtering.

        Returns the full response body; filters left as ``None`` (or empty)
        are not sent.
        """
        params = {"page": page, "per_page": per_page}
        if domain:
            params["domain"] = domain
        if category:
            params["category"] = category
        if severity:
            params["severity"] = severity
        return self._request("GET", "/signals", params=params)

    def get_mappings(self, signal_id: str) -> list:
        """Get control mappings for a signal."""
        return self._request("GET", f"/signals/{signal_id}/mappings")["data"]["mappings"]

    def search(self, query: str, domain: Optional[str] = None) -> list:
        """Full-text search across signals."""
        params = {"q": query}
        if domain:
            params["domain"] = domain
        return self._request("GET", "/search", params=params)["data"]
# Usage
SIGNAL_ID = "CSE-HIPAA-TECH-ENCRYPT-REST-001"
client = CSEClient("your_api_key")

# Look up one signal and show its headline fields.
signal = client.get_signal(SIGNAL_ID)
print(f"Signal: {signal['title']}")
print(f"Severity: {signal['severity']}")

# Show every control mapping attached to that signal.
mappings = client.get_mappings(SIGNAL_ID)
for m in mappings:
    print(f" {m['framework']} {m['control_id']}: {m['control_title']}")

# Full-text search, restricted to the HIPAA domain.
results = client.search("encryption", domain="HIPAA")
print(f"\nFound {len(results)} encryption-related signals")

JavaScript / TypeScript
Basic Usage
const API_KEY = 'your_api_key';
const BASE_URL = 'https://api.cseregistry.org/v1';

/**
 * Fetch up to 50 signals, optionally filtered by domain and severity.
 *
 * @param {string} [domain] - Domain filter; omitted from the query when not given.
 * @param {string} [severity] - Severity filter; omitted from the query when not given.
 * @returns {Promise<object>} The decoded JSON response body.
 * @throws {Error} When the response status is not OK.
 */
async function listSignals(domain, severity) {
  // URLSearchParams turns undefined into the literal string "undefined",
  // so only add the filters the caller actually supplied.
  const params = new URLSearchParams();
  if (domain) params.set('domain', domain);
  if (severity) params.set('severity', severity);
  params.set('per_page', '50');

  const response = await fetch(`${BASE_URL}/signals?${params}`, {
    headers: {
      'Authorization': `Bearer ${API_KEY}`
    }
  });
  if (!response.ok) {
    throw new Error(`API error: ${response.status}`);
  }
  return response.json();
}
// Usage
const highSeverityPage = await listSignals('HIPAA', 'high');
const signals = highSeverityPage.data;
const meta = highSeverityPage.meta;
console.log(`Found ${meta.total} signals`);
signals.forEach(s => console.log(`${s.id}: ${s.title}`));

TypeScript Client Class
/** A signal record, as carried in the `data` field of signal responses. */
interface Signal {
  id: string;
  domain: string;
  category: string;
  title: string;
  description: string;
  // Closed set of severity labels accepted by this type.
  severity: 'critical' | 'high' | 'medium' | 'low' | 'info';
  tags: string[];
  version: string;
}
/** One control mapping for a signal, as returned by `CSEClient.getMappings`. */
interface Mapping {
  framework: string;
  control_id: string;
  control_title: string;
  // Closed set of relationship labels accepted by this type.
  relationship: 'primary' | 'supporting' | 'partial';
  rationale: string;
}
/** Envelope for list endpoints: one page of items plus pagination metadata. */
interface ListResponse<T> {
  data: T[];
  // Pagination counters; presumably `total` counts all matching items — confirm against the API reference.
  meta: {
    total: number;
    page: number;
    per_page: number;
    total_pages: number;
  };
}
/**
 * Typed client for the CSE Registry API.
 *
 * Every call goes through `request`, which sends the bearer token, retries
 * HTTP 429 a bounded number of times, and throws on any other failure.
 */
class CSEClient {
  private baseUrl = 'https://api.cseregistry.org/v1';

  /** Total attempts per call, counting the first one. */
  private static readonly MAX_ATTEMPTS = 3;

  /** Delay in seconds used when `Retry-After` is absent or not a number. */
  private static readonly DEFAULT_RETRY_AFTER = 60;

  constructor(private apiKey: string) {}

  /**
   * Issue a GET request and return the decoded JSON body.
   *
   * @param endpoint Path appended to the base URL.
   * @param params Optional query-string parameters.
   * @throws Error when the response is not OK (including a 429 on the last attempt).
   */
  private async request<T>(endpoint: string, params?: Record<string, string>): Promise<T> {
    const url = new URL(`${this.baseUrl}${endpoint}`);
    if (params) {
      Object.entries(params).forEach(([k, v]) => url.searchParams.set(k, v));
    }

    // A loop instead of recursion so a persistently rate-limited API cannot
    // make this call retry forever.
    for (let attempt = 1; ; attempt++) {
      const response = await fetch(url.toString(), {
        headers: { 'Authorization': `Bearer ${this.apiKey}` }
      });

      if (response.status === 429 && attempt < CSEClient.MAX_ATTEMPTS) {
        // Retry-After may be an HTTP-date; parseInt then yields NaN, which
        // would make setTimeout fire immediately, so fall back to the default.
        const parsed = parseInt(response.headers.get('Retry-After') || '', 10);
        const retryAfter =
          Number.isFinite(parsed) && parsed >= 0 ? parsed : CSEClient.DEFAULT_RETRY_AFTER;
        await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
        continue;
      }

      if (!response.ok) {
        // Error bodies are not guaranteed to be JSON; do not let a parse
        // failure replace the real error.
        const error = await response.json().catch(() => null);
        throw new Error(error?.error?.message || 'API request failed');
      }
      return response.json();
    }
  }

  /** Fetch a single signal by ID. */
  async getSignal(id: string): Promise<Signal> {
    const response = await this.request<{ data: Signal }>(`/signals/${id}`);
    return response.data;
  }

  /** List signals; only the options that are set are sent as query parameters. */
  async listSignals(options?: {
    domain?: string;
    category?: string;
    severity?: string;
    page?: number;
    perPage?: number;
  }): Promise<ListResponse<Signal>> {
    const params: Record<string, string> = {};
    if (options?.domain) params.domain = options.domain;
    if (options?.category) params.category = options.category;
    if (options?.severity) params.severity = options.severity;
    if (options?.page) params.page = String(options.page);
    if (options?.perPage) params.per_page = String(options.perPage);
    return this.request<ListResponse<Signal>>('/signals', params);
  }

  /** Fetch the control mappings for a signal. */
  async getMappings(signalId: string): Promise<Mapping[]> {
    const response = await this.request<{ data: { mappings: Mapping[] } }>(
      `/signals/${signalId}/mappings`
    );
    return response.data.mappings;
  }

  /** Full-text search across signals, optionally limited to one domain. */
  async search(query: string, domain?: string): Promise<Signal[]> {
    const params: Record<string, string> = { q: query };
    if (domain) params.domain = domain;
    const response = await this.request<{ data: Signal[] }>('/search', params);
    return response.data;
  }
}
// Usage
const SIGNAL_ID = 'CSE-HIPAA-TECH-ENCRYPT-REST-001';
const client = new CSEClient('your_api_key');

// Look up the signal, then the control mappings attached to it.
const signal = await client.getSignal(SIGNAL_ID);
console.log(signal.title);

const mappings = await client.getMappings(signal.id);
mappings.forEach(m => console.log(`${m.framework} ${m.control_id}`));

Go
package main
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
)
const baseURL = "https://api.cseregistry.org/v1"
// Signal is the JSON shape of a single signal record decoded from API responses.
type Signal struct {
	ID          string   `json:"id"`
	Domain      string   `json:"domain"`
	Category    string   `json:"category"`
	Title       string   `json:"title"`
	Description string   `json:"description"`
	Severity    string   `json:"severity"`
	Tags        []string `json:"tags"`
}
// SignalResponse is the envelope decoded by GetSignal: one Signal under "data".
type SignalResponse struct {
	Data Signal `json:"data"`
}
// ListResponse is the envelope decoded by ListSignals: a slice of signals
// under "data" plus pagination counters under "meta".
type ListResponse struct {
	Data []Signal `json:"data"`
	Meta struct {
		Total      int `json:"total"`
		Page       int `json:"page"`
		PerPage    int `json:"per_page"`
		TotalPages int `json:"total_pages"`
	} `json:"meta"`
}
// CSEClient holds the API key and the HTTP client used for every request.
type CSEClient struct {
	apiKey     string       // sent as the bearer token on each request
	httpClient *http.Client // performs the actual HTTP calls
}
// NewCSEClient returns a client that authenticates with apiKey and uses a
// zero-value http.Client.
func NewCSEClient(apiKey string) *CSEClient {
	client := new(CSEClient)
	client.apiKey = apiKey
	client.httpClient = new(http.Client)
	return client
}
// request sends an authenticated GET to baseURL+endpoint and returns the raw
// response. The caller must close the response body.
//
// params is encoded as the query string; a nil or empty value adds nothing.
func (c *CSEClient) request(endpoint string, params url.Values) (*http.Response, error) {
	reqURL := baseURL + endpoint
	// len() covers both nil and empty values, so no dangling "?" is appended.
	if len(params) > 0 {
		reqURL += "?" + params.Encode()
	}

	req, err := http.NewRequest("GET", reqURL, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+c.apiKey)
	return c.httpClient.Do(req)
}
// GetSignal fetches a single signal by ID.
//
// It returns an error when the request fails, when the server answers with a
// non-2xx status, or when the body cannot be decoded.
func (c *CSEClient) GetSignal(id string) (*Signal, error) {
	resp, err := c.request("/signals/"+id, nil)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Without this check an error body would decode into a zero Signal and
	// be returned with a nil error.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, fmt.Errorf("get signal %q: unexpected status %s", id, resp.Status)
	}

	var result SignalResponse
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, err
	}
	return &result.Data, nil
}
// ListSignals lists signals, filtered by domain and severity when those
// arguments are non-empty.
//
// It returns an error when the request fails, when the server answers with a
// non-2xx status, or when the body cannot be decoded.
func (c *CSEClient) ListSignals(domain, severity string) (*ListResponse, error) {
	params := url.Values{}
	if domain != "" {
		params.Set("domain", domain)
	}
	if severity != "" {
		params.Set("severity", severity)
	}

	resp, err := c.request("/signals", params)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Without this check an error body would decode into an empty list and
	// be returned with a nil error.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, fmt.Errorf("list signals: unexpected status %s", resp.Status)
	}

	var result ListResponse
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, err
	}
	return &result, nil
}
func main() {
	client := NewCSEClient("your_api_key")

	// Look up one signal by ID and print its headline fields.
	sig, err := client.GetSignal("CSE-HIPAA-TECH-ENCRYPT-REST-001")
	if err != nil {
		panic(err)
	}
	fmt.Printf("Signal: %s\n", sig.Title)
	fmt.Printf("Severity: %s\n", sig.Severity)

	// Fetch the high-severity HIPAA signals and print each one.
	page, err := client.ListSignals("HIPAA", "high")
	if err != nil {
		panic(err)
	}
	fmt.Printf("\nFound %d signals\n", page.Meta.Total)
	for i := range page.Data {
		fmt.Printf(" %s: %s\n", page.Data[i].ID, page.Data[i].Title)
	}
}

cURL
# Set your API key
export CSE_API_KEY="your_api_key"

# List signals
curl "https://api.cseregistry.org/v1/signals?domain=HIPAA&severity=high" \
  --header "Authorization: Bearer $CSE_API_KEY"

# Get a specific signal
curl "https://api.cseregistry.org/v1/signals/CSE-HIPAA-TECH-ENCRYPT-REST-001" \
  --header "Authorization: Bearer $CSE_API_KEY"

# Get mappings for a signal
curl "https://api.cseregistry.org/v1/signals/CSE-HIPAA-TECH-ENCRYPT-REST-001/mappings" \
  --header "Authorization: Bearer $CSE_API_KEY"

# Search signals
curl "https://api.cseregistry.org/v1/search?q=encryption" \
  --header "Authorization: Bearer $CSE_API_KEY"

# Get registry statistics
curl "https://api.cseregistry.org/v1/stats" \
  --header "Authorization: Bearer $CSE_API_KEY"

# Pretty print with jq
curl -s "https://api.cseregistry.org/v1/signals?domain=HIPAA" \
  -H "Authorization: Bearer $CSE_API_KEY" | jq '.data[] | {id, title, severity}'

Common Patterns
Paginating Through All Results
/**
 * Collect every signal for a domain by requesting pages of 100 until the
 * current page number is no longer below `meta.total_pages`.
 *
 * @param {object} client - Object exposing `listSignals({ domain, page, perPage })`.
 * @param {string} domain - Domain filter passed to each page request.
 * @returns {Promise<Array>} All `data` items from every page, in page order.
 */
async function getAllSignals(client, domain) {
  const collected = [];
  for (let page = 1; ; page++) {
    const response = await client.listSignals({ domain, page, perPage: 100 });
    collected.push(...response.data);
    // Written as a negated "<" so a missing total_pages still ends the loop.
    if (!(page < response.meta.total_pages)) {
      break;
    }
  }
  return collected;
}
// Usage: assumes `client` is an already-constructed client exposing listSignals().
const allHIPAASignals = await getAllSignals(client, 'HIPAA');
console.log(`Total: ${allHIPAASignals.length} HIPAA signals`);

Building a Compliance Report
/**
 * Build a report that enriches each finding with its signal and affected
 * controls, and counts findings per framework control.
 *
 * @param {object} client - Object exposing `getSignal(id)` and `getMappings(id)`.
 * @param {Array<object>} findings - Findings, each carrying a `signal_id`.
 * @returns {Promise<object>} `{ generated, findings, controlsSummary }`.
 */
async function generateComplianceReport(client, findings) {
  const enrichedFindings = [];
  const controlsSummary = {};
  const report = {
    generated: new Date().toISOString(),
    findings: enrichedFindings,
    controlsSummary
  };

  for (const finding of findings) {
    // Signal details first, then its framework mappings, one after the other.
    const signal = await client.getSignal(finding.signal_id);
    const mappings = await client.getMappings(finding.signal_id);

    const affected_controls = mappings.map(m => {
      return {
        framework: m.framework,
        control: m.control_id,
        relationship: m.relationship
      };
    });
    enrichedFindings.push({ ...finding, signal, affected_controls });

    // Tally this finding against every control it maps to.
    mappings.forEach(mapping => {
      const key = `${mapping.framework}:${mapping.control_id}`;
      if (!controlsSummary[key]) {
        controlsSummary[key] = {
          framework: mapping.framework,
          control: mapping.control_id,
          title: mapping.control_title,
          finding_count: 0
        };
      }
      controlsSummary[key].finding_count++;
    });
  }

  return report;
}