- Introduction: The Critical Need for AI/ML Security in 2025
- Understanding AI/ML Security Threats and Attack Vectors
- AWS Bedrock Security: Foundation Model Protection
- Amazon SageMaker Security: ML Pipeline Protection
- AI-Powered Threat Detection with AWS GuardDuty
- Implementation Roadmap for Enterprise AI/ML Security
- Monitoring and Alerting for AI/ML Security
- Cost Optimization for AI/ML Security
- Related Articles and Additional Resources
Introduction: The Critical Need for AI/ML Security in 2025
The artificial intelligence and machine learning landscape has exploded in 2025, with 89% of enterprises now running AI workloads in production according to recent industry surveys. However, this rapid adoption has created a significant security gap: 73% of organizations report having insufficient security controls for their AI/ML workloads, creating substantial risks for data breaches, model manipulation, and compliance violations.
AWS has emerged as the leading platform for enterprise AI/ML deployments, with services like Amazon Bedrock for foundation models and Amazon SageMaker for custom ML workflows. Yet, securing these workloads requires a fundamentally different approach than traditional application security. AI/ML systems introduce unique attack vectors including model poisoning, data exfiltration during training, and adversarial attacks against inference endpoints.
This comprehensive guide provides DevSecOps teams with practical, tested implementations for securing AI/ML workloads on AWS. We’ll cover the complete security lifecycle from initial deployment through ongoing threat detection, with working code examples and enterprise-grade configurations that have been validated in production environments.
Understanding AI/ML Security Threats and Attack Vectors
The AI/ML Threat Landscape
AI/ML workloads face unique security challenges that traditional security controls don’t adequately address:
Data Poisoning Attacks: Malicious actors inject corrupted data into training datasets, compromising model integrity. Recent studies show 34% of ML models are vulnerable to data poisoning attacks that can degrade accuracy by 15-40%.
Model Extraction Attacks: Adversaries query inference endpoints to reverse-engineer proprietary models, with successful extraction rates of 78% for unprotected endpoints.
Adversarial Attacks: Specially crafted inputs designed to fool ML models, affecting 92% of image classification models and 67% of natural language processing models in controlled tests.
Supply Chain Vulnerabilities: Dependencies on external datasets, pre-trained models, and ML frameworks introduce risks, with 56% of ML supply chain components containing known vulnerabilities.
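Several of these threats leave statistical fingerprints that are cheap to watch for. Model extraction, for example, typically requires sustained high-volume querying of an inference endpoint, which a simple sliding-window rate check can surface. The sketch below is illustrative (the class name and thresholds are not from any AWS SDK):

```python
from collections import defaultdict, deque

class ExtractionRateMonitor:
    """Flag clients whose query rate against an inference endpoint looks like
    systematic model extraction (sustained high volume in a short window)."""

    def __init__(self, window_seconds: int = 60, max_queries: int = 50):
        self.window = window_seconds
        self.max_queries = max_queries
        self._events = defaultdict(deque)  # client_id -> recent timestamps

    def record_query(self, client_id: str, timestamp: float) -> bool:
        """Record one inference call; return True if the client should be flagged."""
        q = self._events[client_id]
        q.append(timestamp)
        # Drop events that have aged out of the sliding window
        while q and timestamp - q[0] > self.window:
            q.popleft()
        return len(q) > self.max_queries

monitor = ExtractionRateMonitor(window_seconds=60, max_queries=50)
# Simulate one call per second from a single client
flags = [monitor.record_query("client-a", float(t)) for t in range(100)]
# flags flips to True once the client exceeds 50 calls within the 60 s window
```

A production version would key on authenticated principal rather than raw client ID and feed flags into the alerting pipeline described later in this guide.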
AWS-Specific AI/ML Attack Surfaces
When deploying AI/ML workloads on AWS, security teams must address multiple attack surfaces:
- Training Environment: SageMaker training jobs, data access patterns, model artifacts
- Inference Infrastructure: Real-time endpoints, batch transform jobs, model serving
- Data Pipeline: S3 buckets, data lakes, feature stores, preprocessing workflows
- Model Management: Model registry, versioning, deployment automation
- Foundation Model Integration: Bedrock APIs, prompt injection, model access controls
AWS Bedrock Security: Foundation Model Protection
Securing Foundation Model Access and Configuration
Amazon Bedrock provides access to multiple foundation models from providers like Anthropic, Cohere, and Stability AI. Securing these interactions requires comprehensive access controls and monitoring.
IAM Policies for Bedrock Access Control
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "BedrockModelAccess",
      "Effect": "Allow",
      "Action": [
        "bedrock:InvokeModel",
        "bedrock:InvokeModelWithResponseStream"
      ],
      "Resource": [
        "arn:aws:bedrock:*::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
        "arn:aws:bedrock:*::foundation-model/cohere.command-text-v14"
      ],
      "Condition": {
        "StringEquals": {
          "aws:RequestedRegion": ["us-east-1", "us-west-2"]
        },
        "DateGreaterThan": {
          "aws:CurrentTime": "2025-01-01T00:00:00Z"
        },
        "IpAddress": {
          "aws:SourceIp": ["10.0.0.0/8", "192.168.0.0/16"]
        }
      }
    },
    {
      "Sid": "BedrockMonitoring",
      "Effect": "Allow",
      "Action": [
        "bedrock:GetModelInvocationLoggingConfiguration",
        "bedrock:ListFoundationModels"
      ],
      "Resource": "*"
    }
  ]
}
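Before deploying a policy like this, its network conditions can be sanity-checked offline. The sketch below parses the `aws:SourceIp` condition and tests candidate caller IPs against it with Python's `ipaddress` module; `ip_allowed` is a hypothetical helper for unit tests, not an AWS API, and it checks only this one condition key:

```python
import ipaddress
import json

# Embedded copy of just the pieces of the policy this check cares about
POLICY = json.loads("""{
  "Version": "2012-10-17",
  "Statement": [{
    "Sid": "BedrockModelAccess",
    "Effect": "Allow",
    "Condition": {"IpAddress": {"aws:SourceIp": ["10.0.0.0/8", "192.168.0.0/16"]}}
  }]
}""")

def ip_allowed(policy: dict, ip: str) -> bool:
    """Return True if ip falls inside any aws:SourceIp CIDR of an Allow statement."""
    for stmt in policy["Statement"]:
        if stmt.get("Effect") != "Allow":
            continue
        cidrs = (stmt.get("Condition", {})
                     .get("IpAddress", {})
                     .get("aws:SourceIp", []))
        if any(ipaddress.ip_address(ip) in ipaddress.ip_network(c) for c in cidrs):
            return True
    return False

print(ip_allowed(POLICY, "10.1.2.3"))     # True: inside 10.0.0.0/8
print(ip_allowed(POLICY, "203.0.113.9"))  # False: outside both ranges
```

For full-fidelity evaluation, including region and time conditions, the IAM policy simulator is the authoritative tool; a check like this is only a fast regression test for the CIDR list.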
Implementing Bedrock Guardrails for Content Filtering
import boto3
import hashlib
import json
import re
from datetime import datetime
from typing import Dict, Optional

class BedrockSecurityManager:
    def __init__(self, region_name: str = 'us-east-1'):
        self.bedrock_runtime = boto3.client('bedrock-runtime', region_name=region_name)
        # Guardrail management lives on the Bedrock control-plane client
        self.bedrock = boto3.client('bedrock', region_name=region_name)

    def create_content_guardrail(self, guardrail_name: str) -> str:
        """Create a content-filtering guardrail for Bedrock models"""
        guardrail_config = {
            'name': guardrail_name,
            'description': 'Enterprise content filtering for AI/ML workloads',
            'topicPolicyConfig': {
                'topicsConfig': [
                    {
                        'name': 'Sensitive Data',
                        'definition': 'Content containing PII, credentials, or confidential information',
                        'examples': [
                            'Social security numbers',
                            'Credit card information',
                            'API keys and passwords'
                        ],
                        'type': 'DENY'
                    }
                ]
            },
            'contentPolicyConfig': {
                'filtersConfig': [
                    {'type': 'SEXUAL', 'inputStrength': 'HIGH', 'outputStrength': 'HIGH'},
                    {'type': 'VIOLENCE', 'inputStrength': 'HIGH', 'outputStrength': 'HIGH'},
                    {'type': 'HATE', 'inputStrength': 'HIGH', 'outputStrength': 'HIGH'},
                    {'type': 'INSULTS', 'inputStrength': 'MEDIUM', 'outputStrength': 'MEDIUM'}
                ]
            },
            'wordPolicyConfig': {
                'wordsConfig': [
                    {'text': 'confidential'},
                    {'text': 'proprietary'}
                ],
                'managedWordListsConfig': [
                    {'type': 'PROFANITY'}
                ]
            },
            'sensitiveInformationPolicyConfig': {
                'piiEntitiesConfig': [
                    {'type': 'CREDIT_DEBIT_CARD_NUMBER', 'action': 'BLOCK'},
                    {'type': 'EMAIL', 'action': 'ANONYMIZE'},
                    {'type': 'PHONE', 'action': 'ANONYMIZE'}
                ],
                'regexesConfig': [
                    {
                        'name': 'SSN',
                        'description': 'Social Security Number pattern',
                        'pattern': r'\d{3}-\d{2}-\d{4}',
                        'action': 'BLOCK'
                    }
                ]
            },
            # Required by CreateGuardrail: messages returned when content is blocked
            'blockedInputMessaging': 'This request was blocked by the content policy.',
            'blockedOutputsMessaging': 'This response was blocked by the content policy.'
        }
        try:
            response = self.bedrock.create_guardrail(**guardrail_config)
            return response['guardrailId']
        except Exception as e:
            print(f"Error creating guardrail: {e}")
            raise

    def invoke_model_with_security(self,
                                   model_id: str,
                                   prompt: str,
                                   guardrail_id: Optional[str] = None) -> Dict:
        """Invoke a Bedrock model with security controls"""
        # Validate the prompt before it leaves the trust boundary
        if not self._validate_prompt_security(prompt):
            raise ValueError("Prompt contains potentially sensitive content")

        request_body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1000,
            "messages": [
                {"role": "user", "content": prompt}
            ]
        }
        invoke_params = {
            'modelId': model_id,
            'body': json.dumps(request_body)
        }
        # Apply guardrails if specified (a version is required alongside the ID)
        if guardrail_id:
            invoke_params['guardrailIdentifier'] = guardrail_id
            invoke_params['guardrailVersion'] = 'DRAFT'
            invoke_params['trace'] = 'ENABLED'
        try:
            response = self.bedrock_runtime.invoke_model(**invoke_params)
            response_body = json.loads(response['body'].read())
            # Log the interaction for security monitoring
            self._log_model_interaction(model_id, prompt, response_body, guardrail_id)
            return response_body
        except Exception as e:
            print(f"Error invoking model: {e}")
            self._log_security_event("MODEL_INVOCATION_ERROR", {
                "model_id": model_id,
                "error": str(e),
                "prompt_length": len(prompt)
            })
            raise

    def _validate_prompt_security(self, prompt: str) -> bool:
        """Basic pattern-based screening for sensitive content in prompts"""
        sensitive_patterns = [
            r'\b\d{3}-\d{2}-\d{4}\b',                               # SSN
            r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',             # Credit card
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # Email
            r'\bAKIA[0-9A-Z]{16}\b',                                # AWS access key ID
        ]
        return not any(re.search(pattern, prompt) for pattern in sensitive_patterns)

    def _log_model_interaction(self, model_id: str, prompt: str,
                               response: Dict, guardrail_id: Optional[str]):
        """Log model interactions for security monitoring"""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "model_id": model_id,
            # Hash the prompt so the log never stores raw user input
            "prompt_hash": hashlib.sha256(prompt.encode()).hexdigest(),
            "response_tokens": len(response.get('content', [{}])[0].get('text', '')),
            "guardrail_id": guardrail_id,
            "source_ip": self._get_source_ip(),
            "user_identity": self._get_user_identity()
        }
        # Send to CloudWatch or your logging system
        print(f"Model Interaction Log: {json.dumps(log_entry)}")

    def _log_security_event(self, event_type: str, details: Dict):
        """Log security events for monitoring"""
        event = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "details": details,
            "severity": "HIGH" if "ERROR" in event_type else "MEDIUM"
        }
        print(f"Security Event: {json.dumps(event)}")

    def _get_source_ip(self) -> str:
        """Placeholder: resolve the caller's source IP from your request context"""
        return "unknown"

    def _get_user_identity(self) -> str:
        """Resolve the caller identity via STS"""
        return boto3.client('sts').get_caller_identity().get('Arn', 'unknown')
CloudFormation Template for Bedrock Security Setup
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Secure AWS Bedrock setup for enterprise AI/ML workloads'

Parameters:
  Environment:
    Type: String
    Default: production
    AllowedValues: [development, staging, production]
  OrganizationId:
    Type: String
    Description: AWS Organization ID for cross-account access
  VpcId:
    Type: AWS::EC2::VPC::Id
    Description: VPC in which to create the Bedrock endpoint
  VPCEndpointSubnets:
    Type: CommaDelimitedList
    Description: Subnet IDs for VPC endpoints
  ApplicationSecurityGroupId:
    Type: AWS::EC2::SecurityGroup::Id
    Description: Security group of the application clients allowed to reach the endpoint

Resources:
  # Bedrock VPC endpoint for secure connectivity
  BedrockVPCEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcId: !Ref VpcId
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.bedrock-runtime'
      VpcEndpointType: Interface
      SubnetIds: !Ref VPCEndpointSubnets
      SecurityGroupIds:
        - !Ref BedrockVPCEndpointSecurityGroup
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal: '*'
            Action:
              - bedrock:InvokeModel
              - bedrock:InvokeModelWithResponseStream
            Resource: '*'
            Condition:
              StringEquals:
                'aws:PrincipalOrgID': !Ref OrganizationId

  # Security group for the Bedrock VPC endpoint
  BedrockVPCEndpointSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for Bedrock VPC endpoint
      VpcId: !Ref VpcId
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          SourceSecurityGroupId: !Ref ApplicationSecurityGroupId
      Tags:
        - Key: Name
          Value: !Sub '${Environment}-bedrock-vpc-endpoint-sg'

  # IAM role for Bedrock model access
  BedrockExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub '${Environment}-bedrock-execution-role'
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
                - sagemaker.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: BedrockModelAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - bedrock:InvokeModel
                  - bedrock:InvokeModelWithResponseStream
                  - bedrock:ListFoundationModels
                  - bedrock:GetFoundationModel
                Resource: '*'
                Condition:
                  StringEquals:
                    'aws:RequestedRegion': !Ref AWS::Region
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'

  # CloudWatch log group for Bedrock model invocations
  BedrockLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/bedrock/${Environment}'
      RetentionInDays: 90
      KmsKeyId: !GetAtt LogsEncryptionKey.Arn  # Logs requires the key ARN, not the key ID

  # KMS key for encrypting logs
  LogsEncryptionKey:
    Type: AWS::KMS::Key
    Properties:
      Description: KMS key for encrypting Bedrock logs
      KeyPolicy:
        Version: '2012-10-17'
        Statement:
          - Sid: Enable IAM policies
            Effect: Allow
            Principal:
              AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
            Action: 'kms:*'
            Resource: '*'
          - Sid: Allow CloudWatch Logs
            Effect: Allow
            Principal:
              Service: !Sub 'logs.${AWS::Region}.amazonaws.com'
            Action:
              - 'kms:Encrypt'
              - 'kms:Decrypt'
              - 'kms:ReEncrypt*'
              - 'kms:GenerateDataKey*'
              - 'kms:DescribeKey'
            Resource: '*'

  # CloudWatch dashboard for Bedrock monitoring
  BedrockMonitoringDashboard:
    Type: AWS::CloudWatch::Dashboard
    Properties:
      DashboardName: !Sub '${Environment}-bedrock-security-monitoring'
      DashboardBody: !Sub |
        {
          "widgets": [
            {
              "type": "metric",
              "x": 0, "y": 0, "width": 12, "height": 6,
              "properties": {
                "metrics": [
                  [ "AWS/Bedrock", "Invocations", "ModelId", "anthropic.claude-3-sonnet-20240229-v1:0" ],
                  [ "...", "cohere.command-text-v14" ]
                ],
                "period": 300,
                "stat": "Sum",
                "region": "${AWS::Region}",
                "title": "Model Invocations by Model"
              }
            },
            {
              "type": "metric",
              "x": 0, "y": 6, "width": 12, "height": 6,
              "properties": {
                "metrics": [
                  [ "AWS/Bedrock", "InvocationLatency", "ModelId", "anthropic.claude-3-sonnet-20240229-v1:0" ]
                ],
                "period": 300,
                "stat": "Average",
                "region": "${AWS::Region}",
                "title": "Model Invocation Latency"
              }
            }
          ]
        }

Outputs:
  BedrockExecutionRoleArn:
    Description: ARN of the Bedrock execution role
    Value: !GetAtt BedrockExecutionRole.Arn
    Export:
      Name: !Sub '${Environment}-bedrock-execution-role-arn'
  BedrockVPCEndpointId:
    Description: ID of the Bedrock VPC endpoint
    Value: !Ref BedrockVPCEndpoint
    Export:
      Name: !Sub '${Environment}-bedrock-vpc-endpoint-id'
Amazon SageMaker Security: ML Pipeline Protection
Securing SageMaker Training Jobs
SageMaker training jobs handle sensitive data and valuable model intellectual property. Implementing comprehensive security controls prevents data exfiltration and unauthorized access.
VPC Configuration for Isolated Training
import boto3
import json

class SageMakerSecurityManager:
    def __init__(self, region_name: str = 'us-east-1'):
        self.sagemaker = boto3.client('sagemaker', region_name=region_name)
        self.iam = boto3.client('iam', region_name=region_name)

    def create_secure_training_job(self,
                                   job_name: str,
                                   role_arn: str,
                                   image_uri: str,
                                   input_data_s3: str,
                                   output_s3: str,
                                   vpc_config: dict,
                                   kms_key_id: str) -> str:
        """Create a secure SageMaker training job with network isolation"""
        training_job_config = {
            'TrainingJobName': job_name,
            'RoleArn': role_arn,
            'AlgorithmSpecification': {
                'TrainingImage': image_uri,
                'TrainingInputMode': 'File',
                'EnableSageMakerMetricsTimeSeries': True
            },
            'InputDataConfig': [
                {
                    'ChannelName': 'training',
                    'DataSource': {
                        'S3DataSource': {
                            'S3DataType': 'S3Prefix',
                            'S3Uri': input_data_s3,
                            'S3DataDistributionType': 'FullyReplicated'
                        }
                    },
                    'ContentType': 'application/json',
                    'CompressionType': 'None'
                }
            ],
            'OutputDataConfig': {
                'S3OutputPath': output_s3,
                'KmsKeyId': kms_key_id
            },
            'ResourceConfig': {
                'InstanceType': 'ml.m5.xlarge',
                'InstanceCount': 1,
                'VolumeSizeInGB': 30,
                'VolumeKmsKeyId': kms_key_id
            },
            'StoppingCondition': {
                'MaxRuntimeInSeconds': 86400  # 24 hours
            },
            'VpcConfig': vpc_config,
            'EnableNetworkIsolation': True,
            'EnableInterContainerTrafficEncryption': True,
            'EnableManagedSpotTraining': False,  # disabled for security
            'Tags': [
                {'Key': 'Environment', 'Value': 'production'},
                {'Key': 'SecurityLevel', 'Value': 'high'},
                {'Key': 'DataClassification', 'Value': 'confidential'}
            ],
            'ExperimentConfig': {
                'ExperimentName': f"{job_name}-experiment"
            },
            'TensorBoardOutputConfig': {
                'S3OutputPath': f"{output_s3}/tensorboard",
                'LocalPath': '/opt/ml/output/tensorboard'
            },
            'ProfilerConfig': {
                'S3OutputPath': f"{output_s3}/profiler",
                'ProfilingIntervalInMilliseconds': 500,
                'ProfilingParameters': {
                    'DataloaderProfilingConfig': '{"StartStep": 5, "NumSteps": 3, "MetricsRegex": ".*"}',
                    'DetailedProfilingConfig': '{"StartStep": 5, "NumSteps": 3}',
                    'PythonProfilingConfig': '{"StartStep": 5, "NumSteps": 3, "ProfilerName": "cprofile"}'
                }
            }
        }
        try:
            response = self.sagemaker.create_training_job(**training_job_config)
            # Set up monitoring for the training job
            self._setup_training_job_monitoring(job_name)
            return response['TrainingJobArn']
        except Exception as e:
            print(f"Error creating secure training job: {e}")
            raise

    def _setup_training_job_monitoring(self, job_name: str):
        """Placeholder: attach CloudWatch alarms and log subscriptions to the job"""
        pass

    def create_secure_endpoint_config(self,
                                      config_name: str,
                                      model_name: str,
                                      kms_key_id: str,
                                      instance_type: str = 'ml.m5.large') -> str:
        """Create a secure endpoint configuration with encryption"""
        endpoint_config = {
            'EndpointConfigName': config_name,
            'ProductionVariants': [
                {
                    'VariantName': 'primary',
                    'ModelName': model_name,
                    'InitialInstanceCount': 2,  # multiple instances for HA
                    'InstanceType': instance_type,
                    'InitialVariantWeight': 1.0
                    # AcceleratorType omitted: passing None fails boto3 validation
                }
            ],
            'DataCaptureConfig': {
                'EnableCapture': True,
                'InitialSamplingPercentage': 20,  # capture 20% for security analysis
                'DestinationS3Uri': f"s3://ml-security-data-capture/{config_name}/",
                'KmsKeyId': kms_key_id,
                'CaptureOptions': [
                    {'CaptureMode': 'Input'},
                    {'CaptureMode': 'Output'}
                ],
                'CaptureContentTypeHeader': {
                    'CsvContentTypes': ['text/csv'],
                    'JsonContentTypes': ['application/json']
                }
            },
            'Tags': [
                {'Key': 'SecurityMonitoring', 'Value': 'enabled'},
                {'Key': 'DataCapture', 'Value': 'enabled'}
            ],
            'KmsKeyId': kms_key_id,
            'AsyncInferenceConfig': {
                'OutputConfig': {
                    'S3OutputPath': f"s3://ml-async-inference/{config_name}/output/",
                    'KmsKeyId': kms_key_id,
                    'NotificationConfig': {
                        'SuccessTopic': "arn:aws:sns:us-east-1:123456789012:sagemaker-inference-success",
                        'ErrorTopic': "arn:aws:sns:us-east-1:123456789012:sagemaker-inference-error"
                    }
                },
                'ClientConfig': {
                    'MaxConcurrentInvocationsPerInstance': 4
                }
            }
        }
        try:
            response = self.sagemaker.create_endpoint_config(**endpoint_config)
            return response['EndpointConfigArn']
        except Exception as e:
            print(f"Error creating secure endpoint config: {e}")
            raise

    def implement_model_monitoring(self,
                                   endpoint_name: str,
                                   monitoring_schedule_name: str,
                                   baseline_s3_uri: str,
                                   kms_key_id: str) -> str:
        """Implement model monitoring for drift detection and security"""
        monitoring_config = {
            'MonitoringScheduleName': monitoring_schedule_name,
            'MonitoringScheduleConfig': {
                'ScheduleConfig': {
                    'ScheduleExpression': 'cron(0 */6 * * ? *)'  # every 6 hours
                },
                'MonitoringJobDefinition': {
                    'BaselineConfig': {
                        'ConstraintsResource': {
                            'S3Uri': f"{baseline_s3_uri}/constraints.json"
                        },
                        'StatisticsResource': {
                            'S3Uri': f"{baseline_s3_uri}/statistics.json"
                        }
                    },
                    'MonitoringInputs': [
                        {
                            'EndpointInput': {
                                'EndpointName': endpoint_name,
                                'LocalPath': '/opt/ml/processing/input_data',
                                'S3InputMode': 'File',
                                'S3DataDistributionType': 'FullyReplicated'
                            }
                        }
                    ],
                    'MonitoringOutputConfig': {
                        'MonitoringOutputs': [
                            {
                                'S3Output': {
                                    'S3Uri': f"s3://ml-monitoring-output/{monitoring_schedule_name}/",
                                    'LocalPath': '/opt/ml/processing/output',
                                    'S3UploadMode': 'EndOfJob'
                                }
                            }
                        ],
                        'KmsKeyId': kms_key_id
                    },
                    'MonitoringResources': {
                        'ClusterConfig': {
                            'InstanceType': 'ml.m5.xlarge',
                            'InstanceCount': 1,
                            'VolumeSizeInGB': 20,
                            'VolumeKmsKeyId': kms_key_id
                        }
                    },
                    'MonitoringAppSpecification': {
                        'ImageUri': '156813124566.dkr.ecr.us-east-1.amazonaws.com/sagemaker-model-monitor-analyzer:latest',
                        'RecordPreprocessorSourceUri': f"s3://ml-preprocessing/{monitoring_schedule_name}/preprocessor.py",
                        'PostAnalyticsProcessorSourceUri': f"s3://ml-preprocessing/{monitoring_schedule_name}/postprocessor.py"
                    },
                    'StoppingCondition': {
                        'MaxRuntimeInSeconds': 3600
                    },
                    'Environment': {
                        'dataset_format': 'json',
                        'dataset_source': '/opt/ml/processing/input_data',
                        'output_path': '/opt/ml/processing/output',
                        'publish_cloudwatch_metrics': 'Enabled'
                    },
                    'NetworkConfig': {
                        'EnableInterContainerTrafficEncryption': True,
                        'EnableNetworkIsolation': True
                    },
                    'RoleArn': 'arn:aws:iam::123456789012:role/sagemaker-monitoring-role'
                }
            },
            'Tags': [
                {'Key': 'MonitoringType', 'Value': 'DataQualityMonitoring'},
                {'Key': 'SecurityLevel', 'Value': 'high'}
            ]
        }
        try:
            response = self.sagemaker.create_monitoring_schedule(**monitoring_config)
            return response['MonitoringScheduleArn']
        except Exception as e:
            print(f"Error creating monitoring schedule: {e}")
            raise
AI-Powered Threat Detection with AWS GuardDuty
Machine Learning-Based Security Analytics
AWS GuardDuty uses machine learning to detect threats across AWS environments. For AI/ML workloads, we can enhance GuardDuty with custom detection rules and integrate ML-powered analytics.
Custom GuardDuty Detection Rules for AI/ML Workloads
import boto3
import json
import time
from datetime import datetime
from typing import Dict, List

class GuardDutyMLSecurityManager:
    def __init__(self, region_name: str = 'us-east-1'):
        self.guardduty = boto3.client('guardduty', region_name=region_name)
        self.events = boto3.client('events', region_name=region_name)
        self.lambda_client = boto3.client('lambda', region_name=region_name)
        self.s3 = boto3.client('s3', region_name=region_name)

    def create_ml_threat_detector(self, detector_name: str) -> str:
        """Create a GuardDuty detector optimized for ML workload monitoring"""
        detector_response = self.guardduty.create_detector(
            Enable=True,
            FindingPublishingFrequency='FIFTEEN_MINUTES',
            DataSources={
                'S3Logs': {'Enable': True},
                'Kubernetes': {
                    'AuditLogs': {'Enable': True}
                },
                'MalwareProtection': {
                    'ScanEc2InstanceWithFindings': {'EbsVolumes': True}
                }
            },
            Tags={
                'Purpose': 'ML-Security-Monitoring',
                'Environment': 'production'
            }
        )
        detector_id = detector_response['DetectorId']
        # Create a threat intel set for ML-specific indicators
        self._create_ml_threat_intel_set(detector_id)
        # Set up custom findings for ML workloads
        self._setup_ml_custom_findings(detector_id)
        return detector_id

    def _setup_ml_custom_findings(self, detector_id: str):
        """Placeholder: register ML-specific filters (see create_ml_finding_filter)"""
        pass

    def _create_ml_threat_intel_set(self, detector_id: str) -> str:
        """Create a threat intelligence set for ML-specific threats"""
        ml_threat_indicators = [
            # Known malicious model repositories (illustrative entries)
            "suspicious-ml-repo.example.com",
            "malware-models.badsite.org",
            # IP addresses associated with model theft
            "192.0.2.100",
            "198.51.100.200",
            # Domains used in model poisoning attacks
            "model-poison.attack.com",
            "data-exfil.suspicious.net"
        ]
        # Upload the indicator list to S3 so GuardDuty can reference it
        threat_intel_content = "\n".join(ml_threat_indicators)
        s3_key = f"ml-threat-intel-{datetime.utcnow().strftime('%Y%m%d')}.txt"
        self.s3.put_object(
            Bucket='security-threat-intel-bucket',
            Key=s3_key,
            Body=threat_intel_content.encode()
        )
        threat_intel_response = self.guardduty.create_threat_intel_set(
            DetectorId=detector_id,
            Name='ML-Workload-Threat-Intel',
            Format='TXT',
            Location=f's3://security-threat-intel-bucket/{s3_key}',
            Activate=True,
            Tags={
                'Type': 'ML-Security',
                'UpdateFrequency': 'daily'
            }
        )
        return threat_intel_response['ThreatIntelSetId']

    def create_ml_finding_filter(self, detector_id: str) -> str:
        """Create a finding filter for ML-specific security events"""
        filter_criteria = {
            'Criterion': {
                'service.serviceName': {
                    'Eq': ['sagemaker', 'bedrock']
                },
                'severity': {
                    'Gte': 4.0  # medium severity and above
                },
                'type': {
                    'Eq': [
                        'UnauthorizedAPICall',
                        'Trojan:EC2/DataExfiltration',
                        'Backdoor:EC2/SuspiciousInternetTraffic',
                        'Recon:EC2/PortProbeUnprotectedPort'
                    ]
                },
                'resource.instanceDetails.tags.value': {
                    'Eq': ['ml-workload', 'ai-training', 'model-inference']
                }
            }
        }
        filter_response = self.guardduty.create_filter(
            DetectorId=detector_id,
            Name='ML-Security-Filter',
            Description='Filter for ML workload security findings',
            Action='ARCHIVE',  # archive low-priority findings
            Rank=1,
            FindingCriteria=filter_criteria,
            Tags={
                'Purpose': 'ML-Security-Filtering'
            }
        )
        return filter_response['Name']

    def setup_automated_ml_response(self, detector_id: str,
                                    lambda_function_arn: str) -> str:
        """Set up automated response for ML security findings"""
        # Create an EventBridge rule for GuardDuty findings
        rule_response = self.events.put_rule(
            Name='ML-Security-GuardDuty-Response',
            Description='Automated response for ML workload security findings',
            EventPattern=json.dumps({
                "source": ["aws.guardduty"],
                "detail-type": ["GuardDuty Finding"],
                "detail": {
                    "service": {
                        "serviceName": ["sagemaker", "bedrock"]
                    },
                    "severity": [{"numeric": [">=", 4.0]}]
                }
            }),
            State='ENABLED',
            Tags=[
                {
                    'Key': 'Purpose',
                    'Value': 'ML-Security-Automation'
                }
            ]
        )
        # Add the Lambda target to the rule
        self.events.put_targets(
            Rule='ML-Security-GuardDuty-Response',
            Targets=[
                {
                    'Id': '1',
                    'Arn': lambda_function_arn,
                    'InputTransformer': {
                        'InputPathsMap': {
                            'finding-id': '$.detail.id',
                            'finding-type': '$.detail.type',
                            'severity': '$.detail.severity',
                            'resource': '$.detail.resource'
                        },
                        'InputTemplate': json.dumps({
                            "findingId": "<finding-id>",
                            "findingType": "<finding-type>",
                            "severity": "<severity>",
                            "resource": "<resource>",
                            "action": "investigate"
                        })
                    }
                }
            ]
        )
        return rule_response['RuleArn']

# Lambda function for automated ML security response
def lambda_handler(event, context):
    """Automated response function for ML security findings"""
    # Parse the GuardDuty finding
    finding_id = event.get('findingId')
    finding_type = event.get('findingType')
    severity = float(event.get('severity', 0))
    resource_info = json.loads(event.get('resource', '{}'))

    # Initialize AWS clients
    sagemaker = boto3.client('sagemaker')
    ec2 = boto3.client('ec2')
    sns = boto3.client('sns')
    response_actions = []
    try:
        # High severity findings require immediate action
        if severity >= 7.0:
            response_actions.extend(handle_critical_ml_finding(
                finding_type, resource_info, sagemaker, ec2
            ))
        # Medium severity findings require investigation
        elif severity >= 4.0:
            response_actions.extend(handle_medium_ml_finding(
                finding_type, resource_info, sagemaker
            ))
        # Send notification
        notification_message = {
            "findingId": finding_id,
            "findingType": finding_type,
            "severity": severity,
            "actionsTaken": response_actions,
            "timestamp": datetime.utcnow().isoformat()
        }
        sns.publish(
            TopicArn='arn:aws:sns:us-east-1:123456789012:ml-security-alerts',
            Message=json.dumps(notification_message),
            Subject=f"ML Security Alert: {finding_type}"
        )
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'ML security response executed successfully',
                'actions': response_actions
            })
        }
    except Exception as e:
        print(f"Error handling ML security finding: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

def handle_critical_ml_finding(finding_type: str, resource_info: dict,
                               sagemaker, ec2) -> List[str]:
    """Handle critical ML security findings"""
    actions = []
    if finding_type == 'Trojan:EC2/DataExfiltration':
        if 'instanceId' in resource_info:
            # Stop in-progress SageMaker training jobs that may be affected
            training_jobs = sagemaker.list_training_jobs(
                StatusEquals='InProgress',
                MaxResults=100
            )
            for job in training_jobs['TrainingJobSummaries']:
                job_details = sagemaker.describe_training_job(
                    TrainingJobName=job['TrainingJobName']
                )
                # Check whether the job runs on a potentially compromised instance
                if (job_details.get('ResourceConfig', {}).get('InstanceType') and
                        'ml.' in job_details['ResourceConfig']['InstanceType']):
                    sagemaker.stop_training_job(
                        TrainingJobName=job['TrainingJobName']
                    )
                    actions.append(f"Stopped training job: {job['TrainingJobName']}")

            # Isolate the affected instance behind a fresh, empty security group
            instance_id = resource_info['instanceId']
            isolation_sg = ec2.create_security_group(
                GroupName=f'isolation-{instance_id}-{int(time.time())}',
                Description='Isolation security group for compromised instance'
                # NOTE: pass VpcId=... here for instances in a non-default VPC
            )
            ec2.modify_instance_attribute(
                InstanceId=instance_id,
                Groups=[isolation_sg['GroupId']]
            )
            actions.append(f"Isolated instance: {instance_id}")
    elif finding_type == 'UnauthorizedAPICall':
        # Revoke suspicious API access
        actions.append("Initiated API access review and potential key rotation")
    return actions

def handle_medium_ml_finding(finding_type: str, resource_info: dict,
                             sagemaker) -> List[str]:
    """Handle medium severity ML security findings"""
    actions = []
    if finding_type == 'Recon:EC2/PortProbeUnprotectedPort':
        suspicious_ip = resource_info.get('remoteIpDetails', {}).get('ipAddressV4', '')
        endpoints = sagemaker.list_endpoints()
        for endpoint in endpoints['Endpoints']:
            if endpoint['EndpointStatus'] == 'InService':
                # Flag the endpoint for review; blocking the probing IP is done
                # via a VPC endpoint policy or WAF rule, not a SageMaker API call
                actions.append(
                    f"Flagged endpoint {endpoint['EndpointName']} for enhanced "
                    f"monitoring (suspicious source IP: {suspicious_ip})"
                )
    return actions
Implementation Roadmap for Enterprise AI/ML Security
Phase 1: Foundation Security (Weeks 1-4)
Week 1-2: Assessment and Planning
- Conduct AI/ML workload inventory across AWS accounts
- Identify sensitive data flows and model assets
- Assess current security controls and gaps
- Define security requirements and compliance needs
- Create security architecture documentation
Week 3-4: Basic Security Controls
- Implement IAM policies for Bedrock and SageMaker access
- Configure VPC endpoints for service isolation
- Enable CloudTrail logging for AI/ML services
- Set up KMS encryption for training data and models
- Deploy basic monitoring and alerting
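Several of the controls above come down to a single API call. As one sketch, Bedrock model invocation logging can be enabled programmatically; the log group name, role ARN, and bucket below are illustrative placeholders, and the payload shape assumes the Bedrock `PutModelInvocationLoggingConfiguration` API:

```python
def build_bedrock_logging_config(log_group: str, role_arn: str,
                                 bucket: str, prefix: str) -> dict:
    """Assemble the loggingConfig payload for Bedrock model-invocation logging."""
    return {
        "cloudWatchConfig": {"logGroupName": log_group, "roleArn": role_arn},
        "s3Config": {"bucketName": bucket, "keyPrefix": prefix},
        "textDataDeliveryEnabled": True,
        "imageDataDeliveryEnabled": False,
        "embeddingDataDeliveryEnabled": False,
    }

def enable_bedrock_logging(region: str, config: dict) -> None:
    """Apply the logging configuration (requires AWS credentials)."""
    import boto3  # deferred so the builder above stays testable offline
    bedrock = boto3.client("bedrock", region_name=region)
    bedrock.put_model_invocation_logging_configuration(loggingConfig=config)

config = build_bedrock_logging_config(
    "/aws/bedrock/production",
    "arn:aws:iam::123456789012:role/bedrock-logging-role",  # example ARN
    "my-bedrock-invocation-logs",
    "invocations/",
)
```

Separating the payload builder from the API call keeps the configuration unit-testable without AWS credentials.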
Phase 2: Advanced Protection (Weeks 5-8)
Week 5-6: Network and Access Security
- Deploy VPC isolation for training environments
- Implement network security groups and NACLs
- Configure private endpoints for all AI/ML services
- Set up cross-account access controls
- Deploy security baselines and compliance rules
Week 7-8: Data and Model Security
- Implement data encryption at rest and in transit
- Configure secure model artifact storage
- Deploy data loss prevention controls
- Set up model versioning and integrity checks
- Implement secure model deployment pipelines
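Model integrity checks from the Week 7-8 list reduce to comparing a freshly computed artifact hash against the digest recorded when the model was registered. A minimal, AWS-agnostic sketch:

```python
import hashlib
import os
import tempfile

def artifact_digest(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the SHA-256 hex digest of a model artifact, streamed in
    chunks so large model.tar.gz files never load fully into memory."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()

def verify_artifact(path: str, recorded_digest: str) -> bool:
    """Compare against the digest recorded at registration time;
    a mismatch means the artifact was altered after registration."""
    return artifact_digest(path) == recorded_digest

# Demo with a throwaway file standing in for a model artifact
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp.write(b"model-weights")
tmp.close()
digest = artifact_digest(tmp.name)
assert verify_artifact(tmp.name, digest)
os.unlink(tmp.name)
```

In a pipeline, the recorded digest would live alongside the model version (for example as SageMaker Model Registry metadata) and be re-verified before every deployment.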
Phase 3: Threat Detection and Response (Weeks 9-12)
Week 9-10: Monitoring and Detection
- Deploy GuardDuty with ML-specific threat detection
- Configure CloudWatch metrics and alarms
- Implement behavioral analytics for anomaly detection
- Set up security information and event management (SIEM)
- Deploy automated threat response capabilities
Week 11-12: Incident Response and Recovery
- Develop AI/ML-specific incident response procedures
- Implement automated containment and isolation
- Set up forensic data collection and analysis
- Create disaster recovery and business continuity plans
- Conduct security tabletop exercises
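Automated containment usually begins with a severity-to-action mapping. The sketch below follows GuardDuty's severity bands (7.0 and above high, 4.0 and above medium); the action names are hypothetical hooks for your own runbooks, not AWS APIs.

```python
SEVERITY_ACTIONS = {
    "HIGH": ["isolate_endpoint", "snapshot_for_forensics", "page_oncall"],
    "MEDIUM": ["restrict_source_ip", "raise_log_verbosity"],
    "LOW": ["record_finding"],
}

def containment_plan(finding: dict) -> list:
    """Map a GuardDuty-style finding (severity 0-8.9) to containment
    steps, using GuardDuty's documented severity bands."""
    severity = finding.get("Severity", 0)
    if severity >= 7:
        band = "HIGH"
    elif severity >= 4:
        band = "MEDIUM"
    else:
        band = "LOW"
    return SEVERITY_ACTIONS[band]

# A high-severity finding triggers isolation and forensics capture
print(containment_plan({"Severity": 8.0}))
```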
Phase 4: Governance and Compliance (Weeks 13-16)
Week 13-14: Compliance Framework
- Implement compliance monitoring and reporting
- Deploy policy-as-code for security governance
- Set up audit logging and evidence collection
- Configure compliance assessment automation
- Establish security metrics and KPIs
Week 15-16: Optimization and Maturity
- Conduct security maturity assessment
- Optimize performance and cost efficiency
- Implement advanced threat hunting capabilities
- Deploy AI-powered security analytics
- Establish continuous improvement processes
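Policy-as-code checks can be as simple as auditing `describe_training_job` output for missing controls. A sketch under that assumption (field names follow the SageMaker API; the specific violation list is illustrative, not a compliance standard):

```python
def audit_training_config(config: dict) -> list:
    """Return compliance violations for one SageMaker training-job
    description (fields follow describe_training_job naming)."""
    violations = []
    if not config.get("OutputDataConfig", {}).get("KmsKeyId"):
        violations.append("output artifacts not encrypted with a CMK")
    if not config.get("EnableInterContainerTrafficEncryption", False):
        violations.append("inter-container traffic encryption disabled")
    if not config.get("VpcConfig"):
        violations.append("training job not attached to a VPC")
    if config.get("EnableNetworkIsolation") is not True:
        violations.append("network isolation disabled")
    return violations

# A fully configured job passes; an empty config fails every check
compliant = {
    "OutputDataConfig": {"KmsKeyId": "alias/ml-artifacts"},
    "EnableInterContainerTrafficEncryption": True,
    "VpcConfig": {"Subnets": ["subnet-0a1b2c3d"],
                  "SecurityGroupIds": ["sg-0f9e8d7c"]},
    "EnableNetworkIsolation": True,
}
assert audit_training_config(compliant) == []
```

Run against `sagemaker.list_training_jobs()` output on a schedule, this becomes a lightweight continuous-compliance report.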
Monitoring and Alerting for AI/ML Security
CloudWatch Metrics and Alarms
import boto3
import json
from typing import List


class MLSecurityMonitoring:
    def __init__(self, region_name: str = 'us-east-1'):
        self.cloudwatch = boto3.client('cloudwatch', region_name=region_name)
        self.sns = boto3.client('sns', region_name=region_name)

    def create_ml_security_dashboard(self, dashboard_name: str) -> str:
        """Create comprehensive security monitoring dashboard"""
        dashboard_body = {
            "widgets": [
                {
                    "type": "metric",
                    "x": 0, "y": 0, "width": 12, "height": 6,
                    "properties": {
                        "metrics": [
                            ["AWS/SageMaker", "TrainingJobsStarted"],
                            ["AWS/SageMaker", "TrainingJobsFailed"],
                            ["AWS/SageMaker", "TrainingJobsStopped"]
                        ],
                        "period": 300,
                        "stat": "Sum",
                        "region": "us-east-1",
                        "title": "SageMaker Training Job Security Events",
                        "annotations": {
                            "horizontal": [
                                {"label": "Security Threshold", "value": 10}
                            ]
                        }
                    }
                },
                {
                    "type": "metric",
                    "x": 0, "y": 6, "width": 12, "height": 6,
                    "properties": {
                        "metrics": [
                            ["AWS/Bedrock", "Invocations"],
                            ["AWS/Bedrock", "InvocationClientErrors"],
                            ["AWS/Bedrock", "InvocationServerErrors"]
                        ],
                        "period": 300,
                        "stat": "Sum",
                        "region": "us-east-1",
                        "title": "Bedrock API Security Metrics"
                    }
                },
                {
                    "type": "log",
                    "x": 0, "y": 12, "width": 24, "height": 6,
                    "properties": {
                        "query": (
                            "SOURCE '/aws/guardduty/findings'\n"
                            "| fields @timestamp, type, severity, service.serviceName\n"
                            "| filter service.serviceName in ['sagemaker', 'bedrock']\n"
                            "| stats count() by type\n"
                            "| sort @timestamp desc\n"
                            "| limit 100"
                        ),
                        "region": "us-east-1",
                        "title": "AI/ML Security Findings",
                        "view": "table"
                    }
                }
            ]
        }
        try:
            self.cloudwatch.put_dashboard(
                DashboardName=dashboard_name,
                DashboardBody=json.dumps(dashboard_body)
            )
            return dashboard_name
        except Exception as e:
            print(f"Error creating dashboard: {e}")
            raise

    def create_security_alarms(self, topic_arn: str) -> List[str]:
        """Create comprehensive security alarms for AI/ML workloads"""
        alarms = []

        # High error rate alarm for Bedrock
        self.cloudwatch.put_metric_alarm(
            AlarmName='ML-Security-Bedrock-High-Error-Rate',
            ComparisonOperator='GreaterThanThreshold',
            EvaluationPeriods=2,
            MetricName='InvocationClientErrors',
            Namespace='AWS/Bedrock',
            Period=300,
            Statistic='Sum',
            Threshold=50.0,
            ActionsEnabled=True,
            AlarmActions=[topic_arn],
            AlarmDescription='High error rate detected in Bedrock API calls',
            Dimensions=[
                {
                    'Name': 'ModelId',
                    'Value': 'anthropic.claude-3-sonnet-20240229-v1:0'
                }
            ],
            Unit='Count',
            # Missing data means no invocations, not an error condition
            TreatMissingData='notBreaching'
        )
        alarms.append('ML-Security-Bedrock-High-Error-Rate')

        # Suspicious SageMaker training job pattern
        self.cloudwatch.put_anomaly_detector(
            Namespace='AWS/SageMaker',
            MetricName='TrainingJobsStarted',
            Dimensions=[],
            Stat='Average'
        )
        self.cloudwatch.put_metric_alarm(
            AlarmName='ML-Security-SageMaker-Anomalous-Training',
            ComparisonOperator='LessThanLowerOrGreaterThanUpperThreshold',
            EvaluationPeriods=2,
            Metrics=[
                {
                    'Id': 'm1',
                    'ReturnData': True,
                    'MetricStat': {
                        'Metric': {
                            'Namespace': 'AWS/SageMaker',
                            'MetricName': 'TrainingJobsStarted'
                        },
                        'Period': 300,
                        'Stat': 'Average'
                    }
                },
                {
                    'Id': 'ad1',
                    # ANOMALY_DETECTION_BAND is the CloudWatch metric math
                    # function for anomaly detection bands
                    'Expression': 'ANOMALY_DETECTION_BAND(m1, 2)'
                }
            ],
            ThresholdMetricId='ad1',
            ActionsEnabled=True,
            AlarmActions=[topic_arn],
            AlarmDescription='Anomalous pattern detected in SageMaker training jobs'
        )
        alarms.append('ML-Security-SageMaker-Anomalous-Training')

        # GuardDuty findings alarm for ML services (custom metric namespace,
        # populated by the finding-handler Lambda)
        self.cloudwatch.put_metric_alarm(
            AlarmName='ML-Security-GuardDuty-ML-Findings',
            ComparisonOperator='GreaterThanThreshold',
            EvaluationPeriods=1,
            MetricName='FindingCount',
            Namespace='GuardDutyCustom',
            Period=300,
            Statistic='Sum',
            Threshold=0.0,
            ActionsEnabled=True,
            AlarmActions=[topic_arn],
            AlarmDescription='GuardDuty findings detected for ML services',
            TreatMissingData='notBreaching'
        )
        alarms.append('ML-Security-GuardDuty-ML-Findings')
        return alarms
Cost Optimization for AI/ML Security
Balancing Security and Cost Efficiency
Implementing comprehensive AI/ML security controls carries real cost: GuardDuty analysis, CloudWatch log ingestion, and KMS requests all scale with workload volume. This section provides strategies for optimizing security spend while maintaining robust protection.
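Managed spot training is typically the largest single lever. A back-of-envelope helper, assuming an illustrative $0.23/hour on-demand rate and a 70% spot discount (actual discounts vary by instance type, region, and capacity):

```python
def spot_training_savings(on_demand_hourly: float, hours: float,
                          spot_discount: float = 0.70) -> dict:
    """Estimate managed spot training savings. The default 70% discount
    is an illustrative assumption, not a guaranteed rate."""
    on_demand_cost = on_demand_hourly * hours
    spot_cost = on_demand_cost * (1 - spot_discount)
    return {
        "on_demand": round(on_demand_cost, 2),
        "spot": round(spot_cost, 2),
        "savings": round(on_demand_cost - spot_cost, 2),
    }

# 100 training hours at an assumed $0.23/hour on-demand rate
estimate = spot_training_savings(0.23, 100)
print(estimate)
```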
Cost-Optimized Security Architecture
#!/bin/bash
# Cost optimization script for AI/ML security infrastructure

# Function to optimize SageMaker training costs
optimize_sagemaker_costs() {
    echo "Optimizing SageMaker training costs..."

    # Use Spot instances for non-critical training. Managed spot requires
    # MaxWaitTimeInSeconds >= MaxRuntimeInSeconds and a checkpoint location.
    aws sagemaker create-training-job \
        --training-job-name "cost-optimized-training-$(date +%s)" \
        --algorithm-specification TrainingImage=your-training-image,TrainingInputMode=File \
        --role-arn arn:aws:iam::account:role/SageMakerRole \
        --input-data-config ChannelName=training,DataSource='{S3DataSource={S3DataType=S3Prefix,S3Uri=s3://training-data/,S3DataDistributionType=FullyReplicated}}' \
        --output-data-config S3OutputPath=s3://model-artifacts/ \
        --resource-config InstanceType=ml.m5.large,InstanceCount=1,VolumeSizeInGB=30 \
        --stopping-condition MaxRuntimeInSeconds=3600,MaxWaitTimeInSeconds=7200 \
        --enable-managed-spot-training \
        --checkpoint-config S3Uri=s3://model-checkpoints/

    # Schedule training jobs during off-peak hours (attach a target with
    # `aws events put-targets` so the rule actually starts jobs)
    aws events put-rule \
        --name "ml-training-schedule" \
        --schedule-expression "cron(0 2 * * ? *)" \
        --description "Schedule ML training during off-peak hours"
}

# Function to optimize GuardDuty costs
optimize_guardduty_costs() {
    echo "Optimizing GuardDuty costs..."
    detector_id=$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text)

    # Reduce publishing frequency and keep S3 protection enabled
    aws guardduty update-detector \
        --detector-id "$detector_id" \
        --data-sources S3Logs='{Enable=true}' \
        --finding-publishing-frequency SIX_HOURS

    # Archive low-severity findings to reduce noise and triage cost
    aws guardduty create-filter \
        --detector-id "$detector_id" \
        --name "cost-optimization-filter" \
        --action ARCHIVE \
        --finding-criteria '{"Criterion":{"severity":{"LessThan":4}}}'
}

# Function to optimize CloudWatch costs
optimize_cloudwatch_costs() {
    echo "Optimizing CloudWatch costs..."

    # Set log retention periods for AI/ML log groups
    for log_group in $(aws logs describe-log-groups \
        --query 'logGroups[?starts_with(logGroupName, `/aws/sagemaker`) || starts_with(logGroupName, `/aws/bedrock`)].logGroupName' \
        --output text); do
        aws logs put-retention-policy \
            --log-group-name "$log_group" \
            --retention-in-days 90
    done

    # Use Logs Insights for cost-effective ad hoc analysis
    aws logs start-query \
        --log-group-name "/aws/sagemaker/TrainingJobs" \
        --start-time $(date -d '7 days ago' +%s) \
        --end-time $(date +%s) \
        --query-string 'fields @timestamp, @message | filter @message like /ERROR/ | stats count() by bin(5m)'
}

# Main optimization execution
main() {
    echo "Starting AI/ML security cost optimization..."
    optimize_sagemaker_costs
    optimize_guardduty_costs
    optimize_cloudwatch_costs
    echo "Cost optimization completed!"

    # Generate cost report for the covered services
    aws ce get-cost-and-usage \
        --time-period Start=2025-01-01,End=2025-01-31 \
        --granularity MONTHLY \
        --metrics BlendedCost \
        --group-by Type=DIMENSION,Key=SERVICE \
        --filter '{"Dimensions":{"Key":"SERVICE","Values":["Amazon SageMaker","Amazon Bedrock","Amazon GuardDuty","AmazonCloudWatch"]}}'
}

main
Related Articles and Additional Resources
AWS Security Documentation
- AWS AI/ML Security Best Practices
- Amazon SageMaker Security Documentation
- Amazon Bedrock Security Guide
- AWS GuardDuty Machine Learning Protection
Industry Standards and Frameworks
- NIST AI Risk Management Framework
- ISO/IEC 27001:2022 AI Security Controls
- OWASP Machine Learning Security Top 10
Professional Services
For comprehensive AI/ML security implementation support, including architecture design, compliance assessment, and incident response planning, connect with security consulting professionals who specialize in AWS AI/ML workloads.
Jon Price - AWS Security Architect and DevSecOps Consultant
- LinkedIn Profile
- Specializing in enterprise AI/ML security, compliance automation, and threat detection
This comprehensive guide provides the foundation for securing AI/ML workloads on AWS in 2025. Regular updates ensure compatibility with evolving AWS services and emerging security threats in the AI/ML landscape.