-
Notifications
You must be signed in to change notification settings - Fork 2
/
rogue.py
364 lines (314 loc) · 14.8 KB
/
rogue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
"""Python example script showing proper use of the Cisco Sample Code header.
Copyright (c) 2021 Cisco and/or its affiliates.
This software is licensed to you under the terms of the Cisco Sample
Code License, Version 1.1 (the "License"). You may obtain a copy of the
License at
https://developer.cisco.com/docs/licenses
All use of the material herein must be in accordance with the terms of
the License. All rights not expressly granted by the License are
reserved. Unless required by applicable law or agreed to separately in
writing, software distributed under the License is distributed on an "AS
IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied.
"""
import os
from datetime import datetime
import click
import pandas as pd
import yaml
from netmiko import ConnectHandler
from rogue.common import channel_5G_24G, co_channel_5G, adj_channel_5G
from rogue.ios_xe import ios_rogue_data, ios_ap_data, ios_load_data, ios_clean_air_data, ios_summary_data, ios_version, \
ios_wlan_data
from rogue.wlc_ssh import wlc_ap_data, wlc_rogue_data, wlc_clean_air_data, wlc_summary_data, wlc_sysinfo, wlc_wlan_data
cmd_dict = {
"cisco_wlc_ssh": {
"rogue_summary": "show rogue ap summary",
"rogue": "show rogue ap detailed"
},
"cisco_ios": {
"rogue_summary": "show wireless wps rogue ap summary",
"rogue": "show wireless wps rogue ap detailed"
}
}
g_device_type = {
"aireos": "cisco_wlc_ssh",
"ios": "cisco_ios",
"cisco_wlc_ssh": "aireos",
"cisco_ios": "ios"
}
def rogue_channel_data_ios_5G(_load, _ap, _rogue):
_ap_ = {}
_load_ = {}
_rogue_ = []
for i in _load:
_load_[f'{i["ap_name"]}_{i["slot"]}'] = i
for i in _ap:
ap_key = f'{i["ap_name"]}_{i["slot"]}'
if ap_key in _load_.keys():
i.update(_load_[ap_key])
_ap_[ap_key] = i
for i in _rogue:
ap_key = f'{i["ap_name"]}_{i["slot"]}'
if ap_key in _ap_.keys():
i.update(_ap_[ap_key])
# Co/Adj. channel check
if co_channel_5G(_ap_[ap_key].get("channel"), i.get("rogue_channels")):
i["check_channel"] = "Co Channel"
elif adj_channel_5G(_ap_[ap_key].get("channel"), i.get("rogue_channels")):
i["check_channel"] = "Adjacent Channel"
else:
i["check_channel"] = ""
_rogue_.append(i)
return _rogue_
def rogue_channel_data_wlc_5G(_ap, _rogue):
_ap_ = {}
_rogue_ = []
for i in _ap:
_ap_[i["ap_name"]] = i
for i in _rogue:
if i.get("ap_name") in _ap_.keys():
i.update(_ap_[i["ap_name"]])
# Co/Adj. channel check
if co_channel_5G(i.get("channel"), i.get("rogue_channels")):
i["check_channel"] = "Co Channel"
elif adj_channel_5G(i.get("channel"), i.get("rogue_channels")):
i["check_channel"] = "Adjacent Channel"
else:
i["check_channel"] = ""
_rogue_.append(i)
return _rogue_
data_f_map = {
"cisco_ios": {
"show wireless wps rogue ap summary": ios_summary_data,
"show wireless wps rogue ap detailed": ios_rogue_data,
"show ap dot11 5ghz cleanair air-quality summary": ios_clean_air_data,
"show ap dot11 24ghz cleanair air-quality summary": ios_clean_air_data,
"show ap dot11 5ghz load-info": ios_load_data,
"show ap dot11 24ghz load-info": ios_load_data,
"show ap dot11 5ghz summary": ios_ap_data,
"show ap dot11 24ghz summary": ios_ap_data,
"show version": ios_version,
"5G": rogue_channel_data_ios_5G,
"show wlan summary": ios_wlan_data
},
"cisco_wlc_ssh": {
"show 802.11a cleanair air-quality summary": wlc_clean_air_data,
"show advanced 802.11a summary": wlc_ap_data,
"show rogue ap detailed": wlc_rogue_data,
"show rogue ap summary": wlc_summary_data,
"show sysinfo": wlc_sysinfo,
"5G": rogue_channel_data_wlc_5G,
"show wlan summary": wlc_wlan_data
}
}
@click.group(invoke_without_command=True)
@click.pass_context
def cli(ctx):
if not ctx.invoked_subcommand:
ctx.invoke(run)
@click.command()
@click.option("--client", prompt="请输入访问 WLC 无线控制器的名称 - client_name_location", help="Client_name_location of WLC.")
@click.option("--host", prompt="请输入访问 WLC 无线控制器的IP地址", help="host or IP of WLC.")
@click.option("--username", prompt="请输入访问 WLC 无线控制器的用户名", help="username of WLC.")
@click.password_option(prompt="请输入访问 WLC 无线控制器的密码", help="password of WLC.")
@click.option("--port", default=22, prompt="请输入访问 WLC 无线控制器的 SSH port", help="SSH port of WLC.")
# @click.option('--channel', default="5G", type=click.Choice(['5G', '2.4G', 'all']), prompt="请输入无线信道频段")
@click.option('--device_type', default="ios", type=click.Choice(['ios', 'aireos']), prompt="请输入访问 WLC 无线控制器的 OS", help="运行OS选择方法:C9800=ios、35/55/85 WLC=aireos")
@click.option("--rssi", default=-80, prompt="请输入rogue AP RSSI-dBm 最低值", help="Min RSSI of Rogue AP.")
def init(client, host, username, password, port, rssi, device_type="ios", channel="5G"):
""" 步骤一:交互式生成 config.yml 文件,第一次使用请先运行命令: rogue init。运行该命令,如果文件存在,则修改 config.yml文件;如果不存在,则创建文件。对于熟练使用者,可以直接修改config.yml实现多个控制器的信息获取"""
_wlc = {}
channels_5G = False
channels_24G = False
if channel == "all":
channels_5G = True
channels_24G = True
elif channel == "5G":
channels_5G = True
elif channel == "2.4G":
channels_24G = True
all_commands_dict = {
"cisco_ios": {
"5G": ["show ap dot11 5ghz cleanair air-quality summary", "show ap dot11 5ghz load-info",
"show ap dot11 5ghz summary"],
"2.4G": ["show ap dot11 24ghz cleanair air-quality summary", "show ap dot11 24ghz load-info",
"show ap dot11 24ghz summary"],
"common": ["show version", "show wlan summary"]
},
"cisco_wlc_ssh": {
"5G": ["show 802.11a cleanair air-quality summary", "show advanced 802.11a summary"],
"2.4G": ["show 802.11b cleanair air-quality summary", "show advanced 802.11b summary"],
"common": ["show sysinfo", "show wlan summary"]
}
}
_wlc[client] = {
"host": host,
"username": username,
"password": password,
"port": port,
"capture": True,
"device_type": g_device_type.get(device_type)
}
_config = {
"channels_5G": channels_5G,
"channels_24G": channels_24G,
"rssi_min_dBm": rssi,
"devices": _wlc,
"commands": all_commands_dict
}
# print(_config)
if not os.path.isfile('config.yml'):
# new config.yml file
with open(f'config.yml', "w") as file:
yaml.dump(_config, file)
print("config.yml file created successfully, next step run command: rogue")
else:
# modify the old config.yml file and make sure the new config refreshed
with open(f'config.yml', "r") as file:
_old_config = yaml.load(file, Loader=yaml.FullLoader)
_devices = _old_config.pop("devices")
_old_host = {v.get("host"): k for k, v in _devices.items()}
if host in _old_host.keys():
_devices.pop(_old_host.get(host))
_devices[client] = _wlc[client]
_config.pop("devices")
_config["devices"] = _devices
with open(f'config.yml', "w") as file:
yaml.dump(_config, file)
print("config.yml file changed successfully, please check sure this file correctly, then to run command: rogue")
return
def rogue_detail(_cmd, _mac_list, _netmiko, _folder, _device_type, _file_write=True):
_detail_str = ""
for i in _mac_list:
_detail_str += f'-------- {_cmd} {i} --------\n\n'
_detail_str += _netmiko.send_command(f'{_cmd} {i}')
_detail_str += "\n"
if _file_write:
with open(f'{_folder}/{_cmd}.txt', "w") as file:
file.write(_detail_str)
if _cmd in data_f_map[_device_type].keys():
return data_f_map[_device_type][_cmd](_detail_str)
else:
return []
def one_command_data(_cmd, _netmiko, _folder, _device_type, _file_write=True):
# all command functions
output = _netmiko.send_command(_cmd)
if _file_write:
with open(f'{_folder}/{_cmd}.txt', "w") as file:
file.write(output)
if _cmd in data_f_map[_device_type].keys():
return data_f_map[_device_type][_cmd](output)
else:
return []
@click.command()
@click.option('--conf', default="config.yml", prompt="please input the config filename", help="默认为 config.yml file,输入自定义配置文件")
def run(conf):
""" 步骤二:从无线控制器 - WLC 中抓取 Rogue AP 信息,命令格式可以是:rogue 或者 rogue run
如果config.yml中配置多个devices ,则全部抓取"""
config = {}
if not os.path.isfile(conf):
print(f"{conf} 文件不存在,please run it first: rogue init")
return
with open(conf) as f:
config = yaml.load(f, Loader=yaml.FullLoader)
if not config.get("channels_5G") and not config.get("channels_24G"):
print(f"至少需要一个频段 5G、2.4G:在 {conf} 中设置 channels_5G、channels_24G,其中一个是 yes or true")
return
if not isinstance(config.get("rssi_min_dBm"), int):
print("rssi_min_dBm 需要设置为数值,其默认值为:-80 ")
return
print("Start to collect.......")
# add device type for consumed by netmiko
# C9800 type: cisco_ios
_wlcs = {}
for name, device in config["devices"].items():
if device.get("capture"):
device.pop("capture")
_wlcs[name] = device
if not _wlcs:
print("没有 WLC 需要信息抓取,检查devices至少一个 WLC 里面:capture: yes")
return
now = datetime.now()
# now_time = now.strftime("%Y%m%d_%H%M%S")
# now_time = now.strftime("%Y-%m-%d %H:%M:%S")
# Show command that we execute
rogue_ap_all = []
_header_list = []
for name, device in _wlcs.items():
print(f' start to collect info from device/host: {name}/{device.get("host")}')
with ConnectHandler(**device) as net_connect:
_folder = f'{FOLDER}/{name}_{device.get("host")}'
if not os.path.exists(_folder):
os.makedirs(_folder)
# -------------------------- 1: for rogue ap summary --------------------------
_summary = one_command_data(cmd_dict[device["device_type"]].get("rogue_summary"), net_connect, _folder, device["device_type"], config.get("write_file", True))
channel = channel_5G_24G(_summary, config["rssi_min_dBm"])
_mac_list = []
if config.get("channels_5G"):
_mac_list.extend([i.get("rogue_ap_mac") for i in channel.get("5G")])
elif config.get("channels_24G"):
_mac_list.extend([i.get("rogue_ap_mac") for i in channel.get("24G")])
if not _mac_list:
print(f'For WLC - {name} {device.get("host")}: No Rogue AP found')
continue
# print(f'拟抓取的 Rogue APs 个数: {len(_mac_list)}')
# -------------------------- 2: for rogue ap detailed --------------------------
_rogue_detail = rogue_detail(cmd_dict[device["device_type"]].get("rogue"), _mac_list, net_connect, _folder, device["device_type"], config.get("write_file", True))
# print(f'rogue dtailed : {len(_rogue_detail)}')
# print(_rogue_detail)
# print(_rogue_detail)
# run commands capture
# -------------------------- 3: for all other commands --------------------------
_commands_list = []
if config.get("channels_5G"):
_commands_list.extend(config["commands"][device["device_type"]]["5G"])
_commands_list.extend(config["commands"][device["device_type"]]["common"])
elif config.get("channels_24G"):
_commands_list.extend(config["commands"][device["device_type"]]["2.4G"])
_commands_list.extend(config["commands"][device["device_type"]]["common"])
_data = {}
for i in _commands_list:
_data[i] = one_command_data(i, net_connect, _folder, device["device_type"], config.get("write_file", True))
# -------------------------- 4: data relationship and output to table --------------------------
if device.get("device_type") == "cisco_ios":
_rogue = rogue_channel_data_ios_5G(_data.get("show ap dot11 5ghz load-info"), _data.get("show ap dot11 5ghz summary"), _rogue_detail)
elif device.get("device_type") == "cisco_wlc_ssh":
_rogue = rogue_channel_data_wlc_5G(_data.get("show advanced 802.11a summary"), _rogue_detail)
for i in _rogue:
i.update({
"client_name": name,
"WLC_host": device.get("host"),
"captured_date": now.strftime("%Y-%m-%d %H:%M:%S")
})
rogue_ap_all.append(i)
# print(json.dumps(rogue_ap_all, indent=4))
print(
f' For {name}/{device.get("host")}: rogue AP count in channels 5G/2.4G: {len(channel.get("5G"))}/{len(channel.get("24G"))}')
print(" --------------------------------------------------")
_header_list.extend(rogue_ap_all[0].keys())
# -------------------------- 5: final to csv --------------------------
# print(rogue_ap_all)
_header = []
_header_missing_data = []
for _c_name in ["rogue_mac", "rogue_ssid", "rogue_rssi", "ap_name", "check_channel",
"channel", "rogue_channels", "channel_utilization", "clients", "txpwr", "client_name",
"WLC_host", "captured_date"]:
if _c_name in list(set(_header_list)):
_header.append(_c_name)
else:
_header_missing_data.append(_c_name)
if _header_missing_data:
print(f'Missing data for this columns: {_header_missing_data}')
df = pd.DataFrame(rogue_ap_all)
df.sort_values(by=['rogue_rssi'], ascending=False).to_csv(
f'{FOLDER}/rogue ap {now.strftime("%Y%m%d-%H%M%S")}.csv', index=False, columns=_header)
print(f"请检查子目录-{FOLDER} 下,检查show命令输出文件是否生成,重复运行将覆盖目录下文件。")
if __name__ == '__main__':
# raw data folder
FOLDER = "output"
if not os.path.exists(FOLDER):
os.makedirs(FOLDER)
cli.add_command(init)
cli.add_command(run)
cli()