"""ai_analyzer.py - classify text for adversarial AI/ML threats.

Sends the input to a Groq-hosted chat model together with a MITRE ATLAS
technique reference and returns the model's JSON verdict.
"""
import os
import json
from groq import Groq
from dotenv import load_dotenv
# Load variables from a local .env file (if present) into the process
# environment; this has to run before the API key is looked up below.
load_dotenv()

# Module-wide Groq client shared by every analysis call. The key is read
# from GROQ_API_KEY and is None when the variable is not set.
client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
# Reference list of MITRE ATLAS techniques, grouped by attack phase. The text
# is interpolated verbatim into SYSTEM_PROMPT, so every character here is sent
# to the model on each request.
# NOTE(review): this is a hand-curated subset, and a few ID/name pairings
# (e.g. AML.T0000, AML.T0046, AML.T0056) may not match the published ATLAS
# matrix — verify against atlas.mitre.org before relying on the IDs.
ATLAS_CORPUS = """
MITRE ATLAS (Adversarial Threat Landscape for Artificial-Intelligence Systems) — Full Technique Reference:
RECONNAISSANCE:
- AML.T0000 - ML Attack Staging: Preparing infrastructure and tools for AI-targeted attacks
- AML.T0047 - Search Victim's Publicly Available Research: Mining papers/repos for model architecture and training details
- AML.T0048 - Acquire Public ML Artifacts: Downloading pretrained models, datasets, embeddings for attack staging
RESOURCE DEVELOPMENT:
- AML.T0016 - Obtain Capabilities: Acquiring adversarial ML tools, attack frameworks, exploit scripts
- AML.T0017 - Develop Capabilities: Building custom adversarial tools, poisoning pipelines, proxy models
- AML.T0019 - Publish Poisoned Datasets: Releasing poisoned data through Hugging Face, GitHub, Kaggle to compromise downstream models
INITIAL ACCESS:
- AML.T0010 - ML Supply Chain Compromise: Attacking upstream ML dependencies, pretrained model repos, or training pipelines
- AML.T0046 - Spearphishing for Information: Targeting ML practitioners to extract model details, API keys, training configs
- AML.T0044 - Full ML Model Access: Gaining white-box access to weights and architecture through insider threat or breach
EXECUTION / MODEL ATTACKS:
- AML.T0051 - LLM Prompt Injection: Crafting inputs to hijack LLM behavior, override system prompts, exfiltrate data, execute unintended actions
- AML.T0054 - LLM Jailbreak: Bypassing LLM safety guardrails, content filters, RLHF alignment using roleplay, encoding, or adversarial prompts
- AML.T0043 - Craft Adversarial Data: Creating inputs to fool ML models at inference — image perturbations, text adversarial examples
- AML.T0020 - Poison Training Data: Injecting malicious samples into training datasets to embed backdoors or degrade performance
- AML.T0018 - Backdoor ML Model: Inserting hidden functionality triggered by specific inputs (trojan triggers)
COLLECTION / EXFILTRATION:
- AML.T0040 - ML Model Inference API Access: Systematic querying of model APIs to extract intelligence, reconstruct training data
- AML.T0024 - Exfiltration via ML Inference API: Using model outputs to leak PII or sensitive training data (membership inference)
- AML.T0005 - Create Proxy ML Model: Building a surrogate model by querying the target to enable transfer attacks and IP theft
- AML.T0056 - LLM Data Leakage: Extracting memorized training data through carefully crafted prompts
"""
# System message sent with every analysis request. It is an f-string:
# {ATLAS_CORPUS} is substituted once at import time, and the doubled braces
# ({{ and }}) are escapes that render as literal { and } in the JSON template
# shown to the model. The template describes the reply shape the model is
# asked to produce; nothing in this module validates the reply against it.
SYSTEM_PROMPT = f"""You are ATLAS-GPT, an elite AI security analyst embedded in DarkDecoder.
You specialize in detecting adversarial threats against AI/ML systems using the MITRE ATLAS framework.
{ATLAS_CORPUS}
Analyze the given input — it may be a prompt, user message, system prompt, dataset sample, or model query log.
Identify adversarial patterns, attack techniques, and risks.
Always respond with ONLY valid JSON — no markdown, no text outside JSON.
Required JSON structure:
{{
"input_type": "Prompt Injection | Jailbreak | Data Poisoning | Model Extraction | Membership Inference | Adversarial Input | Benign",
"threat_level": <integer 1-10>,
"threat_label": "CRITICAL | HIGH | MODERATE | LOW | BENIGN",
"attack_summary": "2 sentence summary of what this input is attempting to do",
"plain_english": "explain to a non-technical executive what this attack does and the business impact",
"atlas_techniques": [
{{"id": "AML.TXXXX", "name": "Technique Name", "confidence": "HIGH|MEDIUM|LOW", "description": "how this input uses this technique"}}
],
"attack_goal": "Data Exfiltration | Model Manipulation | Safety Bypass | Reconnaissance | IP Theft | Denial of Service | Unknown",
"target_system": "what type of AI system is being targeted",
"evasion_indicators": ["indicator 1", "indicator 2"],
"adversarial_patterns": ["specific pattern found 1", "specific pattern found 2"],
"business_impact": "High | Medium | Low",
"defenses": [
"specific defensive measure 1",
"specific defensive measure 2",
"specific defensive measure 3"
]
}}
"""
def _parse_model_json(raw) -> dict:
    """Extract the JSON object from a model reply.

    Tolerates a leading/trailing Markdown code fence and, failing that, stray
    text around the outermost ``{...}`` span.

    Raises:
        json.JSONDecodeError: if no JSON object can be recovered from *raw*.
    """
    text = (raw or "").strip()

    # Strip an opening ``` fence (with an optional "json" tag) and a closing one.
    if text.startswith("```"):
        text = text[3:]
        if text[:4].lower() == "json":
            text = text[4:]
    if text.endswith("```"):
        text = text[:-3]
    text = text.strip()

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # The model sometimes wraps the object in prose or an unexpected fence
        # tag; retry on just the outermost brace-delimited span.
        start = text.find("{")
        end = text.rfind("}")
        if start == -1 or end <= start:
            raise
        parsed = json.loads(text[start:end + 1])

    # Callers are promised a dict; a bare list/number/string is a bad reply.
    if not isinstance(parsed, dict):
        raise json.JSONDecodeError("Expected a JSON object", text, 0)
    return parsed


def analyze_ai_threat(content: str, input_type_hint: str = "auto") -> dict:
    """Ask the Groq chat model to assess *content* for adversarial AI/ML threats.

    The input is truncated to its first 15000 characters, wrapped in a code
    fence and sent together with ``SYSTEM_PROMPT``. The call is attempted up to
    three times; unparseable replies and unclassified API errors are retried.

    Args:
        content: Text to analyze (prompt, message, dataset sample, log, ...).
        input_type_hint: Optional context appended to the user message; the
            default ``"auto"`` adds nothing.

    Returns:
        The JSON object produced by the model, as a dict. Its keys follow the
        structure requested in ``SYSTEM_PROMPT`` but are not validated here.

    Raises:
        RuntimeError: on rate limiting, timeout or authentication failure
            (raised immediately, without retrying), or when every attempt
            fails. The underlying exception, if any, is chained as the cause.
    """
    max_attempts = 3

    content = content[:15000]
    hint = f"\nInput type context: {input_type_hint}" if input_type_hint != "auto" else ""
    # NOTE(review): content is untrusted and may itself contain ``` sequences,
    # which would close the fence below early — consider escaping upstream.
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Analyze this for AI/ML adversarial threats:{hint}\n\n```\n{content}\n```"},
    ]

    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            response = client.chat.completions.create(
                model="llama-3.3-70b-versatile",
                messages=messages,
                temperature=0.1,
                max_tokens=4096,
                timeout=30,  # presumably seconds — confirm against the SDK
            )
            raw = response.choices[0].message.content
        except Exception as e:
            err = str(e)
            lowered = err.lower()
            # These failures will not be cured by an immediate retry.
            if "rate_limit" in lowered or "429" in err:
                raise RuntimeError("Rate limit reached. Please wait 30 seconds and try again.") from e
            if "timeout" in lowered:
                raise RuntimeError("Request timed out. Please try again.") from e
            if "api_key" in lowered or "authentication" in lowered:
                raise RuntimeError("Invalid API key. Check your GROQ_API_KEY in .env file.") from e
            last_error = err
            if attempt < max_attempts:
                continue
            raise RuntimeError(f"Analysis failed: {last_error}") from e

        try:
            return _parse_model_json(raw)
        except json.JSONDecodeError:
            # Malformed reply: remember why and let the loop ask again.
            last_error = "Response parsing error"

    raise RuntimeError(f"Analysis failed after retries: {last_error}")