-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcorrelations.py
More file actions
332 lines (310 loc) · 13.4 KB
/
correlations.py
File metadata and controls
332 lines (310 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
"""
Cross-pillar correlation engine for SecurityAuditScripts.
Detects compound attack paths that emerge when multiple finding types
co-occur across security pillars. Each rule maps to a real MITRE ATT&CK
tactic and fires based on one of four match strategies:
all — every listed finding_type must be present
any_two — any 2 or more of the listed finding_types must be present
any_one — any single listed finding_type triggers the rule
all_groups — every group must contribute at least one match (AND logic)
Usage:
from correlations import run_correlations
triggered = run_correlations(all_findings_flat)
"""
from __future__ import annotations
from typing import List, Dict, Any
# Declarative table of cross-pillar correlation rules, evaluated in order by
# run_correlations(). Each entry is a dict with these keys:
#   id, name, severity, mitre_tactic, mitre_technique_id, narrative
#       copied verbatim into the triggered correlation result
#   match
#       strategy name interpreted by _matches(): "all", "any_two", "any_one"
#       or "all_groups" (_matches() falls back to "all" when the key is absent)
#   finding_types
#       flat list of finding_type strings used by "all" / "any_two" / "any_one"
#   groups
#       list of finding_type lists, read only for "all_groups"; every group
#       must contribute at least one present finding_type
CORRELATION_RULES: List[Dict[str, Any]] = [
    {
        "id": "CP-01",
        "name": "Undetected Credential Theft Path",
        # NOTE(review): with "any_two" this rule also fires when two logging
        # findings are present and "UserNoMfa" is absent, although the
        # narrative describes missing MFA *combined with* missing logging.
        # Confirm this is intended.
        "finding_types": ["UserNoMfa", "AuditdNotRunning", "MailboxAuditLoggingDisabled",
                          "NoDiagnosticLogging", "AdminAuditLoggingDisabled"],
        "match": "any_two",
        "severity": "CRITICAL",
        "mitre_tactic": "Credential Access + Defense Evasion",
        "mitre_technique_id": "T1078 + T1562.008",
        "narrative": (
            "Accounts without MFA combined with disabled audit logging means credential "
            "theft goes completely undetected. An attacker can compromise accounts and "
            "operate for weeks without triggering any alerts."
        ),
    },
    {
        "id": "CP-02",
        "name": "Lateral Movement via Exposed Remote Services",
        "finding_types": ["RDPOpenToAll", "SMBOpenToAll", "WinRMOpenToAll", "DangerousPort",
                          "DangerousPortOpenToAll"],
        "match": "any_two",
        "severity": "CRITICAL",
        "mitre_tactic": "Lateral Movement",
        "mitre_technique_id": "T1021.002",
        "narrative": (
            "Multiple remote access services are exposed to the internet. An attacker who "
            "gains any foothold can pivot laterally across the environment using these open "
            "channels, dramatically expanding their blast radius."
        ),
    },
    {
        "id": "CP-03",
        "name": "Privileged Account Compromise Blast Radius",
        "finding_types": ["GlobalAdminNoMfa", "TooManyGlobalAdmins"],
        "match": "all",
        "severity": "CRITICAL",
        "mitre_tactic": "Privilege Escalation",
        "mitre_technique_id": "T1078.004",
        "narrative": (
            "Multiple Global Administrator accounts without MFA means a single phished "
            "credential provides complete tenant control. The blast radius of one compromised "
            "account is the entire Microsoft 365 and Azure environment."
        ),
    },
    {
        "id": "CP-04",
        "name": "Stale Identity Attack Path",
        "finding_types": ["StaleUser", "UserNoMfa"],
        "match": "all",
        "severity": "HIGH",
        "mitre_tactic": "Initial Access",
        "mitre_technique_id": "T1078",
        "narrative": (
            "Forgotten accounts with no MFA are prime targets for credential stuffing and "
            "password spray attacks. These identities are often unmonitored and their "
            "compromise may go unnoticed for extended periods."
        ),
    },
    {
        "id": "CP-05",
        "name": "Cloud Data Exfiltration Path",
        "finding_types": ["PublicBlobAccess", "ServicePrincipalBroadScope"],
        "match": "all",
        "severity": "CRITICAL",
        "mitre_tactic": "Exfiltration",
        "mitre_technique_id": "T1530",
        "narrative": (
            "Publicly accessible storage combined with overpermissioned service principals "
            "enables mass data exfiltration. An attacker exploiting the service principal "
            "can access and export all data in the publicly reachable storage accounts."
        ),
    },
    {
        "id": "CP-06",
        "name": "Kerberos Offline Password Cracking Path",
        "finding_types": ["KerberoastableAccount", "WeakDomainPasswordPolicy"],
        "match": "all",
        "severity": "CRITICAL",
        "mitre_tactic": "Credential Access",
        "mitre_technique_id": "T1558.003",
        "narrative": (
            "Kerberoastable service accounts with a weak domain password policy means "
            "offline password cracking is viable and likely to succeed. An attacker with "
            "any domain user account can harvest service tickets and crack them offline."
        ),
    },
    {
        "id": "CP-07",
        "name": "Persistent External Identity Backdoor",
        "finding_types": ["PrivilegedGuest", "SecurityDefaultsDisabled"],
        "match": "all",
        "severity": "HIGH",
        "mitre_tactic": "Persistence",
        "mitre_technique_id": "T1078.004",
        "narrative": (
            "External guest identities with elevated privileges and no security baseline "
            "controls create a persistent backdoor. These accounts are harder to monitor "
            "and may survive off-boarding processes for internal staff."
        ),
    },
    {
        "id": "CP-08",
        "name": "Silent Email Exfiltration",
        # NOTE(review): "any_two" over this flat list also fires on two
        # audit-logging findings with no forwarding finding present (and vice
        # versa); the narrative describes forwarding *plus* no audit trail.
        # Confirm this is intended.
        "finding_types": ["ExternalMailboxForwarding", "MailboxAuditLoggingDisabled",
                          "AdminAuditLoggingDisabled", "ExternalInboxForwardRule",
                          "RemoteDomainAutoForwardEnabled"],
        "match": "any_two",
        "severity": "CRITICAL",
        "mitre_tactic": "Exfiltration",
        "mitre_technique_id": "T1114.003",
        "narrative": (
            "Email is being automatically forwarded to external domains with no audit trail "
            "in place. This is a classic data exfiltration technique — all email is silently "
            "copied to an attacker-controlled inbox with zero detection capability."
        ),
    },
    {
        "id": "CP-09",
        "name": "Ransomware Impact Path",
        "finding_types": ["SoftDeleteDisabled", "PurgeProtectionDisabled", "ImmutabilityDisabled",
                          "DefenderNotEnabled", "TamperProtectionDisabled", "RtpDisabled",
                          "VersioningDisabled", "RecycleBinDisabled"],
        "match": "any_two",
        "severity": "CRITICAL",
        "mitre_tactic": "Impact",
        "mitre_technique_id": "T1485",
        "narrative": (
            "Disabled defenses combined with no data recovery safeguards creates an ideal "
            "ransomware environment. An attacker can encrypt or destroy data with no detection "
            "and the organisation has no recovery path without paying the ransom."
        ),
    },
    {
        "id": "CP-10",
        "name": "Active Directory Full Takeover Path",
        "finding_types": ["ExcessiveDomainAdmins", "WeakDomainPasswordPolicy"],
        "match": "all",
        "severity": "CRITICAL",
        "mitre_tactic": "Privilege Escalation",
        "mitre_technique_id": "T1078.002",
        "narrative": (
            "Too many Domain Administrators combined with a weak password policy means one "
            "compromised domain admin account results in complete Active Directory takeover. "
            "Domain admin credentials are trivial to crack offline with a weak policy."
        ),
    },
    {
        "id": "CP-11",
        "name": "Internet Exposure Without Monitoring",
        # Left empty: for "all_groups" _matches() reads "groups" and never
        # looks at "finding_types".
        "finding_types": [],
        "match": "all_groups",
        # First group: exposure findings; second group: missing-logging
        # findings. Both groups must contribute at least one present type.
        "groups": [
            ["RDPOpenToAll", "SMBOpenToAll", "WinRMOpenToAll", "NoFirewallActive",
             "DefaultPolicyAccept", "InboundDefaultAllow", "DangerousPort"],
            ["AuditdNotRunning", "NoDiagnosticSetting", "NoDiagnosticLogging",
             "AdminAuditLoggingDisabled"],
        ],
        "severity": "HIGH",
        "mitre_tactic": "Initial Access + Defense Evasion",
        "mitre_technique_id": "T1133 + T1562.008",
        "narrative": (
            "Internet-exposed services with no monitoring or logging capability means "
            "compromise attempts and successful breaches go completely undetected. There is "
            "no way to know if systems have already been accessed by an unauthorised party."
        ),
    },
    {
        "id": "CP-12",
        "name": "Stale App Credential with Broad Access",
        "finding_types": ["StaleAppCredential", "ServicePrincipalBroadScope"],
        "match": "all",
        "severity": "HIGH",
        "mitre_tactic": "Credential Access",
        "mitre_technique_id": "T1078.004",
        "narrative": (
            "Expired credentials on overpermissioned application registrations represent a "
            "dormant but high-impact risk. If those credentials were ever stolen or reused, "
            "the attacker inherits Directory-level access to your entire tenant."
        ),
    },
    {
        "id": "CP-13",
        "name": "NTLMv1 Credential Relay Attack Path",
        "finding_types": ["NtlmV1Enabled", "SMBOpenToAll"],
        "match": "all",
        "severity": "HIGH",
        "mitre_tactic": "Credential Access",
        "mitre_technique_id": "T1557",
        "narrative": (
            "NTLMv1 authentication with SMB exposed to the internet enables pass-the-hash "
            "and NTLM relay attacks. An attacker on the network can capture and relay "
            "credentials to authenticate as any user without knowing their password."
        ),
    },
    {
        "id": "CP-14",
        "name": "Comprehensive Audit Blindspot",
        "finding_types": ["AuditdNotRunning", "MailboxAuditLoggingDisabled",
                          "AdminAuditLoggingDisabled", "NoDiagnosticLogging",
                          "NoDiagnosticSetting", "SyslogNotConfigured", "NoLogDestination"],
        "match": "any_two",
        "severity": "HIGH",
        "mitre_tactic": "Defense Evasion",
        "mitre_technique_id": "T1562.008",
        "narrative": (
            "Multiple audit and logging systems are disabled or unconfigured. An attacker "
            "operating in this environment has complete freedom of movement with no forensic "
            "trail. Incident response and breach investigation will be severely hampered."
        ),
    },
    {
        "id": "CP-15",
        "name": "Credential Stuffing Against Weak Identity Controls",
        "finding_types": ["UserPasswordNeverExpires", "UserNoMfa", "WeakPasswordPolicy",
                          "WeakDomainPasswordPolicy", "UsersMissingMfaRegistration",
                          "NoMfaCaPolicy"],
        "match": "any_two",
        "severity": "HIGH",
        "mitre_tactic": "Credential Access",
        "mitre_technique_id": "T1110",
        "narrative": (
            "A combination of weak password policies, non-expiring passwords, and missing "
            "MFA controls creates ideal conditions for credential stuffing and brute force "
            "attacks. Breached credential lists from other services are likely to succeed here."
        ),
    },
]
def _get_present_types(findings: List[Dict[str, Any]]) -> set:
"""Return set of all finding_type values present in the findings list."""
types = set()
for f in findings:
ft = f.get("finding_type") or f.get("FindingType", "")
if ft:
types.add(ft)
return types
def _matches(rule: Dict[str, Any], present_types: set) -> tuple[bool, list]:
"""Return (fired, contributing_types) for a single rule."""
match = rule.get("match", "all")
if match == "all":
relevant = rule["finding_types"]
matched = [ft for ft in relevant if ft in present_types]
if set(relevant) <= present_types:
return True, matched
return False, []
elif match == "any_two":
relevant = rule["finding_types"]
matched = [ft for ft in relevant if ft in present_types]
if len(matched) >= 2:
return True, matched
return False, []
elif match == "any_one":
relevant = rule["finding_types"]
matched = [ft for ft in relevant if ft in present_types]
if matched:
return True, matched
return False, []
elif match == "all_groups":
groups = rule.get("groups", [])
all_matched = []
for group in groups:
group_matches = [ft for ft in group if ft in present_types]
if not group_matches:
return False, []
all_matched.extend(group_matches)
return True, all_matched
raise ValueError(f"Unknown match type '{match}' in rule '{rule.get('id')}'") # pragma: no cover
def run_correlations(findings: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Run every correlation rule over a flat list of findings.

    Args:
        findings: Flat list of finding dicts from any auditor (already enriched
            by validate_finding if available).

    Returns:
        One dict per rule that fired, in rule-table order, with keys
        id, name, severity, mitre_tactic, mitre_technique_id, narrative and
        contributing_types. An empty list is returned when *findings* is
        empty/falsy or when no rule fires.
    """
    if not findings:
        return []

    seen_types = _get_present_types(findings)
    # Rule fields copied unchanged into each result, in output key order.
    copied_keys = (
        "id",
        "name",
        "severity",
        "mitre_tactic",
        "mitre_technique_id",
        "narrative",
    )

    results = []
    for rule in CORRELATION_RULES:
        fired, contributing = _matches(rule, seen_types)
        if not fired:
            continue
        hit = {key: rule[key] for key in copied_keys}
        hit["contributing_types"] = contributing
        results.append(hit)
    return results