Introduction

The gap between threat detection and effective response remains one of the most dangerous vulnerabilities in cloud security. According to industry data, the mean time to respond (MTTR) to a cloud security incident still hovers around 277 days, giving adversaries a massive window to move laterally, exfiltrate data, and establish persistence. Organizations experienced an average of 1,925 attacks per week in early 2025, with cloud intrusion attempts jumping 75% year-over-year. The message is clear: manual incident response cannot keep pace.

Event-driven security automation changes the equation. Instead of waiting for a human analyst to triage an alert, review context, and decide on a response, you build pipelines that react in seconds. AWS EventBridge captures security events as they happen. Step Functions orchestrate multi-step response workflows with built-in error handling and human approval gates. And open-source threat intelligence platforms like MISP and OpenCTI enrich every alert with real-world context – is this IP part of a known botnet? Does this behavior match a documented APT campaign?

This guide walks you through building a complete event-driven security automation pipeline on AWS, enriched by open-source threat intelligence. You will deploy EventBridge rules that catch GuardDuty findings, Security Hub alerts, and CloudTrail anomalies. You will wire them into Step Functions state machines that orchestrate enrichment, evaluation, and response. And you will integrate MISP, OpenCTI, and Sigma rules so your automation is powered by community-driven intelligence rather than locked into a single vendor.

The approach follows the red-team.sh philosophy: build it yourself with open source for maximum freedom and flexibility. AWS-native services provide the event backbone and orchestration. Open-source tools provide the intelligence and detection logic that you own, control, and can take anywhere.

Event-Driven Security Architecture Overview

Figure: Event-driven security automation architecture. Security sources feed EventBridge, which triggers Step Functions with MISP enrichment and automated response.

The architecture follows a straightforward event-driven pattern with four stages:

1. Event Sources – AWS security services generate structured findings and events. GuardDuty produces threat findings. Security Hub aggregates and normalizes findings from multiple services. CloudTrail logs every API call. AWS Config tracks resource configuration changes.

2. Event Bus – Amazon EventBridge acts as the central nervous system. Rules match specific event patterns and route them to the appropriate response workflow. A dedicated custom event bus isolates security events from application events.

3. Orchestration – AWS Step Functions state machines coordinate the response. Each workflow follows a consistent pattern: classify the event, enrich it with threat intelligence, evaluate it against detection rules, decide on a response action, execute that action, and log everything.

4. Intelligence and Response – Lambda functions query MISP and OpenCTI for threat context, evaluate Sigma rules against the event data, and execute response actions like WAF updates, security group modifications, or IAM policy revocations. All events flow to Security Lake in OCSF format for long-term analytics.

This separation of concerns makes the system maintainable. You can update detection rules without touching response logic. You can add new event sources without modifying existing workflows. You can swap enrichment providers without rebuilding the pipeline.

AWS EventBridge for Security Events

EventBridge is the foundation of event-driven security on AWS. Every AWS security service publishes events to the default event bus. Your job is to write rules that match the events you care about and route them to your automation.

Creating a Dedicated Security Event Bus

Isolating security events on a custom event bus prevents interference with application event processing and provides a clear boundary for IAM policies:

{
  "Name": "security-automation-bus",
  "Description": "Dedicated event bus for security automation workflows",
  "Tags": [
    { "Key": "Application", "Value": "security-automation" },
    { "Key": "Environment", "Value": "production" },
    { "Key": "Owner", "Value": "security-team" }
  ]
}
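
Because AWS services publish to the default bus, findings only reach the dedicated bus if a rule on the default bus forwards them. The following is a minimal sketch of the request parameters for such a rule, written as a pure function so it can be checked without AWS access. The rule name, target ID, and role ARN are hypothetical placeholders, and the returned dictionaries are intended as keyword arguments for the boto3 EventBridge client's put_rule and put_targets calls.

```python
import json


def build_forwarding_rule(bus_arn: str, role_arn: str) -> tuple[dict, dict]:
    """Return kwargs for events.put_rule and events.put_targets.

    The rule lives on the default bus and forwards GuardDuty and
    Security Hub events to the dedicated security bus.
    """
    rule_name = "forward-security-events"  # hypothetical rule name
    pattern = {"source": ["aws.guardduty", "aws.securityhub"]}

    put_rule_kwargs = {
        "Name": rule_name,
        "EventBusName": "default",
        "EventPattern": json.dumps(pattern),
        "State": "ENABLED",
        "Description": "Forward security findings to the security bus",
    }
    put_targets_kwargs = {
        "Rule": rule_name,
        "EventBusName": "default",
        "Targets": [
            {
                "Id": "security-automation-bus",  # hypothetical target ID
                "Arn": bus_arn,
                # Role that EventBridge assumes to put events on the target bus
                "RoleArn": role_arn,
            }
        ],
    }
    return put_rule_kwargs, put_targets_kwargs
```

With a client created as boto3.client("events"), the two dictionaries would be passed as client.put_rule(**rule) and client.put_targets(**targets).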

EventBridge Rule Patterns for Security Events

The power of EventBridge lies in its pattern matching. You define JSON patterns that filter events based on any field in the event payload. Here are essential security event patterns:

GuardDuty High-Severity Findings:

{
  "source": ["aws.guardduty"],
  "detail-type": ["GuardDuty Finding"],
  "detail": {
    "severity": [{ "numeric": [">=", 7] }],
    "type": [{
      "prefix": "UnauthorizedAccess"
    }, {
      "prefix": "Recon"
    }, {
      "prefix": "Trojan"
    }, {
      "prefix": "CryptoCurrency"
    }]
  }
}

Security Hub Critical and High Findings:

{
  "source": ["aws.securityhub"],
  "detail-type": ["Security Hub Findings - Imported"],
  "detail": {
    "findings": {
      "Severity": {
        "Label": ["CRITICAL", "HIGH"]
      },
      "Workflow": {
        "Status": ["NEW"]
      },
      "RecordState": ["ACTIVE"]
    }
  }
}

CloudTrail Suspicious API Calls:

{
  "source": ["aws.cloudtrail", "aws.kms", "aws.ec2", "aws.guardduty", "aws.s3"],
  "detail-type": ["AWS API Call via CloudTrail"],
  "detail": {
    "eventName": [
      "StopLogging",
      "DeleteTrail",
      "UpdateTrail",
      "DisableKey",
      "DeleteFlowLogs",
      "DeleteDetector",
      "UpdateDetector",
      "PutBucketPolicy",
      "PutBucketAcl",
      "AuthorizeSecurityGroupIngress"
    ]
  }
}

AWS Config Non-Compliant Resource Changes:

{
  "source": ["aws.config"],
  "detail-type": ["Config Rules Compliance Change"],
  "detail": {
    "messageType": ["ComplianceChangeNotification"],
    "newEvaluationResult": {
      "complianceType": ["NON_COMPLIANT"]
    }
  }
}
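
Patterns like the ones above are easier to trust once they have been run against sample events. The sketch below is a simplified local matcher that approximates three EventBridge behaviors used in this section: exact values, the prefix operator, and the numeric operator. It is not a full reimplementation (other operators are ignored, and array handling is simplified), so treat the EventBridge TestEventPattern API as the authoritative check. The function names are illustrative.

```python
import operator
from typing import Any

_NUMERIC_OPS = {
    "=": operator.eq,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}


def _matches_rule(rule: Any, value: Any) -> bool:
    """Check one alternative from a pattern list against one event value."""
    if isinstance(rule, dict):
        if "prefix" in rule:
            return isinstance(value, str) and value.startswith(rule["prefix"])
        if "numeric" in rule:
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                return False
            spec = rule["numeric"]
            # spec is a flat list of operator/operand pairs, e.g. [">=", 7, "<", 9]
            pairs = zip(spec[0::2], spec[1::2])
            return all(_NUMERIC_OPS[op](value, operand) for op, operand in pairs)
        return False  # other operators are out of scope for this sketch
    return rule == value


def matches(pattern: dict, event: Any) -> bool:
    """Return True if the event satisfies every field in the pattern."""
    if isinstance(event, list):
        # Simplification: an array matches when any single element matches
        return any(matches(pattern, item) for item in event)
    if not isinstance(event, dict):
        return False
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not matches(expected, actual):
                return False
        else:
            values = actual if isinstance(actual, list) else [actual]
            if not any(_matches_rule(r, v) for r in expected for v in values):
                return False
    return True


if __name__ == "__main__":
    pattern = {
        "source": ["aws.guardduty"],
        "detail": {
            "severity": [{"numeric": [">=", 7]}],
            "type": [{"prefix": "Recon"}, {"prefix": "Trojan"}],
        },
    }
    sample = {
        "source": "aws.guardduty",
        "detail": {"severity": 8, "type": "Recon:EC2/PortProbeUnprotectedPort"},
    }
    print(matches(pattern, sample))
```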

Cross-Account Event Forwarding

In multi-account AWS Organizations setups, forward security events from member accounts to a central security account:

{
  "Version": "2012-10-17",
  "Statement": [{
    "Sid": "AllowOrgMemberAccountsPutEvents",
    "Effect": "Allow",
    "Principal": "*",
    "Action": "events:PutEvents",
    "Resource": "arn:aws:events:us-east-1:SECURITY_ACCOUNT_ID:event-bus/security-automation-bus",
    "Condition": {
      "StringEquals": { "aws:PrincipalOrgID": "ORGANIZATION_ID" }
    }
  }]
}

Step Functions for Orchestrated Incident Response

EventBridge gets events to the right place. Step Functions decide what happens next. The state machine pattern is ideal for security workflows because it provides built-in retry logic, error handling, parallel execution, human approval gates, and a complete audit trail of every decision.

State Machine Design for Security Incidents

The following ASL (Amazon States Language) definition implements a complete incident response workflow. It classifies the incoming event, enriches it with threat intelligence, evaluates it against Sigma rules, decides on the response severity, executes the appropriate actions, and logs everything to Security Lake:

{
  "Comment": "Security Incident Response Orchestrator",
  "StartAt": "ClassifyEvent",
  "States": {
    "ClassifyEvent": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:classify-security-event",
      "ResultPath": "$.classification",
      "Next": "EnrichParallel",
      "Retry": [{
        "ErrorEquals": ["States.TaskFailed"],
        "IntervalSeconds": 2,
        "MaxAttempts": 3,
        "BackoffRate": 2.0
      }],
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "LogFailure",
        "ResultPath": "$.error"
      }]
    },
    "EnrichParallel": {
      "Type": "Parallel",
      "ResultPath": "$.enrichment",
      "Next": "EvaluateSigmaRules",
      "Branches": [
        {
          "StartAt": "MISPEnrichment",
          "States": {
            "MISPEnrichment": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:misp-enrichment",
              "End": true,
              "Retry": [{
                "ErrorEquals": ["States.TaskFailed"],
                "IntervalSeconds": 5,
                "MaxAttempts": 2,
                "BackoffRate": 2.0
              }]
            }
          }
        },
        {
          "StartAt": "OpenCTIEnrichment",
          "States": {
            "OpenCTIEnrichment": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:opencti-enrichment",
              "End": true,
              "Retry": [{
                "ErrorEquals": ["States.TaskFailed"],
                "IntervalSeconds": 5,
                "MaxAttempts": 2,
                "BackoffRate": 2.0
              }]
            }
          }
        }
      ],
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "EvaluateSigmaRules",
        "ResultPath": "$.enrichmentError"
      }]
    },
    "EvaluateSigmaRules": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:sigma-rule-evaluator",
      "ResultPath": "$.sigmaResults",
      "Next": "DetermineResponseLevel"
    },
    "DetermineResponseLevel": {
      "Type": "Choice",
      "Choices": [
        {
          "And": [
            { "Variable": "$.classification.severity", "StringEquals": "CRITICAL" },
            { "Variable": "$.enrichment[0].iocMatch", "IsPresent": true },
            { "Variable": "$.enrichment[0].iocMatch", "BooleanEquals": true }
          ],
          "Next": "AutomatedCriticalResponse"
        },
        {
          "Variable": "$.classification.severity",
          "StringEquals": "HIGH",
          "Next": "HumanApprovalRequired"
        }
      ],
      "Default": "LogAndMonitor"
    },
    "AutomatedCriticalResponse": {
      "Type": "Parallel",
      "ResultPath": "$.responseActions",
      "Next": "LogToSecurityLake",
      "Branches": [
        {
          "StartAt": "BlockIP",
          "States": {
            "BlockIP": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:block-ip-waf",
              "End": true
            }
          }
        },
        {
          "StartAt": "IsolateResource",
          "States": {
            "IsolateResource": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:isolate-resource",
              "End": true
            }
          }
        },
        {
          "StartAt": "NotifySecurityTeam",
          "States": {
            "NotifySecurityTeam": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:notify-security-team",
              "End": true
            }
          }
        }
      ]
    },
    "HumanApprovalRequired": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "https://sqs.us-east-1.amazonaws.com/ACCOUNT_ID/security-approval-queue",
        "MessageBody": {
          "taskToken.$": "$$.Task.Token",
          "incident.$": "$",
          "message": "Security incident requires human approval before automated response"
        }
      },
      "TimeoutSeconds": 3600,
      "ResultPath": "$.approval",
      "Next": "AutomatedCriticalResponse",
      "Catch": [{
        "ErrorEquals": ["States.Timeout"],
        "Next": "EscalateTimeout",
        "ResultPath": "$.timeoutError"
      }]
    },
    "EscalateTimeout": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:escalate-timeout",
      "Next": "LogToSecurityLake"
    },
    "LogAndMonitor": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:log-and-monitor",
      "Next": "LogToSecurityLake"
    },
    "LogToSecurityLake": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:log-to-security-lake",
      "End": true
    },
    "LogFailure": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:log-failure",
      "End": true
    }
  }
}

The key design decisions in this state machine:

  • Parallel enrichment – MISP and OpenCTI are queried simultaneously, so enrichment takes about as long as the slower lookup rather than the sum of both.
  • Graceful degradation – If enrichment fails, the workflow continues with what it has rather than stopping entirely.
  • Tiered response – Critical events with a confirmed IOC match get an automated response. High-severity events wait for human approval before any response runs. Everything else, including critical events without an IOC match, is logged and monitored.
  • Approval timeout – If no human responds within an hour, the workflow escalates rather than letting the incident sit.
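
The approval gate only works if something eventually returns the task token to Step Functions. The following is a minimal sketch of that callback, assuming a reviewer's decision arrives alongside the SQS message body produced by HumanApprovalRequired. The function name, the reviewer field, and the ApprovalRejected error name are hypothetical. The client is injected so the logic can be exercised with a stub; in practice it would be boto3.client("stepfunctions"), whose send_task_success and send_task_failure calls are used here.

```python
import json
from typing import Any


def resolve_approval(
    sfn_client: Any, message_body: str, approved: bool, reviewer: str
) -> str:
    """Resume a paused execution using the task token from the SQS message."""
    body = json.loads(message_body)
    token = body["taskToken"]

    if approved:
        # The output becomes the result of the HumanApprovalRequired state
        sfn_client.send_task_success(
            taskToken=token,
            output=json.dumps({"approved": True, "reviewer": reviewer}),
        )
        return "approved"

    # A failure surfaces in the state machine as the named error
    sfn_client.send_task_failure(
        taskToken=token,
        error="ApprovalRejected",  # hypothetical error name
        cause=f"Rejected by {reviewer}",
    )
    return "rejected"
```

Note that the state machine above only catches States.Timeout on the approval state, so a rejection sent this way would fail the execution unless a matching Catch entry is added.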

Lambda Functions for Enrichment and Response

Each Lambda function in the pipeline has a single responsibility. This makes them independently testable, deployable, and replaceable.
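
The state machine starts with a classify-security-event function whose code is not shown in this guide. As an illustration of the single-responsibility idea, here is a minimal sketch of what it might return for the Choice state to read at $.classification.severity. The severity thresholds for GuardDuty, the category field, and the decision to treat every matched CloudTrail API call as HIGH are assumptions of this sketch, not part of the original pipeline.

```python
from typing import Any

_SEVERITY_ORDER = ["LOW", "MEDIUM", "HIGH", "CRITICAL"]


def _label_from_guardduty(severity: float) -> str:
    """Map GuardDuty's numeric severity to a label (assumed thresholds)."""
    if severity >= 9:
        return "CRITICAL"
    if severity >= 7:
        return "HIGH"
    if severity >= 4:
        return "MEDIUM"
    return "LOW"


def handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Return the object the state machine stores at $.classification."""
    source = event.get("source", "")
    detail = event.get("detail", {})

    if source == "aws.guardduty":
        severity = _label_from_guardduty(float(detail.get("severity", 0)))
        # Finding types look like "Recon:EC2/PortProbeUnprotectedPort"
        category = str(detail.get("type", "unknown")).split(":")[0]
    elif source == "aws.securityhub":
        labels = [
            finding.get("Severity", {}).get("Label", "LOW")
            for finding in detail.get("findings", [])
        ]
        ranked = [label for label in labels if label in _SEVERITY_ORDER]
        # Use the most severe finding in the batch
        severity = max(ranked, key=_SEVERITY_ORDER.index, default="LOW")
        category = "SecurityHubFinding"
    elif detail.get("eventName"):
        # Assumption: every API call matched by the CloudTrail rule is HIGH
        severity = "HIGH"
        category = detail["eventName"]
    else:
        severity = "LOW"
        category = "unknown"

    return {"source": source, "severity": severity, "category": category}
```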

MISP Enrichment Function

This Lambda function takes security event indicators (IP addresses, domains, file hashes) and queries your MISP instance for matching threat intelligence:

import json
import os
import logging
from typing import Any
from urllib.parse import urljoin

import boto3
import urllib3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

http = urllib3.PoolManager()

MISP_URL = os.environ["MISP_URL"]
MISP_API_KEY_PARAM = os.environ["MISP_API_KEY_PARAM"]

ssm = boto3.client("ssm")


def get_misp_api_key() -> str:
    """Retrieve MISP API key from SSM Parameter Store."""
    response = ssm.get_parameter(
        Name=MISP_API_KEY_PARAM,
        WithDecryption=True,
    )
    return response["Parameter"]["Value"]


def extract_indicators(event: dict[str, Any]) -> dict[str, list[str]]:
    """Extract IOCs from security event for MISP lookup."""
    indicators: dict[str, list[str]] = {
        "ip_addresses": [],
        "domains": [],
        "file_hashes": [],
    }

    detail = event.get("detail", {})

    # GuardDuty finding indicators
    if "service" in detail:
        action = detail.get("service", {}).get("action", {})

        # Network connection IPs
        network_info = action.get("networkConnectionAction", {})
        remote_ip = (
            network_info
            .get("remoteIpDetails", {})
            .get("ipAddressV4", "")
        )
        if remote_ip:
            indicators["ip_addresses"].append(remote_ip)

        # DNS query domains
        dns_action = action.get("dnsRequestAction", {})
        domain = dns_action.get("domain", "")
        if domain:
            indicators["domains"].append(domain)

    # CloudTrail source IP
    source_ip = detail.get("sourceIPAddress", "")
    if source_ip and not source_ip.endswith(".amazonaws.com"):
        indicators["ip_addresses"].append(source_ip)

    return indicators


def search_misp(api_key: str, indicator_type: str, value: str) -> dict:
    """Search MISP for a specific indicator."""
    url = urljoin(MISP_URL, "/attributes/restSearch")
    headers = {
        "Authorization": api_key,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    type_mapping = {
        "ip_addresses": "ip-dst",
        "domains": "domain",
        "file_hashes": "md5",
    }

    payload = {
        "returnFormat": "json",
        "type": type_mapping.get(indicator_type, "text"),
        "value": value,
        "enforceWarninglist": True,
        "includeCorrelations": True,
        "limit": 10,
    }

    response = http.request(
        "POST",
        url,
        body=json.dumps(payload).encode("utf-8"),
        headers=headers,
        timeout=10.0,
    )

    if response.status == 200:
        return json.loads(response.data.decode("utf-8"))
    logger.error("MISP search failed: %s %s", response.status, response.data)
    return {"response": {"Attribute": []}}


def handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Lambda handler for MISP threat intelligence enrichment."""
    api_key = get_misp_api_key()
    indicators = extract_indicators(event)

    enrichment_results = {
        "iocMatch": False,
        "matchedIndicators": [],
        "threatLevel": "unknown",
        "relatedEvents": [],
        "tags": [],
    }

    for indicator_type, values in indicators.items():
        for value in values:
            logger.info("Searching MISP for %s: %s", indicator_type, value)
            result = search_misp(api_key, indicator_type, value)

            attributes = (
                result.get("response", {}).get("Attribute", [])
            )

            if attributes:
                enrichment_results["iocMatch"] = True
                enrichment_results["matchedIndicators"].append({
                    "type": indicator_type,
                    "value": value,
                    "mispEventCount": len(attributes),
                    "tags": [
                        tag.get("name", "")
                        for attr in attributes[:3]
                        for tag in attr.get("Tag", [])
                    ],
                    "threatLevel": attributes[0].get(
                        "event_threat_level_id", "unknown"
                    ),
                })

    # Determine overall threat level from matches
    if enrichment_results["matchedIndicators"]:
        threat_levels = [
            str(m.get("threatLevel", "4"))
            for m in enrichment_results["matchedIndicators"]
        ]
        # MISP threat levels: 1=High, 2=Medium, 3=Low, 4=Undefined
        # default=4 keeps min() from raising when no level is numeric
        min_level = min(
            (int(tl) for tl in threat_levels if tl.isdigit()),
            default=4,
        )
        level_map = {1: "high", 2: "medium", 3: "low", 4: "undefined"}
        enrichment_results["threatLevel"] = level_map.get(
            min_level, "unknown"
        )

    logger.info(
        "MISP enrichment complete. IOC match: %s",
        enrichment_results["iocMatch"],
    )
    return enrichment_results
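
The handler above reads the API key from Parameter Store on every invocation. One option, sketched below, is to cache the value for a short time so warm Lambda invocations skip the extra call. The CachedSecret class and its parameters are hypothetical helpers, not part of the original function; the fetch callable stands in for get_misp_api_key.

```python
import time
from typing import Callable, Optional


class CachedSecret:
    """Cache a secret for ttl_seconds between warm Lambda invocations."""

    def __init__(
        self,
        fetch: Callable[[], str],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested
        self._value: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return the cached value, refetching once the TTL has passed."""
        now = self._clock()
        if self._value is None or now >= self._expires_at:
            self._value = self._fetch()
            self._expires_at = now + self._ttl
        return self._value
```

Created once at module level, for example as CachedSecret(get_misp_api_key), the object survives between invocations of a warm execution environment, and the handler would call its get method instead of the fetch function. A short TTL keeps key rotation from being delayed for long.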

OpenCTI Enrichment Function

OpenCTI provides a complementary intelligence layer with STIX2-structured data and relationship mapping:

import json
import os
import logging
from typing import Any

import boto3
import urllib3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

http = urllib3.PoolManager()

OPENCTI_URL = os.environ["OPENCTI_URL"]
OPENCTI_TOKEN_PARAM = os.environ["OPENCTI_TOKEN_PARAM"]

ssm = boto3.client("ssm")


def get_opencti_token() -> str:
    """Retrieve OpenCTI token from SSM Parameter Store."""
    response = ssm.get_parameter(
        Name=OPENCTI_TOKEN_PARAM,
        WithDecryption=True,
    )
    return response["Parameter"]["Value"]


def query_opencti(token: str, observable_value: str) -> dict:
    """Query OpenCTI GraphQL API for observable enrichment."""
    url = f"{OPENCTI_URL}/graphql"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

    query = """
    query SearchObservable($value: String!) {
      stixCyberObservables(
        search: $value
        first: 5
      ) {
        edges {
          node {
            id
            entity_type
            observable_value
            x_opencti_score
            createdBy { name }
            objectLabel { value }
            indicators {
              edges {
                node {
                  name
                  pattern
                  indicator_types
                  confidence
                  validFrom
                  validUntil
                }
              }
            }
            stixCoreRelationships {
              edges {
                node {
                  relationship_type
                  to {
                    ... on AttackPattern { name x_mitre_id }
                    ... on IntrusionSet { name }
                    ... on Malware { name }
                    ... on Campaign { name }
                  }
                }
              }
            }
          }
        }
      }
    }
    """

    payload = {
        "query": query,
        "variables": {"value": observable_value},
    }

    response = http.request(
        "POST",
        url,
        body=json.dumps(payload).encode("utf-8"),
        headers=headers,
        timeout=15.0,
    )

    if response.status == 200:
        return json.loads(response.data.decode("utf-8"))
    logger.error("OpenCTI query failed: %s", response.status)
    return {"data": {"stixCyberObservables": {"edges": []}}}


def handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Lambda handler for OpenCTI threat intelligence enrichment."""
    token = get_opencti_token()

    # Extract indicators from the event (reuse same extraction logic)
    detail = event.get("detail", {})
    observables = []

    source_ip = detail.get("sourceIPAddress", "")
    if source_ip and not source_ip.endswith(".amazonaws.com"):
        observables.append(source_ip)

    service = detail.get("service", {})
    action = service.get("action", {})
    remote_ip = (
        action
        .get("networkConnectionAction", {})
        .get("remoteIpDetails", {})
        .get("ipAddressV4", "")
    )
    if remote_ip:
        observables.append(remote_ip)

    domain = (
        action
        .get("dnsRequestAction", {})
        .get("domain", "")
    )
    if domain:
        observables.append(domain)

    enrichment = {
        "source": "opencti",
        "matchFound": False,
        "observables": [],
        "relatedThreatActors": [],
        "attackPatterns": [],
        "campaigns": [],
        "confidence": 0,
    }

    for observable in observables:
        result = query_opencti(token, observable)
        edges = (
            result
            .get("data", {})
            .get("stixCyberObservables", {})
            .get("edges", [])
        )

        for edge in edges:
            node = edge.get("node") or {}
            enrichment["matchFound"] = True
            # x_opencti_score may be null in the response; treat it as 0
            enrichment["confidence"] = max(
                enrichment["confidence"],
                node.get("x_opencti_score") or 0,
            )

            # Extract related threat actors and attack patterns
            for rel_edge in (
                (node.get("stixCoreRelationships") or {})
                .get("edges", [])
            ):
                rel_node = rel_edge.get("node") or {}
                # Guard against a missing or null relationship target
                rel_to = rel_node.get("to") or {}
                rel_type = rel_node.get("relationship_type", "")

                if rel_type in ("indicates", "uses"):
                    if "x_mitre_id" in rel_to:
                        enrichment["attackPatterns"].append({
                            "name": rel_to.get("name"),
                            "mitreId": rel_to.get("x_mitre_id"),
                        })
                # Checked at the same level as the branch above; nested
                # inside it, this branch could never run
                elif rel_type == "attributed-to" and rel_to.get("name"):
                    enrichment["relatedThreatActors"].append(
                        rel_to.get("name")
                    )

    logger.info(
        "OpenCTI enrichment complete. Match: %s, Confidence: %d",
        enrichment["matchFound"],
        enrichment["confidence"],
    )
    return enrichment
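
Both enrichment functions repeat the same indicator extraction, and both accept any non-empty sourceIPAddress that does not end in .amazonaws.com. A shared helper could centralize that logic and validate addresses before they are sent to an intelligence platform. The sketch below is one way to do it with the standard ipaddress module; the function names are hypothetical, and filtering out non-global addresses is an assumption that may not suit every environment.

```python
import ipaddress
from typing import Any


def _is_public_ip(value: str) -> bool:
    """True for a syntactically valid, globally routable IP address."""
    try:
        return ipaddress.ip_address(value).is_global
    except ValueError:
        return False


def extract_observables(event: dict[str, Any]) -> list[str]:
    """Collect unique IPs and domains from GuardDuty or CloudTrail events."""
    detail = event.get("detail", {})
    action = detail.get("service", {}).get("action", {})

    remote_ip = (
        action
        .get("networkConnectionAction", {})
        .get("remoteIpDetails", {})
        .get("ipAddressV4", "")
    )
    candidate_ips = [detail.get("sourceIPAddress", ""), remote_ip]
    domain = action.get("dnsRequestAction", {}).get("domain", "")

    observables: list[str] = []
    for ip in candidate_ips:
        # Drops service names such as "ec2.amazonaws.com" and private ranges
        if _is_public_ip(ip) and ip not in observables:
            observables.append(ip)
    if domain and domain not in observables:
        observables.append(domain)
    return observables
```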

Automated IP Blocking Function

When the orchestrator confirms a malicious IP through threat intel enrichment, this function updates AWS WAF and VPC Network ACLs:

import json
import os
import logging
from datetime import datetime, timezone
from typing import Any

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

waf = boto3.client("wafv2")
ec2 = boto3.client("ec2")

WAF_IP_SET_ID = os.environ["WAF_BLOCK_IP_SET_ID"]
WAF_IP_SET_NAME = os.environ["WAF_BLOCK_IP_SET_NAME"]
WAF_SCOPE = os.environ.get("WAF_SCOPE", "REGIONAL")
NACL_ID = os.environ.get("NACL_ID", "")


def block_ip_waf(ip_address: str) -> dict:
    """Add malicious IP to WAF IP set for blocking."""
    # Get current IP set to retrieve lock token
    ip_set = waf.get_ip_set(
        Name=WAF_IP_SET_NAME,
        Scope=WAF_SCOPE,
        Id=WAF_IP_SET_ID,
    )

    current_addresses = ip_set["IPSet"]["Addresses"]
    cidr = f"{ip_address}/32"

    if cidr in current_addresses:
        logger.info("IP %s already blocked in WAF", ip_address)
        return {"action": "already_blocked", "ip": ip_address}

    current_addresses.append(cidr)

    waf.update_ip_set(
        Name=WAF_IP_SET_NAME,
        Scope=WAF_SCOPE,
        Id=WAF_IP_SET_ID,
        Addresses=current_addresses,
        LockToken=ip_set["LockToken"],
        Description=f"Auto-blocked {ip_address} at "
                    f"{datetime.now(timezone.utc).isoformat()}",
    )

    logger.info("Blocked IP %s in WAF IP set", ip_address)
    return {"action": "blocked", "ip": ip_address, "target": "waf"}


def block_ip_nacl(ip_address: str) -> dict:
    """Add deny rule to Network ACL for the malicious IP."""
    if not NACL_ID:
        return {"action": "skipped", "reason": "No NACL configured"}

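    cidr = f"{ip_address}/32"
    nacl = ec2.describe_network_acls(
        NetworkAclIds=[NACL_ID],
    )
    ingress_entries = [
        entry
        for entry in nacl["NetworkAcls"][0]["Entries"]
        if not entry["Egress"]
    ]

    # Skip IPs that already have a deny rule so retries stay idempotent
    for entry in ingress_entries:
        if entry.get("CidrBlock") == cidr and entry["RuleAction"] == "deny":
            logger.info("IP %s already blocked in NACL %s", ip_address, NACL_ID)
            return {"action": "already_blocked", "ip": ip_address, "target": "nacl"}

    # Find the lowest available rule number starting at 50
    existing_rules = {entry["RuleNumber"] for entry in ingress_entries}
    rule_number = 50
    while rule_number in existing_rules:
        rule_number += 1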

    ec2.create_network_acl_entry(
        NetworkAclId=NACL_ID,
        RuleNumber=rule_number,
        Protocol="-1",
        RuleAction="deny",
        Egress=False,
        CidrBlock=f"{ip_address}/32",
    )

    logger.info(
        "Blocked IP %s in NACL %s rule %d",
        ip_address, NACL_ID, rule_number,
    )
    return {
        "action": "blocked",
        "ip": ip_address,
        "target": "nacl",
        "ruleNumber": rule_number,
    }


def handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Lambda handler to block malicious IPs via WAF and NACL."""
    detail = event.get("detail", {})

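    # Extract IP from various event formats
    ip_address = detail.get("sourceIPAddress", "")
    # CloudTrail reports AWS service callers as hostnames, not IPs
    if ip_address.endswith(".amazonaws.com"):
        ip_address = ""
    if not ip_address: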
        service = detail.get("service", {})
        ip_address = (
            service
            .get("action", {})
            .get("networkConnectionAction", {})
            .get("remoteIpDetails", {})
            .get("ipAddressV4", "")
        )

    if not ip_address:
        return {"action": "no_ip_found", "event": event}

    results = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "ip": ip_address,
        "waf": block_ip_waf(ip_address),
        "nacl": block_ip_nacl(ip_address),
    }

    return results
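
One detail worth guarding in the NACL path: rules are evaluated in ascending rule-number order, so a deny entry only takes effect if its number is lower than the allow rule it is meant to override. The loop in block_ip_nacl keeps counting upward without a limit. A bounded allocator, sketched below, fails loudly instead. The function name is hypothetical, and the ceiling of 100 assumes the allow rules in your NACL start at 100, as in a default NACL.

```python
def next_deny_rule_number(
    existing: set[int], start: int = 50, ceiling: int = 100
) -> int:
    """Return the lowest unused rule number in [start, ceiling).

    Raises RuntimeError when the range is exhausted, so the caller can
    alert or prune old entries instead of creating an ineffective rule.
    """
    for number in range(start, ceiling):
        if number not in existing:
            return number
    raise RuntimeError(
        f"No free NACL rule number between {start} and {ceiling - 1}"
    )
```

Failing when the range is full also keeps the function from silently running into the per-NACL rule quota; expiring old deny entries is left out of this sketch.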

MISP Threat Intelligence Integration

MISP (Malware Information Sharing Platform) is the backbone of community-driven threat intelligence. Running your own MISP instance on AWS gives you full control over your threat intelligence pipeline, access to dozens of community feeds, and the ability to share intelligence with trusted partners.

Deploying MISP on AWS

A practical way to deploy MISP on AWS is a containerized setup on an EC2 instance. Here is a Docker Compose configuration for that setup:

# docker-compose.yml for MISP on AWS EC2
version: "3.8"

services:
  misp-core:
    image: ghcr.io/misp/misp-docker/misp-core:latest
    restart: unless-stopped
    ports:
      - "443:443"
      - "80:80"
    environment:
      MISP_BASEURL: "https://misp.internal.yourdomain.com"
      MISP_ORG: "YourOrganization"
      MISP_EMAIL: "misp@yourdomain.com"
      MYSQL_HOST: "misp-db"
      MYSQL_DATABASE: "misp"
      MYSQL_USER: "misp"
      MYSQL_PASSWORD_FILE: "/run/secrets/mysql_password"
      REDIS_HOST: "misp-redis"
    volumes:
      - misp-data:/var/www/MISP/app/files
      - misp-config:/var/www/MISP/app/Config
      - misp-certs:/etc/nginx/certs
    depends_on:
      - misp-db
      - misp-redis
    secrets:
      - mysql_password

  misp-db:
    image: mysql:8.0
    restart: unless-stopped
    environment:
      MYSQL_DATABASE: "misp"
      MYSQL_USER: "misp"
      MYSQL_PASSWORD_FILE: "/run/secrets/mysql_password"
      MYSQL_ROOT_PASSWORD_FILE: "/run/secrets/mysql_root_password"
    volumes:
      - mysql-data:/var/lib/mysql
    secrets:
      - mysql_password
      - mysql_root_password

  misp-redis:
    image: redis:7-alpine
    restart: unless-stopped
    volumes:
      - redis-data:/data

  misp-modules:
    image: ghcr.io/misp/misp-docker/misp-modules:latest
    restart: unless-stopped
    environment:
      REDIS_BACKEND: "misp-redis"
    depends_on:
      - misp-redis

volumes:
  misp-data:
  misp-config:
  misp-certs:
  mysql-data:
  redis-data:

secrets:
  mysql_password:
    file: ./secrets/mysql_password.txt
  mysql_root_password:
    file: ./secrets/mysql_root_password.txt

MISP Feed Integration Script

Automate the ingestion of community threat intelligence feeds into your MISP instance:

#!/usr/bin/env python3
"""MISP feed management and synchronization script.

Configures and enables community threat intelligence feeds
for automated ingestion into your MISP instance.
"""

import json
import logging
import sys

from pymisp import PyMISP

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Community feeds to enable
FEEDS = [
    {
        "name": "CIRCL OSINT Feed",
        "provider": "CIRCL",
        "url": "https://www.circl.lu/doc/misp/feed-osint/",
        "source_format": "misp",
        "enabled": True,
        "caching_enabled": True,
    },
    {
        "name": "Botvrij.eu",
        "provider": "Botvrij",
        "url": "https://www.botvrij.eu/data/feed-osint/",
        "source_format": "misp",
        "enabled": True,
        "caching_enabled": True,
    },
    {
        "name": "abuse.ch URLhaus",
        "provider": "abuse.ch",
        "url": "https://urlhaus.abuse.ch/downloads/misp/",
        "source_format": "misp",
        "enabled": True,
        "caching_enabled": True,
    },
    {
        "name": "abuse.ch Feodo Tracker",
        "provider": "abuse.ch",
        "url": "https://feodotracker.abuse.ch/downloads/misp/",
        "source_format": "misp",
        "enabled": True,
        "caching_enabled": True,
    },
    {
        "name": "PhishTank",
        "provider": "PhishTank",
        "url": "https://data.phishtank.com/data/online-valid.csv",
        "source_format": "csv",
        "enabled": True,
        "caching_enabled": True,
    },
]


def configure_feeds(misp_url: str, misp_key: str) -> None:
    """Configure and enable threat intelligence feeds in MISP."""
    misp = PyMISP(misp_url, misp_key, ssl=True)

    existing_feeds = misp.feeds()
    existing_names = {
        f["Feed"]["name"]
        for f in existing_feeds
        if "Feed" in f
    }

    for feed_config in FEEDS:
        if feed_config["name"] in existing_names:
            logger.info("Feed already exists: %s", feed_config["name"])
            continue

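        # PyMISP's add_feed() takes one feed definition (a dict or MISPFeed
        # object) rather than individual keyword arguments.
        response = misp.add_feed({**feed_config, "lookup_visible": True})
        if isinstance(response, dict) and "errors" in response:
            logger.error(
                "Failed to add feed %s: %s",
                feed_config["name"],
                response["errors"],
            )
            continue
        logger.info("Added feed: %s", feed_config["name"])

    # Trigger a fetch for every enabled feed. fetch_feed() expects a feed ID,
    # so list the feeds again (to include the ones just added) and loop.
    logger.info("Fetching all enabled feeds...")
    for entry in misp.feeds():
        feed = entry.get("Feed", {}) if isinstance(entry, dict) else {}
        if feed.get("enabled"):
            misp.fetch_feed(feed["id"])
            logger.info("Feed fetch initiated: %s", feed.get("name"))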


def main() -> None:
    if len(sys.argv) != 3:
        print(f"Usage: {sys.argv[0]} <misp_url> <misp_api_key>")
        sys.exit(1)

    configure_feeds(sys.argv[1], sys.argv[2])


if __name__ == "__main__":
    main()

OpenCTI as Threat Intel Management Layer

While MISP excels at indicator sharing and community feeds, OpenCTI provides a knowledge management layer that maps relationships between threat actors, campaigns, attack patterns, and observables using the STIX2 standard. Running both together gives you the best of both worlds: MISP for rapid indicator ingestion and sharing, OpenCTI for strategic intelligence analysis and MITRE ATT&CK mapping.

OpenCTI integrates natively with MISP through its connector architecture. The MISP connector imports events and attributes from your MISP instance, maps them to STIX2 objects, and builds relationship graphs automatically. This means your Lambda enrichment functions can query either platform depending on the intelligence need – MISP for fast IOC lookups, OpenCTI for strategic context like related threat actors and attack patterns.

Key capabilities that make OpenCTI valuable in this pipeline:

  • STIX2 Native – All data is structured using the STIX2 standard, making it interoperable with any STIX-compatible tool.
  • Relationship Mapping – Automatically builds graphs connecting observables to indicators, indicators to threat actors, and threat actors to campaigns.
  • Confidence Scoring – Every piece of intelligence includes a confidence score, helping your automation make better decisions about response severity.
  • MITRE ATT&CK Integration – Threat intelligence is mapped to ATT&CK techniques, giving your security team immediate context about adversary behavior.
  • GraphQL API – The API is purpose-built for programmatic access, making Lambda integration straightforward.
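
To make the GraphQL point concrete, here is a minimal sketch of how a Lambda function might build and parse an observable lookup using only the standard library. The query shape and field names (`stixCyberObservables`, `observable_value`, `x_opencti_score`) reflect my understanding of the OpenCTI schema and should be checked against your instance. The helper names are hypothetical:

```python
import json
import urllib.request
from typing import Any

# Assumed query shape; verify the field names against your OpenCTI schema.
OBSERVABLE_QUERY = """
query Lookup($search: String, $first: Int) {
  stixCyberObservables(search: $search, first: $first) {
    edges { node { id entity_type observable_value x_opencti_score } }
  }
}
"""


def build_request(base_url: str, token: str, value: str) -> urllib.request.Request:
    """Build (but do not send) a GraphQL POST for one observable value."""
    payload = {
        "query": OBSERVABLE_QUERY,
        "variables": {"search": value, "first": 5},
    }
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/graphql",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def parse_observables(body: dict[str, Any], value: str) -> list[dict[str, Any]]:
    """Keep only nodes whose value matches exactly, since search is fuzzy."""
    data = body.get("data") or {}
    connection = data.get("stixCyberObservables") or {}
    edges = connection.get("edges") or []
    return [
        edge["node"]
        for edge in edges
        if (edge.get("node") or {}).get("observable_value") == value
    ]
```

Inside the handler, the request would be sent with `urllib.request.urlopen(req, timeout=10)` and the decoded JSON passed to `parse_observables`. Splitting the build and parse steps keeps both testable without a live OpenCTI instance.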

Sigma Rules: Detection-as-Code That Works Anywhere

Sigma rules are the open-source answer to vendor-locked detection content. A Sigma rule defines a detection pattern in YAML that converter backends can translate into a SIEM query language, an EventBridge pattern, a CloudWatch Logs Insights query, or Athena SQL, provided a backend exists for that target. You write the detection logic once and deploy it wherever a backend is available.
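
As an illustration of what "compiling" means, here is a minimal sketch that turns a single Sigma selection block into an EventBridge event pattern. It is not a real Sigma backend: it handles one selection, three modifiers, and no conditions, and the function names are hypothetical. It assumes the EventBridge `prefix`, `suffix`, and `wildcard` operators:

```python
from typing import Any


def _wrap(path: list[str], leaf: Any) -> dict[str, Any]:
    """Nest a leaf matcher under a dotted field path."""
    for key in reversed(path):
        leaf = {key: leaf}
    return leaf


def _merge(target: dict[str, Any], extra: dict[str, Any]) -> None:
    """Recursively merge nested dicts so sibling fields share a parent."""
    for key, value in extra.items():
        if isinstance(value, dict) and isinstance(target.get(key), dict):
            _merge(target[key], value)
        else:
            target[key] = value


def selection_to_event_pattern(selection: dict[str, Any]) -> dict[str, Any]:
    """Translate one Sigma selection into an EventBridge pattern.

    Caveat: Sigma string matching is case-insensitive by default, while
    EventBridge matching is case-sensitive, so the result is stricter.
    """
    detail: dict[str, Any] = {}
    for field, expected in selection.items():
        name, _, modifier = field.partition("|")
        values = expected if isinstance(expected, list) else [expected]
        if modifier == "":
            matchers = list(values)
        elif modifier == "startswith":
            matchers = [{"prefix": v} for v in values]
        elif modifier == "endswith":
            matchers = [{"suffix": v} for v in values]
        elif modifier == "contains":
            matchers = [{"wildcard": f"*{v}*"} for v in values]
        else:
            raise ValueError(f"unsupported modifier: {modifier}")
        _merge(detail, _wrap(name.split("."), matchers))
    return {"detail": detail}
```

For anything beyond simple selections (negations, multiple blocks, aggregations), a maintained converter is the safer route. The sketch only shows that the mapping from selection fields to pattern keys is mechanical.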

Sigma Rule Examples for AWS CloudTrail

Detect CloudTrail Logging Disabled:

title: AWS CloudTrail Logging Disabled
id: d7b7a68c-7e4f-4d62-bf8e-6a1e3e47c5f0
status: stable
description: |
  Detects when CloudTrail logging is stopped or the trail is
  deleted, a common technique used by attackers to cover their tracks.
references:
  - https://attack.mitre.org/techniques/T1562/008/
author: red-team.sh
date: 2026/04/15
tags:
  - attack.defense_evasion
  - attack.t1562.008
logsource:
  product: aws
  service: cloudtrail
detection:
  selection:
    eventName:
      - StopLogging
      - DeleteTrail
      - UpdateTrail
    eventSource: cloudtrail.amazonaws.com
  condition: selection
falsepositives:
  - Legitimate administrative trail maintenance
  - Infrastructure as Code deployments
level: high

Detect GuardDuty Detector Disabled:

title: AWS GuardDuty Detector Disabled or Deleted
id: a3e2c8b1-9f5d-4a67-b2c1-8d4e7f6a9b3c
status: stable
description: |
  Detects attempts to disable or delete GuardDuty detectors,
  which would blind the organization to threat detection.
references:
  - https://attack.mitre.org/techniques/T1562/001/
author: red-team.sh
date: 2026/04/15
tags:
  - attack.defense_evasion
  - attack.t1562.001
logsource:
  product: aws
  service: cloudtrail
detection:
  selection:
    eventName:
      - DeleteDetector
      - UpdateDetector
      - DisassociateMembers
    eventSource: guardduty.amazonaws.com
  filter_update:
    eventName: UpdateDetector
    requestParameters.enable: true
  condition: selection and not filter_update
falsepositives:
  - Legitimate detector reconfiguration
level: critical

Detect Unauthorized IAM Policy Attachment:

title: Sensitive IAM Policy Attached to User or Role
id: f8d2a1c7-6b3e-4f91-a5d8-2c7e9b4f1a6d
status: experimental
description: |
  Detects when highly privileged IAM policies are attached
  to users or roles, potentially indicating privilege escalation.
references:
  - https://attack.mitre.org/techniques/T1098/
author: red-team.sh
date: 2026/04/15
tags:
  - attack.persistence
  - attack.privilege_escalation
  - attack.t1098
logsource:
  product: aws
  service: cloudtrail
detection:
  selection:
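    # Only Attach*Policy requests carry requestParameters.policyArn.
    # Inline-policy calls (PutUserPolicy, PutRolePolicy) send policyName and
    # policyDocument instead, so they could never satisfy sensitive_policies.
    eventName:
      - AttachUserPolicy
      - AttachRolePolicy
      - AttachGroupPolicy
    eventSource: iam.amazonaws.com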
  sensitive_policies:
    requestParameters.policyArn|contains:
      - "AdministratorAccess"
      - "PowerUserAccess"
      - "IAMFullAccess"
      - "SecurityAudit"
  condition: selection and sensitive_policies
falsepositives:
  - Legitimate IAM administration
  - Automated role provisioning during deployments
level: high

Sigma Rule Evaluator Lambda

This function loads Sigma rules at runtime and evaluates incoming CloudTrail events against them:

import logging
import os
from typing import Any

import boto3
import yaml

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3 = boto3.client("s3")

SIGMA_RULES_BUCKET = os.environ["SIGMA_RULES_BUCKET"]
SIGMA_RULES_PREFIX = os.environ.get("SIGMA_RULES_PREFIX", "sigma-rules/aws/")


def load_sigma_rules() -> list[dict]:
    """Load Sigma rules from S3 bucket."""
    rules = []
    paginator = s3.get_paginator("list_objects_v2")

    for page in paginator.paginate(
        Bucket=SIGMA_RULES_BUCKET,
        Prefix=SIGMA_RULES_PREFIX,
    ):
        for obj in page.get("Contents", []):
            if obj["Key"].endswith((".yml", ".yaml")):
                response = s3.get_object(
                    Bucket=SIGMA_RULES_BUCKET,
                    Key=obj["Key"],
                )
                rule_content = response["Body"].read().decode("utf-8")
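                try:
                    rule = yaml.safe_load(rule_content)
                except yaml.YAMLError as e:
                    logger.error(
                        "Failed to parse rule %s: %s", obj["Key"], e
                    )
                    continue
                # Empty or non-mapping YAML files are not Sigma rules.
                if not isinstance(rule, dict):
                    logger.warning("Skipping non-rule file %s", obj["Key"])
                    continue
                rule["_source_key"] = obj["Key"]
                rules.append(rule)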

    logger.info("Loaded %d Sigma rules", len(rules))
    return rules


def evaluate_selection(
    event_detail: dict, selection: dict
) -> bool:
    """Evaluate a Sigma selection block against an event."""
    for field, expected_values in selection.items():
        # Handle pipe modifiers like |contains
        field_name = field.split("|")[0]
        modifier = field.split("|")[1] if "|" in field else None

        # Navigate nested fields with dot notation
        value = event_detail
        for part in field_name.split("."):
            if isinstance(value, dict):
                value = value.get(part)
            else:
                value = None
                break

        if value is None:
            return False

        if not isinstance(expected_values, list):
            expected_values = [expected_values]

        if modifier == "contains":
            if not any(
                str(ev).lower() in str(value).lower()
                for ev in expected_values
            ):
                return False
        else:
            if str(value) not in [str(ev) for ev in expected_values]:
                return False

    return True


def evaluate_rule(
    rule: dict, event_detail: dict
) -> dict[str, Any] | None:
    """Evaluate a single Sigma rule against a CloudTrail event."""
    detection = rule.get("detection", {})
    condition = detection.get("condition", "")

    # Extract named detection blocks
    blocks = {
        k: v for k, v in detection.items()
        if k not in ("condition", "timeframe")
    }

    # Evaluate each block
    block_results = {}
    for name, selection in blocks.items():
        if isinstance(selection, dict):
            block_results[name] = evaluate_selection(
                event_detail, selection
            )
        elif isinstance(selection, list):
            block_results[name] = any(
                evaluate_selection(event_detail, sel)
                for sel in selection
                if isinstance(sel, dict)
            )

    # Simple condition evaluation. Only flat conditions are supported:
    # no parentheses, no "1 of ..." quantifiers, no mixed and/or.
    def term(expr: str) -> bool:
        """Resolve one block name, honouring a leading 'not'."""
        expr = expr.strip()
        if expr.startswith("not "):
            return not block_results.get(expr[4:].strip(), False)
        return block_results.get(expr, False)

    if " or " in condition and " and " not in condition:
        matched = any(term(p) for p in condition.split(" or "))
    else:
        # Covers "a", "a and b", and "a and b and not c".
        matched = all(term(p) for p in condition.split(" and "))

    if matched:
        return {
            "ruleTitle": rule.get("title"),
            "ruleId": rule.get("id"),
            "level": rule.get("level", "medium"),
            "tags": rule.get("tags", []),
            "description": rule.get("description", ""),
            "sourceFile": rule.get("_source_key", ""),
        }
    return None


def handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Evaluate incoming event against all loaded Sigma rules."""
    rules = load_sigma_rules()
    event_detail = event.get("detail", event)

    matches = []
    for rule in rules:
        result = evaluate_rule(rule, event_detail)
        if result:
            matches.append(result)

    highest_level = "informational"
    level_order = {
        "informational": 0, "low": 1, "medium": 2,
        "high": 3, "critical": 4,
    }
    for match in matches:
        if level_order.get(
            match["level"], 0
        ) > level_order.get(highest_level, 0):
            highest_level = match["level"]

    return {
        "sigmaMatches": len(matches),
        "highestLevel": highest_level,
        "matchedRules": matches,
        "evaluatedRuleCount": len(rules),
    }
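
The handler above reloads every rule from S3 on each invocation. If that becomes a latency or cost concern, one option is to cache the rules across warm invocations. Here is a minimal sketch of a time-based cache, where `get_rules` and the 300-second interval are hypothetical and not part of the original function:

```python
import time
from typing import Callable, Optional

RULE_CACHE_TTL_SECONDS = 300.0  # hypothetical refresh interval

# Module-level state survives between invocations of a warm Lambda container.
_rules: Optional[list[dict]] = None
_loaded_at = 0.0


def get_rules(
    loader: Callable[[], list[dict]],
    now: Callable[[], float] = time.monotonic,
    ttl: float = RULE_CACHE_TTL_SECONDS,
) -> list[dict]:
    """Return cached rules, calling `loader` again once the TTL has expired."""
    global _rules, _loaded_at
    if _rules is None or now() - _loaded_at >= ttl:
        _rules = loader()
        _loaded_at = now()
    return _rules
```

In the handler this would replace `rules = load_sigma_rules()` with `rules = get_rules(load_sigma_rules)`. The trade-off is that a newly uploaded rule may take up to the TTL to become active in warm containers.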

Complete Pipeline: End-to-End Automated Response

Let us walk through a concrete scenario to see how all the pieces work together.

Scenario: Compromised IAM Credentials

  1. Detection – An attacker uses stolen IAM credentials from an unusual geographic location. GuardDuty detects the anomaly and generates an UnauthorizedAccess:IAMUser/ConsoleLoginSuccess.B finding with severity 8.

  2. Event Routing – The GuardDuty finding is published to the default EventBridge bus. Our rule matches on severity >= 7 and UnauthorizedAccess prefix, routing the event to the Step Functions state machine.

  3. Classification – The classify Lambda function examines the finding type, extracts the source IP, affected IAM principal, and affected resources. It tags the event as credential-compromise with severity CRITICAL.

  4. Parallel Enrichment – Two Lambda functions execute simultaneously:
    • The MISP enrichment function searches for the source IP across all community feeds. It finds the IP listed in abuse.ch feeds as part of a known credential-stuffing botnet.
    • The OpenCTI enrichment function queries the STIX2 knowledge base and finds the IP associated with a threat actor group known for targeting AWS environments, mapped to MITRE ATT&CK technique T1078 (Valid Accounts).

  5. Sigma Evaluation – The Sigma rule evaluator checks the CloudTrail event associated with the finding. It matches the “Console Login from Unusual Country” rule at high level.

  6. Response Decision – The Choice state evaluates: severity is CRITICAL and IOC match is true. This triggers the automated critical response path without human approval.

  7. Automated Response – Three actions execute in parallel:
    • WAF IP set updated to block the source IP.
    • IAM access key deactivated and session tokens revoked for the compromised user.
    • Security team notified via SNS with full enrichment context.

  8. Logging – The complete incident timeline, enrichment data, and response actions are written to Security Lake in OCSF format.

Total elapsed time from the finding reaching EventBridge to completed response: under 30 seconds.

Scenario: Unauthorized API Reconnaissance

An attacker begins enumerating your AWS environment by calling DescribeInstances, ListBuckets, GetAccountAuthorizationDetails, and other reconnaissance APIs from a newly created IAM role. CloudTrail captures every call. EventBridge matches on the suspicious API pattern. The Step Functions workflow enriches the source with MISP (no match – this is a new IP) and evaluates Sigma rules (matches “AWS API Reconnaissance Activity” at medium level). Because the severity is medium and there is no confirmed IOC, the workflow routes to the log-and-monitor path, creating a Security Hub finding and alerting the SOC for manual investigation.
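
The routing in the two scenarios above can be sketched as a plain function that mirrors what the Choice state decides. The path names and the middle "human approval" tier are assumptions for illustration. The real branches live in the state machine definition:

```python
def choose_response_path(severity: str, ioc_match: bool) -> str:
    """Pick a response path from classified severity and threat intel match.

    - CRITICAL with a confirmed IOC: act automatically.
    - CRITICAL/HIGH without an IOC, or any IOC match at lower severity:
      ask a human first (assumed middle tier).
    - Everything else: record the finding and alert the SOC.
    """
    severity = severity.upper()
    if severity == "CRITICAL" and ioc_match:
        return "automated-critical-response"
    if severity in ("CRITICAL", "HIGH") or ioc_match:
        return "human-approval"
    return "log-and-monitor"
```

Keeping the decision as a small pure function makes it easy to unit test the thresholds before encoding them as Choice rules.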

Scenario: S3 Bucket Policy Made Public

AWS Config detects a non-compliant resource change when a developer accidentally makes an S3 bucket publicly accessible. The Config compliance change event hits EventBridge, triggers the state machine, and the response workflow immediately reverts the bucket policy and notifies the bucket owner. No threat intel enrichment is needed here – the response is purely policy-driven.
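
For the policy-driven case, the remediation step first needs to know which statements made the bucket public. Here is a minimal sketch of that check, assuming a deliberately simple heuristic (an unconditional Allow to a wildcard principal). AWS's own definition of "public" is more nuanced, and `public_statements` is a hypothetical helper:

```python
import json
from typing import Any


def _as_list(value: Any) -> list[Any]:
    """Policy fields may be a single value or a list; normalise to a list."""
    return value if isinstance(value, list) else [value]


def _is_wildcard_principal(principal: Any) -> bool:
    """True for Principal "*" or {"AWS": "*"} (also inside a list)."""
    if principal == "*":
        return True
    if isinstance(principal, dict):
        return "*" in _as_list(principal.get("AWS", []))
    return False


def public_statements(policy_json: str) -> list[dict[str, Any]]:
    """Return Allow statements that grant access to everyone unconditionally."""
    policy = json.loads(policy_json)
    return [
        stmt
        for stmt in _as_list(policy.get("Statement", []))
        if stmt.get("Effect") == "Allow"
        and _is_wildcard_principal(stmt.get("Principal"))
        and not stmt.get("Condition")
    ]
```

A response Lambda could then write back the policy without those statements, or simply enable S3 Block Public Access on the bucket, and include the removed statements in the owner notification.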

Terraform: Deploying the Complete Pipeline

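Infrastructure as Code keeps your security automation reproducible, version-controlled, and auditable. Here is the core Terraform configuration for the pipeline. It is abbreviated: the Lambda packaging (archive_file data sources), the shared Lambda layer, and the Step Functions role policy are referenced but not shown: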

# main.tf - Event-Driven Security Automation Pipeline

terraform {
  required_version = ">= 1.5"
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }

  backend "s3" {
    bucket = "skynet-tf-state-prod"
    key    = "security-automation/terraform.tfstate"
    region = "us-east-1"
  }
}

provider "aws" {
  region = var.aws_region

  default_tags {
    tags = {
      Application = "security-automation"
      Environment = var.environment
      Owner       = "security-team"
      Costcenter  = "security"
      Customer    = "internal"
    }
  }
}

variable "aws_region" {
  default = "us-east-1"
}

variable "environment" {
  default = "prod"
}

variable "misp_url" {
  type      = string
  sensitive = true
}

# -------------------------------------------------------------------
# Custom EventBridge Bus
# -------------------------------------------------------------------
resource "aws_cloudwatch_event_bus" "security" {
  name = "security-automation-bus"

  tags = {
    Application = "security-automation"
  }
}

# -------------------------------------------------------------------
# EventBridge Rules
# -------------------------------------------------------------------
resource "aws_cloudwatch_event_rule" "guardduty_high" {
  name           = "guardduty-high-severity"
  description    = "Capture high-severity GuardDuty findings"
  event_bus_name = "default"

  event_pattern = jsonencode({
    source      = ["aws.guardduty"]
    detail-type = ["GuardDuty Finding"]
    detail = {
      severity = [{ numeric = [">=", 7] }]
    }
  })
}

resource "aws_cloudwatch_event_rule" "securityhub_critical" {
  name           = "securityhub-critical-findings"
  description    = "Capture critical Security Hub findings"
  event_bus_name = "default"

  event_pattern = jsonencode({
    source      = ["aws.securityhub"]
    detail-type = ["Security Hub Findings - Imported"]
    detail = {
      findings = {
        Severity = { Label = ["CRITICAL", "HIGH"] }
        Workflow = { Status = ["NEW"] }
      }
    }
  })
}

resource "aws_cloudwatch_event_rule" "cloudtrail_suspicious" {
  name           = "cloudtrail-suspicious-api"
  description    = "Capture suspicious CloudTrail API calls"
  event_bus_name = "default"

  event_pattern = jsonencode({
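    # CloudTrail API events carry the source of the service that was called
    # (for example aws.guardduty for DeleteDetector), not aws.cloudtrail.
    source = [
      "aws.cloudtrail", "aws.guardduty", "aws.kms", "aws.ec2", "aws.s3",
    ]
    detail-type = ["AWS API Call via CloudTrail"]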
    detail = {
      eventName = [
        "StopLogging", "DeleteTrail", "DeleteDetector",
        "DisableKey", "DeleteFlowLogs", "PutBucketAcl",
      ]
    }
  })
}

# -------------------------------------------------------------------
# Step Functions State Machine
# -------------------------------------------------------------------
resource "aws_sfn_state_machine" "incident_response" {
  name     = "security-incident-response"
  role_arn = aws_iam_role.step_functions.arn

  definition = templatefile("${path.module}/state-machine.asl.json", {
    account_id     = data.aws_caller_identity.current.account_id
    region         = var.aws_region
    approval_queue = aws_sqs_queue.approval.url
  })

  logging_configuration {
    log_destination        = "${aws_cloudwatch_log_group.step_functions.arn}:*"
    include_execution_data = true
    level                  = "ERROR"
  }
}

# -------------------------------------------------------------------
# EventBridge Targets -> Step Functions
# -------------------------------------------------------------------
resource "aws_cloudwatch_event_target" "guardduty_to_sfn" {
  rule      = aws_cloudwatch_event_rule.guardduty_high.name
  target_id = "guardduty-to-step-functions"
  arn       = aws_sfn_state_machine.incident_response.arn
  role_arn  = aws_iam_role.eventbridge_sfn.arn
}

resource "aws_cloudwatch_event_target" "securityhub_to_sfn" {
  rule      = aws_cloudwatch_event_rule.securityhub_critical.name
  target_id = "securityhub-to-step-functions"
  arn       = aws_sfn_state_machine.incident_response.arn
  role_arn  = aws_iam_role.eventbridge_sfn.arn
}

resource "aws_cloudwatch_event_target" "cloudtrail_to_sfn" {
  rule      = aws_cloudwatch_event_rule.cloudtrail_suspicious.name
  target_id = "cloudtrail-to-step-functions"
  arn       = aws_sfn_state_machine.incident_response.arn
  role_arn  = aws_iam_role.eventbridge_sfn.arn
}

# -------------------------------------------------------------------
# Lambda Functions
# -------------------------------------------------------------------
resource "aws_lambda_function" "misp_enrichment" {
  function_name = "misp-enrichment"
  role          = aws_iam_role.lambda_enrichment.arn
  handler       = "misp_enrichment.handler"
  runtime       = "python3.12"
  timeout       = 30
  memory_size   = 256

  filename         = data.archive_file.misp_enrichment.output_path
  source_code_hash = data.archive_file.misp_enrichment.output_base64sha256

  environment {
    variables = {
      MISP_URL           = var.misp_url
      MISP_API_KEY_PARAM = "/security-automation/${var.environment}/misp-api-key"
    }
  }

  layers = [aws_lambda_layer_version.common_deps.arn]
}

resource "aws_lambda_function" "opencti_enrichment" {
  function_name = "opencti-enrichment"
  role          = aws_iam_role.lambda_enrichment.arn
  handler       = "opencti_enrichment.handler"
  runtime       = "python3.12"
  timeout       = 30
  memory_size   = 256

  filename         = data.archive_file.opencti_enrichment.output_path
  source_code_hash = data.archive_file.opencti_enrichment.output_base64sha256

  environment {
    variables = {
      OPENCTI_URL         = "https://opencti.internal.yourdomain.com"
      OPENCTI_TOKEN_PARAM = "/security-automation/${var.environment}/opencti-token"
    }
  }

  layers = [aws_lambda_layer_version.common_deps.arn]
}

resource "aws_lambda_function" "sigma_evaluator" {
  function_name = "sigma-rule-evaluator"
  role          = aws_iam_role.lambda_enrichment.arn
  handler       = "sigma_evaluator.handler"
  runtime       = "python3.12"
  timeout       = 60
  memory_size   = 512

  filename         = data.archive_file.sigma_evaluator.output_path
  source_code_hash = data.archive_file.sigma_evaluator.output_base64sha256

  environment {
    variables = {
      SIGMA_RULES_BUCKET = aws_s3_bucket.sigma_rules.id
      SIGMA_RULES_PREFIX = "sigma-rules/aws/"
    }
  }

  layers = [aws_lambda_layer_version.common_deps.arn]
}

resource "aws_lambda_function" "block_ip" {
  function_name = "block-ip-waf"
  role          = aws_iam_role.lambda_response.arn
  handler       = "block_ip.handler"
  runtime       = "python3.12"
  timeout       = 15
  memory_size   = 128

  filename         = data.archive_file.block_ip.output_path
  source_code_hash = data.archive_file.block_ip.output_base64sha256

  environment {
    variables = {
      WAF_BLOCK_IP_SET_ID   = aws_wafv2_ip_set.blocked_ips.id
      WAF_BLOCK_IP_SET_NAME = aws_wafv2_ip_set.blocked_ips.name
      WAF_SCOPE             = "REGIONAL"
    }
  }
}

# -------------------------------------------------------------------
# WAF IP Set for Automated Blocking
# -------------------------------------------------------------------
resource "aws_wafv2_ip_set" "blocked_ips" {
  name               = "automated-blocked-ips"
  scope              = "REGIONAL"
  ip_address_version = "IPV4"
  addresses          = []

  description = "IPs blocked by automated security response"
}

# -------------------------------------------------------------------
# S3 Bucket for Sigma Rules
# -------------------------------------------------------------------
resource "aws_s3_bucket" "sigma_rules" {
  bucket = "security-automation-sigma-rules-${data.aws_caller_identity.current.account_id}"
}

resource "aws_s3_bucket_versioning" "sigma_rules" {
  bucket = aws_s3_bucket.sigma_rules.id

  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "sigma_rules" {
  bucket = aws_s3_bucket.sigma_rules.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "aws:kms"
    }
  }
}

resource "aws_s3_bucket_public_access_block" "sigma_rules" {
  bucket = aws_s3_bucket.sigma_rules.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

# -------------------------------------------------------------------
# SQS Queue for Human Approval
# -------------------------------------------------------------------
resource "aws_sqs_queue" "approval" {
  name                       = "security-incident-approval"
  message_retention_seconds  = 86400
  visibility_timeout_seconds = 3600

  sqs_managed_sse_enabled = true
}

# -------------------------------------------------------------------
# CloudWatch Log Group for Step Functions
# -------------------------------------------------------------------
resource "aws_cloudwatch_log_group" "step_functions" {
  name              = "/aws/stepfunctions/security-incident-response"
  retention_in_days = 90
}

# -------------------------------------------------------------------
# SNS Topic for Security Alerts
# -------------------------------------------------------------------
resource "aws_sns_topic" "security_alerts" {
  name = "security-automation-alerts"
}

# -------------------------------------------------------------------
# IAM Roles (abbreviated for clarity)
# -------------------------------------------------------------------
data "aws_caller_identity" "current" {}

resource "aws_iam_role" "step_functions" {
  name = "security-automation-step-functions"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "states.amazonaws.com"
      }
    }]
  })
}

resource "aws_iam_role" "eventbridge_sfn" {
  name = "security-automation-eventbridge-sfn"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "events.amazonaws.com"
      }
    }]
  })
}

resource "aws_iam_role_policy" "eventbridge_start_sfn" {
  name = "start-step-functions"
  role = aws_iam_role.eventbridge_sfn.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect   = "Allow"
      Action   = "states:StartExecution"
      Resource = aws_sfn_state_machine.incident_response.arn
    }]
  })
}

resource "aws_iam_role" "lambda_enrichment" {
  name = "security-automation-lambda-enrichment"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "lambda.amazonaws.com"
      }
    }]
  })
}

resource "aws_iam_role_policy" "lambda_enrichment" {
  name = "enrichment-permissions"
  role = aws_iam_role.lambda_enrichment.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "ssm:GetParameter",
          "ssm:GetParameters",
        ]
        Resource = "arn:aws:ssm:${var.aws_region}:${data.aws_caller_identity.current.account_id}:parameter/security-automation/*"
      },
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:ListBucket",
        ]
        Resource = [
          aws_s3_bucket.sigma_rules.arn,
          "${aws_s3_bucket.sigma_rules.arn}/*",
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
        ]
        Resource = "arn:aws:logs:*:*:*"
      },
    ]
  })
}

resource "aws_iam_role" "lambda_response" {
  name = "security-automation-lambda-response"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "lambda.amazonaws.com"
      }
    }]
  })
}

resource "aws_iam_role_policy" "lambda_response" {
  name = "response-permissions"
  role = aws_iam_role.lambda_response.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "wafv2:GetIPSet",
          "wafv2:UpdateIPSet",
        ]
        Resource = aws_wafv2_ip_set.blocked_ips.arn
      },
      {
        Effect = "Allow"
        Action = [
          "ec2:CreateNetworkAclEntry",
          "ec2:DescribeNetworkAcls",
        ]
        Resource = "*"
      },
      {
        Effect = "Allow"
        Action = [
          "iam:UpdateAccessKey",
          "iam:ListAccessKeys",
        ]
        Resource = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:user/*"
      },
      {
        Effect = "Allow"
        Action = [
          "sns:Publish",
        ]
        Resource = aws_sns_topic.security_alerts.arn
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
        ]
        Resource = "arn:aws:logs:*:*:*"
      },
    ]
  })
}

# -------------------------------------------------------------------
# Outputs
# -------------------------------------------------------------------
output "event_bus_arn" {
  value = aws_cloudwatch_event_bus.security.arn
}

output "state_machine_arn" {
  value = aws_sfn_state_machine.incident_response.arn
}

output "sigma_rules_bucket" {
  value = aws_s3_bucket.sigma_rules.id
}

Security Lake Integration for Long-Term Analytics

Amazon Security Lake normalizes security data into the Open Cybersecurity Schema Framework (OCSF) format and stores it as Apache Parquet in S3. This gives you a unified data lake where GuardDuty findings, CloudTrail events, VPC Flow Logs, and your custom automation logs all share the same schema.

Writing Automation Results to Security Lake

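Your Step Functions workflow should write every incident response action to Security Lake for long-term analytics. The Lambda function below maps the workflow output to an OCSF Detection Finding and writes it to the custom-source prefix as JSON. Security Lake's custom-source documentation calls for Parquet objects, so treat the JSON write as a starting point and convert the records to Parquet before relying on the Security Lake tables in Athena: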

import json
import os
import logging
from datetime import datetime, timezone
from typing import Any

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3 = boto3.client("s3")

SECURITY_LAKE_BUCKET = os.environ["SECURITY_LAKE_BUCKET"]
SECURITY_LAKE_PREFIX = os.environ.get(
    "SECURITY_LAKE_PREFIX", "ext/custom-security-automation/"
)


def to_ocsf_detection_finding(event: dict[str, Any]) -> dict:
    """Convert automation result to OCSF Detection Finding format."""
    now = datetime.now(timezone.utc)

    return {
        "metadata": {
            "version": "1.1.0",
            "product": {
                "name": "Security Automation Pipeline",
                "vendor_name": "red-team.sh",
                "feature": {"name": "Automated Incident Response"},
            },
            "log_name": "security-automation",
            "logged_time": int(now.timestamp() * 1000),
        },
        "class_uid": 2004,  # Detection Finding
        "category_uid": 2,  # Findings
        "type_uid": 200401,  # Detection Finding: Create
        "severity_id": _severity_to_id(
            event.get("classification", {}).get("severity", "MEDIUM")
        ),
        "time": int(now.timestamp() * 1000),
        "activity_id": 1,  # Create
        "finding_info": {
            "title": event.get(
                "classification", {}
            ).get("type", "Security Incident"),
            "uid": event.get(
                "detail", {}
            ).get("id", "unknown"),
            "types": [
                event.get(
                    "classification", {}
                ).get("category", "unknown")
            ],
        },
        "enrichments": _build_enrichments(event),
        "remediation": {
            "desc": json.dumps(
                event.get("responseActions", {})
            ),
        },
        "status_id": 1,  # New
    }


def _severity_to_id(severity: str) -> int:
    mapping = {
        "INFORMATIONAL": 1,
        "LOW": 2,
        "MEDIUM": 3,
        "HIGH": 4,
        "CRITICAL": 5,
    }
    return mapping.get(severity.upper(), 0)


def _build_enrichments(event: dict[str, Any]) -> list[dict]:
    enrichments = []
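    # The first entry of "enrichment" is treated as the MISP result;
    # fall back to an empty result when the list is missing or empty.
    enrichment_results = event.get("enrichment") or [{}]
    misp_data = enrichment_results[0]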
    if misp_data.get("iocMatch"):
        enrichments.append({
            "name": "MISP Threat Intelligence",
            "data": json.dumps(misp_data.get("matchedIndicators", [])),
            "provider": "MISP",
            "type": "threat_intelligence",
        })
    return enrichments


def handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Write incident response results to Security Lake in OCSF format."""
    ocsf_record = to_ocsf_detection_finding(event)

    now = datetime.now(timezone.utc)
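    # Include the Lambda request ID so that two incidents written in the
    # same second do not overwrite each other.
    key = (
        f"{SECURITY_LAKE_PREFIX}"
        f"region={os.environ.get('AWS_REGION', 'us-east-1')}/"
        f"year={now.year}/month={now.month:02d}/day={now.day:02d}/"
        f"incident-{now.strftime('%Y%m%dT%H%M%SZ')}-{context.aws_request_id}.json"
    )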

    s3.put_object(
        Bucket=SECURITY_LAKE_BUCKET,
        Key=key,
        Body=json.dumps(ocsf_record).encode("utf-8"),
        ContentType="application/json",
        ServerSideEncryption="aws:kms",
    )

    logger.info("Wrote OCSF record to s3://%s/%s", SECURITY_LAKE_BUCKET, key)
    return {"logged": True, "key": key}
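The object key matters because Security Lake discovers custom-source data by partition. As a minimal sketch, assuming the custom-source layout described in the Security Lake documentation (partitions named region, accountId, and eventDay, with Parquet objects), a key builder might look like the following. The function name `custom_source_key` and the record ID format are hypothetical; check the layout against your own Security Lake configuration before using it.

```python
from datetime import datetime, timezone


def custom_source_key(
    source_name: str,
    region: str,
    account_id: str,
    record_id: str,
    when: datetime,
) -> str:
    """Build an S3 key partitioned by region, accountId, and eventDay.

    `when` must be timezone-aware; it is converted to UTC so that the
    eventDay partition does not depend on the caller's local time zone.
    """
    event_day = when.astimezone(timezone.utc).strftime("%Y%m%d")
    return (
        f"ext/{source_name}/"
        f"region={region}/accountId={account_id}/eventDay={event_day}/"
        f"{record_id}.parquet"
    )


if __name__ == "__main__":
    ts = datetime(2025, 3, 9, 23, 30, tzinfo=timezone.utc)
    print(
        custom_source_key(
            "custom-security-automation",
            "us-east-1",
            "123456789012",
            "incident-abc123",
            ts,
        )
    )
```

Keeping the key logic in one pure function makes it easy to unit test and to change if the partition layout in your account differs.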

Querying Security Lake with Athena

Once your automation data flows into Security Lake, you can query it with Athena for trend analysis and metrics:

-- Find all automated responses in the last 7 days
SELECT
    finding_info.title AS finding_type,
    severity_id,
    COUNT(*) AS incident_count,
    AVG(
        CAST(
            json_extract_scalar(remediation.desc, '$.elapsed_seconds')
            AS DOUBLE
        )
    ) AS avg_response_seconds
FROM security_lake_db.custom_security_automation
WHERE time > CAST(to_unixtime(now()) * 1000 AS BIGINT) - 604800000
GROUP BY finding_info.title, severity_id
ORDER BY incident_count DESC;

-- Identify top threat intelligence matches
SELECT
    json_extract_scalar(enrichment, '$.provider') AS intel_source,
    json_extract_scalar(enrichment, '$.name') AS enrichment_type,
    COUNT(*) AS match_count
FROM security_lake_db.custom_security_automation
CROSS JOIN UNNEST(enrichments) AS t(enrichment)
WHERE time > CAST(to_unixtime(now()) * 1000 AS BIGINT) - 2592000000
GROUP BY 1, 2
ORDER BY match_count DESC;

-- Mean time to respond by severity
SELECT
    CASE severity_id
        WHEN 5 THEN 'CRITICAL'
        WHEN 4 THEN 'HIGH'
        WHEN 3 THEN 'MEDIUM'
        WHEN 2 THEN 'LOW'
        ELSE 'INFO'
    END AS severity,
    COUNT(*) AS total_incidents,
    AVG(
        CAST(
            json_extract_scalar(remediation.desc, '$.elapsed_seconds')
            AS DOUBLE
        )
    ) AS avg_mttr_seconds
FROM security_lake_db.custom_security_automation
WHERE time > CAST(to_unixtime(now()) * 1000 AS BIGINT) - 2592000000
GROUP BY severity_id
ORDER BY severity_id DESC;
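The look-back constants in these queries are millisecond windows, because the OCSF `time` field is written as epoch milliseconds: 604800000 ms is 7 days and 2592000000 ms is 30 days. A small helper can make that arithmetic explicit when you generate queries from code. This is a sketch only: the function names are hypothetical, and it assumes `to_unixtime(now())` is used to get the current epoch time in Athena.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000  # 86,400,000 ms


def window_ms(days: int) -> int:
    """Length of a look-back window in milliseconds."""
    return days * MS_PER_DAY


def mttr_by_severity_query(
    days: int,
    table: str = "security_lake_db.custom_security_automation",
) -> str:
    """Render the MTTR-by-severity query for a look-back window of `days`."""
    if days <= 0:
        raise ValueError("days must be positive")
    return (
        "SELECT severity_id,\n"
        "       COUNT(*) AS total_incidents,\n"
        "       AVG(CAST(json_extract_scalar("
        "remediation.desc, '$.elapsed_seconds') AS DOUBLE)) AS avg_mttr_seconds\n"
        f"FROM {table}\n"
        f"WHERE time > CAST(to_unixtime(now()) * 1000 AS BIGINT) - {window_ms(days)}\n"
        "GROUP BY severity_id\n"
        "ORDER BY severity_id DESC"
    )


if __name__ == "__main__":
    print(mttr_by_severity_query(30))
```

The rendered string can be passed to Athena through whichever client you already use; only trusted, code-defined values should be interpolated into the table name.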

Monitoring and Tuning Your Automation

Deploying automation is not the end – it is the beginning. You need to monitor the pipeline itself for health, accuracy, and performance.

Key Metrics to Track

Pipeline Health:

  • Step Functions execution success/failure rate
  • Lambda function error rates and duration
  • EventBridge rule invocation counts
  • MISP and OpenCTI API response times and availability

Detection Quality:

  • True positive rate – confirmed incidents that were correctly automated
  • False positive rate – benign events that triggered automated response
  • False negative rate – incidents missed by the pipeline (discovered through other means)
  • Sigma rule match distribution – which rules are firing and how often

Response Effectiveness:

  • Mean time to detect (MTTD) – time from event occurrence to pipeline trigger
  • Mean time to respond (MTTR) – time from pipeline trigger to response completion
  • Escalation rate – how often the pipeline requires human intervention
  • Rollback rate – how often automated responses are reversed
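The detection-quality and response metrics above can be computed from a simple export of reviewed incidents. The sketch below is illustrative: the `IncidentRecord` fields are hypothetical, and it assumes each incident has an analyst verdict (`malicious`) and three epoch-second timestamps. Here the false positive rate is defined as the share of automated responses that hit benign events, and the false negative rate as the share of confirmed incidents the pipeline did not act on.

```python
from dataclasses import dataclass
from statistics import mean


@dataclass
class IncidentRecord:
    occurred_at: float   # when the underlying event happened (epoch seconds)
    triggered_at: float  # when the pipeline started processing it
    responded_at: float  # when the response completed
    automated: bool      # pipeline took an automated response action
    malicious: bool      # analyst verdict after review
    escalated: bool = False
    rolled_back: bool = False


def _ratio(numerator: float, denominator: float) -> float:
    """Safe division that returns 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def summarize(records: list[IncidentRecord]) -> dict[str, float]:
    if not records:
        raise ValueError("no incident records to summarize")
    tp = sum(r.automated and r.malicious for r in records)
    fp = sum(r.automated and not r.malicious for r in records)
    fn = sum(r.malicious and not r.automated for r in records)
    automated = tp + fp
    return {
        "true_positive_share": _ratio(tp, automated),
        "false_positive_share": _ratio(fp, automated),
        "false_negative_rate": _ratio(fn, tp + fn),
        "mttd_seconds": mean(r.triggered_at - r.occurred_at for r in records),
        "mttr_seconds": mean(r.responded_at - r.triggered_at for r in records),
        "escalation_rate": _ratio(sum(r.escalated for r in records), len(records)),
        "rollback_rate": _ratio(sum(r.rolled_back for r in records), automated),
    }
```

Whatever definitions you settle on, write them down next to the dashboard; rates computed against different denominators are not comparable over time.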

CloudWatch Dashboard

Create a CloudWatch dashboard that tracks these metrics. Set alarms for:

  • Step Functions execution failures exceeding 5% in any 15-minute window
  • Lambda enrichment function timeouts (MISP or OpenCTI unavailable)
  • EventBridge rule invocation spikes (possible event storm or attack)
  • Approval queue depth exceeding 10 messages (human bottleneck)
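As an illustration of the first alarm, the sketch below builds the arguments for a CloudWatch metric-math alarm that fires when failed Step Functions executions exceed 5% of started executions in a 15-minute period. It uses the `ExecutionsFailed` and `ExecutionsStarted` metrics in the `AWS/States` namespace. The alarm name, the helper name `failure_rate_alarm_kwargs`, and the choice to treat missing data as not breaching are assumptions to adapt.

```python
def failure_rate_alarm_kwargs(state_machine_arn: str, sns_topic_arn: str) -> dict:
    """Build keyword arguments for CloudWatch put_metric_alarm."""
    dimensions = [{"Name": "StateMachineArn", "Value": state_machine_arn}]

    def sfn_sum(metric_id: str, metric_name: str) -> dict:
        # Sum of a Step Functions metric over a 15-minute (900 s) period.
        return {
            "Id": metric_id,
            "ReturnData": False,
            "MetricStat": {
                "Metric": {
                    "Namespace": "AWS/States",
                    "MetricName": metric_name,
                    "Dimensions": dimensions,
                },
                "Period": 900,
                "Stat": "Sum",
            },
        }

    return {
        "AlarmName": "security-automation-sfn-failure-rate",  # hypothetical name
        "ComparisonOperator": "GreaterThanThreshold",
        "Threshold": 5.0,  # percent
        "EvaluationPeriods": 1,
        "TreatMissingData": "notBreaching",
        "AlarmActions": [sns_topic_arn],
        "Metrics": [
            sfn_sum("failed", "ExecutionsFailed"),
            sfn_sum("started", "ExecutionsStarted"),
            {
                "Id": "rate",
                # Guard against division by zero in quiet periods.
                "Expression": "IF(started > 0, 100 * failed / started, 0)",
                "Label": "Failure rate (%)",
                "ReturnData": True,
            },
        ],
    }


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   boto3.client("cloudwatch").put_metric_alarm(
#       **failure_rate_alarm_kwargs(state_machine_arn, sns_topic_arn)
#   )
```

Building the arguments in a pure function keeps the alarm definition reviewable and testable without touching AWS.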

Tuning Your Sigma Rules

Sigma rules require ongoing tuning. Review matched rules weekly:

  1. High false positive rules – tighten the detection criteria or add filter conditions for known legitimate activity.
  2. Rules that never fire – verify the rule logic against actual CloudTrail event formats. The field names may have changed.
  3. Missing coverage – compare your Sigma rule set against the MITRE ATT&CK matrix for AWS. Identify techniques without detection coverage.
  4. Severity calibration – adjust rule severity based on your environment. A rule that is critical in a production account might be low in a sandbox account.
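The first two review steps lend themselves to a small script. The sketch below assumes you can export a list of reviewed matches as `(rule_id, is_false_positive)` pairs plus the IDs of all deployed rules; both inputs and the thresholds are hypothetical and should be tuned to your review volume.

```python
from collections import defaultdict


def triage_rules(
    deployed_rule_ids: list[str],
    reviewed_matches: list[tuple[str, bool]],
    max_fp_ratio: float = 0.5,
    min_reviews: int = 5,
) -> dict[str, list[str]]:
    """Flag noisy rules (mostly false positives) and rules that never fired."""
    totals: dict[str, int] = defaultdict(int)
    false_positives: dict[str, int] = defaultdict(int)
    for rule_id, is_false_positive in reviewed_matches:
        totals[rule_id] += 1
        if is_false_positive:
            false_positives[rule_id] += 1

    # Only judge rules with enough reviewed matches to be meaningful.
    noisy = sorted(
        rule_id
        for rule_id, total in totals.items()
        if total >= min_reviews and false_positives[rule_id] / total > max_fp_ratio
    )
    # Deployed rules with no matches at all in the review window.
    silent = sorted(set(deployed_rule_ids) - set(totals))
    return {"noisy": noisy, "silent": silent}
```

Rules in the `noisy` list are candidates for tighter filters, while rules in the `silent` list are worth replaying against real CloudTrail samples to confirm the field names still match.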

MISP Feed Maintenance

Review your MISP feeds monthly:

  • Disable feeds with high false positive rates
  • Add new community feeds as they become available
  • Monitor feed freshness – stale feeds provide outdated intelligence
  • Review warninglist effectiveness – ensure legitimate services (CDNs, cloud providers) are properly excluded
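Feed freshness is straightforward to check automatically. The sketch below works on a plain list of feed records rather than on the MISP API directly: the `name`, `enabled`, and `last_fetched` keys are a hypothetical shape that you would populate from your own MISP export, and the seven-day default is an assumption.

```python
from datetime import datetime, timedelta, timezone


def stale_feeds(
    feeds: list[dict],
    now: datetime,
    max_age: timedelta = timedelta(days=7),
) -> list[str]:
    """Return names of enabled feeds not refreshed within `max_age`."""
    return sorted(
        feed["name"]
        for feed in feeds
        if feed["enabled"] and now - feed["last_fetched"] > max_age
    )


if __name__ == "__main__":
    now = datetime(2025, 6, 1, tzinfo=timezone.utc)
    feeds = [
        {"name": "feed-a", "enabled": True, "last_fetched": now - timedelta(days=1)},
        {"name": "feed-b", "enabled": True, "last_fetched": now - timedelta(days=30)},
        {"name": "feed-c", "enabled": False, "last_fetched": now - timedelta(days=90)},
    ]
    print(stale_feeds(feeds, now))
```

Disabled feeds are skipped on purpose, so the report lists only feeds that are still influencing enrichment decisions.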

Implementation Roadmap

Week 1-2: Foundation

  • Deploy EventBridge rules for GuardDuty, Security Hub, and CloudTrail
  • Create the Step Functions state machine with basic classification and logging
  • Deploy the initial set of Lambda functions (classify, log, notify)

Week 3-4: Enrichment

  • Deploy MISP on EC2 with Docker Compose
  • Configure community threat intelligence feeds
  • Deploy and integrate the MISP enrichment Lambda
  • (Optional) Deploy OpenCTI and connect to MISP

Week 5-6: Detection and Response

  • Upload initial Sigma rule set to S3
  • Deploy the Sigma evaluator Lambda
  • Implement automated response functions (IP blocking, resource isolation)
  • Configure human approval workflows for high-severity events

Week 7-8: Analytics and Tuning

  • Enable Security Lake and configure custom source
  • Build Athena queries and CloudWatch dashboards
  • Tune Sigma rules based on initial findings
  • Document runbooks for human escalation scenarios

Conclusion

Event-driven security automation with EventBridge, Step Functions, and open-source threat intelligence gives you the fastest path from detection to response. The AWS-native event backbone handles scale and reliability. Open-source tools like MISP, OpenCTI, and Sigma rules give you freedom – freedom to choose your intelligence sources, freedom to write detection logic that works across any platform, and freedom to avoid vendor lock-in on the intelligence that matters most.

The pipeline we built responds to critical threats in under 30 seconds, enriches every alert with community threat intelligence, evaluates events against vendor-neutral detection rules, and logs everything in OCSF format for long-term analytics. More importantly, every component is replaceable. You can swap MISP for a commercial threat intel platform. You can replace Sigma rules with Splunk queries. You can add new event sources without touching existing workflows. The architecture supports evolution.

Start with the foundation: EventBridge rules catching your highest-value security events. Add Step Functions orchestration when you need multi-step workflows. Layer in MISP enrichment when you want threat context. Deploy Sigma rules when you want detection-as-code. Each layer adds value independently, and together they form a complete security orchestration platform that you own and control.

Build it yourself. Own your intelligence. Automate your defense.


Jon (JR) Price is a DevSecOps engineer and cloud security architect specializing in AWS security automation. Connect with him on LinkedIn to discuss event-driven security architectures and open-source threat intelligence.
