-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
96 lines (68 loc) · 3.11 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from pycti import OpenCTIApiClient
import stix2
import stix2.v20.common
import json
import requests
import logging
import math
import os
log = logging.getLogger(__name__)
logging.basicConfig(
format='%(asctime)s %(levelname)-4s %(message)s',
datefmt="%Y-%m-%d %H:%M:%S",
level=logging.DEBUG
)
def main():
octi_url = os.getenv("OPENCTI_URL")
octi_token = os.getenv("OPENCTI_TOKEN")
try:
api = OpenCTIApiClient(octi_url, octi_token, log_level="debug")
except Exception as e:
log.error(str(e))
batch_size = 100
has_next = True
after = ""
header = {
"apiKey": os.getenv("NIST_API_KEY")
}
while has_next:
resp = api.vulnerability.list(getAll=False, first=batch_size, after=after, withPagination=True)
vulns = resp["entities"]
log.debug("fetched {} vulnerabilities".format(len(vulns)))
if resp["pagination"]["hasNextPage"]:
after = resp["pagination"]["endCursor"]
else:
has_next = False
for vuln in vulns:
log.info("processing vulnerability '{}'".format(vuln["name"]))
log.debug("fetching vulnerability data from NIST api")
resp = requests.get("https://services.nvd.nist.gov/rest/json/cves/2.0?cveId="+vuln["name"], headers=header)
if resp.status_code == 200:
data = resp.json()
for nist_vuln in data["vulnerabilities"]:
if not nist_vuln["cve"].get("configurations"):
continue
for cfg in nist_vuln["cve"]["configurations"]:
for node in cfg["nodes"]:
for cpe in node["cpeMatch"]:
if cpe["vulnerable"]:
software_result = api.stix_cyber_observable.list(
filters={"mode": "and", "filterGroups": [], "filters": [{"key": "cpe", "values": cpe["criteria"]}]}
)
if len(software_result) == 0:
log.error("no observable found for '{}'".format(cpe["criteria"]))
continue
if len(software_result) > 1:
log.error("more then 1 software observable found")
continue
result = api.stix_core_relationship.create(
fromId=software_result[0]["id"],
toId=vuln["id"],
relationship_type="has"
)
if result.get("id") != None:
log.info("created relationship '{}' from {} to {}".format(result["id"], vuln["name"], cpe["criteria"]))
else:
log.error("status code: {}".format(resp.status_code))
if __name__ == "__main__":
main()