cisco_fmc_api_via_excel_operations

Форк
0
409 строк · 20.4 Кб
1
#!/usr/bin/python3
2
import cfg
3
import json
4
import requests
5
from datetime import datetime
6
import logging
7
from get_func import get_domain_uuid, get_domain_name_by_uuid, api_call_counter, check_token_status
8

9

10
def post_network_objects(domain_data, domains_uuid):
    """Bulk-create network objects in a single FMC domain.

    ``domain_data`` is serialised with ``json.dumps`` and POSTed to the
    domain's ``object/networks?bulk=true`` endpoint. When the response status
    does not start with ``2``, the status, request body and response text are
    appended to ``outputs/errors.txt``. Nothing is returned.

    Args:
        domain_data: Payload for the request body; must be JSON-serialisable.
        domains_uuid: UUID of the target domain. It is interpolated into the
            URL and used to look up the domain name for log messages.

    Raises:
        SystemExit: If a ``requests.exceptions.RequestException`` is raised
            while the request section runs.
    """
    def _now():
        # Timestamp prefix shared by every log/error line of this function.
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    fmc_ip = cfg.fmc_ip
    headers_json = cfg.headers_json

    logging.info(f'\n {_now()} Starting to add network objects in domain {get_domain_name_by_uuid(domains_uuid)}\n')
    try:
        check_token_status()
        api_call_counter()
        endpoint = f"https://{fmc_ip}/api/fmc_config/v1/domain/{domains_uuid}/object/networks?bulk=true"
        # verify=False: the server's TLS certificate is not validated.
        resp = requests.post(endpoint, headers=headers_json, data=json.dumps(domain_data), verify=False)
        logging.info(f'{_now()} Network objects add operation in domain {get_domain_name_by_uuid(domains_uuid)} status Code: {resp.status_code}\n')
        request_succeeded = str(resp.status_code).startswith('2')
        if not request_succeeded:
            with open('outputs/errors.txt', "a") as errors_file:
                errors_file.write(f'{_now()}  Status code: {resp.status_code}\n Request.body: {resp.request.body}\n Request text: {resp.text}\n')
    except requests.exceptions.RequestException as err:
        # HTTPError is a subclass of RequestException, so one handler
        # covers both; any transport failure aborts the whole run.
        logging.info(f'{err}')
        raise SystemExit(err)
    logging.info(f'\n{_now()} Finished adding network objects in domain {get_domain_name_by_uuid(domains_uuid)}\n')
38

39

40
def post_range_objects(domain_data, domains_uuid):
    """Bulk-create range objects in a single FMC domain.

    ``domain_data`` is serialised with ``json.dumps`` and POSTed to the
    domain's ``object/ranges?bulk=true`` endpoint. When the response status
    does not start with ``2``, the status, request body and response text are
    appended to ``outputs/errors.txt``. Nothing is returned.

    Args:
        domain_data: Payload for the request body; must be JSON-serialisable.
        domains_uuid: UUID of the target domain. It is interpolated into the
            URL and used to look up the domain name for log messages.

    Raises:
        SystemExit: If a ``requests.exceptions.RequestException`` is raised
            while the request section runs.
    """
    def _now():
        # Timestamp prefix shared by every log/error line of this function.
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    fmc_ip = cfg.fmc_ip
    headers_json = cfg.headers_json

    logging.info(f'\n {_now()} Starting to add range objects in domain {get_domain_name_by_uuid(domains_uuid)}\n')
    try:
        check_token_status()
        api_call_counter()
        endpoint = f"https://{fmc_ip}/api/fmc_config/v1/domain/{domains_uuid}/object/ranges?bulk=true"
        # verify=False: the server's TLS certificate is not validated.
        resp = requests.post(endpoint, headers=headers_json, data=json.dumps(domain_data), verify=False)
        logging.info(f'{_now()} range objects add operation in domain {get_domain_name_by_uuid(domains_uuid)} status Code: {resp.status_code}\n')
        request_succeeded = str(resp.status_code).startswith('2')
        if not request_succeeded:
            with open('outputs/errors.txt', "a") as errors_file:
                errors_file.write(f'{_now()}  Status code: {resp.status_code}\n Request.body: {resp.request.body}\n Request text: {resp.text}\n')
    except requests.exceptions.RequestException as err:
        # HTTPError is a subclass of RequestException, so one handler
        # covers both; any transport failure aborts the whole run.
        logging.info(f'{err}')
        raise SystemExit(err)
    logging.info(f'\n{_now()} Finished adding range objects in domain {get_domain_name_by_uuid(domains_uuid)}\n')
68

69

70
def post_host_objects(domain_data, domains_uuid):
    """Bulk-create host objects in a single FMC domain.

    ``domain_data`` is serialised with ``json.dumps`` and POSTed to the
    domain's ``object/hosts?bulk=true`` endpoint. When the response status
    does not start with ``2``, the status, request body and response text are
    appended to ``outputs/errors.txt``. Nothing is returned.

    Args:
        domain_data: Payload for the request body; must be JSON-serialisable.
        domains_uuid: UUID of the target domain. It is interpolated into the
            URL and used to look up the domain name for log messages.

    Raises:
        SystemExit: If a ``requests.exceptions.RequestException`` is raised
            while the request section runs.
    """
    def _now():
        # Timestamp prefix shared by every log/error line of this function.
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    fmc_ip = cfg.fmc_ip
    headers_json = cfg.headers_json

    logging.info(f'\n {_now()} Starting to add host objects in domain {get_domain_name_by_uuid(domains_uuid)}\n')
    try:
        check_token_status()
        api_call_counter()
        endpoint = f"https://{fmc_ip}/api/fmc_config/v1/domain/{domains_uuid}/object/hosts?bulk=true"
        # verify=False: the server's TLS certificate is not validated.
        resp = requests.post(endpoint, headers=headers_json, data=json.dumps(domain_data), verify=False)
        logging.info(f' {_now()} Host objects add operation in domain {get_domain_name_by_uuid(domains_uuid)} status Code: {resp.status_code}\n')
        if not str(resp.status_code).startswith('2'):
            with open('outputs/errors.txt', "a") as errors_file:
                errors_file.write(f'{_now()} Status code: {resp.status_code}\n Request.body: {resp.request.body}\n Request text: {resp.text}\n')
    except requests.exceptions.RequestException as err:
        # HTTPError is a subclass of RequestException, so one handler
        # covers both; any transport failure aborts the whole run.
        logging.info(f'{err}')
        raise SystemExit(err)
    # Message spelling corrected ("objects"), so log searches for the
    # other object types' wording also match host runs.
    logging.info(f'\n{_now()} Finished adding host objects in domain {get_domain_name_by_uuid(domains_uuid)}\n')
98

99

100
def post_url_objects(domain_data, domains_uuid):
    """Bulk-create URL objects in a single FMC domain.

    ``domain_data`` is serialised with ``json.dumps`` and POSTed to the
    domain's ``object/urls?bulk=true`` endpoint. When the response status
    does not start with ``2``, the status, request body and response text are
    appended to ``outputs/errors.txt``. Nothing is returned.

    Args:
        domain_data: Payload for the request body; must be JSON-serialisable.
        domains_uuid: UUID of the target domain. It is interpolated into the
            URL and used to look up the domain name for log messages.

    Raises:
        SystemExit: If a ``requests.exceptions.RequestException`` is raised
            while the request section runs.
    """
    def _now():
        # Timestamp prefix shared by every log/error line of this function.
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    fmc_ip = cfg.fmc_ip
    headers_json = cfg.headers_json

    logging.info(f'\n {_now()} Starting to add url objects in domain {get_domain_name_by_uuid(domains_uuid)}\n')
    try:
        check_token_status()
        api_call_counter()
        endpoint = f"https://{fmc_ip}/api/fmc_config/v1/domain/{domains_uuid}/object/urls?bulk=true"
        # verify=False: the server's TLS certificate is not validated.
        resp = requests.post(endpoint, headers=headers_json, data=json.dumps(domain_data), verify=False)
        logging.info(f'{_now()} Url objects add operation in domain {get_domain_name_by_uuid(domains_uuid)} status Code: {resp.status_code}\n')
        request_succeeded = str(resp.status_code).startswith('2')
        if not request_succeeded:
            with open('outputs/errors.txt', "a") as errors_file:
                errors_file.write(f'{_now()}  Status code: {resp.status_code}\n Request.body: {resp.request.body}\n Request text: {resp.text}\n')
    except requests.exceptions.RequestException as err:
        # HTTPError is a subclass of RequestException, so one handler
        # covers both; any transport failure aborts the whole run.
        logging.info(f'{err}')
        raise SystemExit(err)
    logging.info(f'\n{_now()} Finished adding url objects in domain {get_domain_name_by_uuid(domains_uuid)}\n')
128

129

130
def post_groups_objects(domain_data, domain_uuid):
    """Bulk-create network groups in a single FMC domain.

    ``domain_data`` is serialised with ``json.dumps`` and POSTed to the
    domain's ``object/networkgroups?bulk=true`` endpoint. When the response
    status does not start with ``2``, the status, request body and response
    text are appended to ``outputs/errors.txt``. Nothing is returned.

    Args:
        domain_data: Payload for the request body; must be JSON-serialisable.
        domain_uuid: UUID of the target domain. It is interpolated into the
            URL and used to look up the domain name for log messages.

    Raises:
        SystemExit: If a ``requests.exceptions.RequestException`` is raised
            while the request section runs.
    """
    def _now():
        # Timestamp prefix shared by every log/error line of this function.
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    fmc_ip = cfg.fmc_ip
    headers_json = cfg.headers_json

    logging.info(f'\n {_now()} Starting to add POST group in domain {get_domain_name_by_uuid(domain_uuid)}\n')
    try:
        check_token_status()
        api_call_counter()
        endpoint = f"https://{fmc_ip}/api/fmc_config/v1/domain/{domain_uuid}/object/networkgroups?bulk=true"
        # verify=False: the server's TLS certificate is not validated.
        resp = requests.post(endpoint, headers=headers_json, data=json.dumps(domain_data), verify=False)
        logging.info(f'{_now()} POST group ADD status Code: {resp.status_code}\n')
        if not str(resp.status_code).startswith('2'):
            with open('outputs/errors.txt', "a") as errors_file:
                errors_file.write(f'{_now()} Status code: {resp.status_code}\n Request.body: {resp.request.body}\n Request text: {resp.text}\n')
    except requests.exceptions.RequestException as err:
        # HTTPError is a subclass of RequestException, so one handler
        # covers both; any transport failure aborts the whole run.
        logging.info(f'{err}')
        raise SystemExit(err)
    # The message ends right after the domain name; a stray double quote
    # that used to be emitted here has been removed.
    logging.info(f'\n{_now()} Finished adding POST group in domain {get_domain_name_by_uuid(domain_uuid)}\n')
155

156

157
def post_urlgroups_objects(domain_data, domain_uuid):
    """Bulk-create URL groups in a single FMC domain.

    ``domain_data`` is serialised with ``json.dumps`` and POSTed to the
    domain's ``object/urlgroups?bulk=true`` endpoint. When the response
    status does not start with ``2``, the status, request body and response
    text are appended to ``outputs/errors.txt``. Nothing is returned.

    Args:
        domain_data: Payload for the request body; must be JSON-serialisable.
        domain_uuid: UUID of the target domain. It is interpolated into the
            URL and used to look up the domain name for log messages.

    Raises:
        SystemExit: If a ``requests.exceptions.RequestException`` is raised
            while the request section runs.
    """
    def _now():
        # Timestamp prefix shared by every log/error line of this function.
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    fmc_ip = cfg.fmc_ip
    headers_json = cfg.headers_json

    # The start message names "urlgroup", matching the status and finish
    # messages below, so URL-group runs are distinguishable in the log.
    logging.info(f'\n {_now()} Starting to add POST urlgroup in domain {get_domain_name_by_uuid(domain_uuid)}\n')
    try:
        check_token_status()
        api_call_counter()
        endpoint = f"https://{fmc_ip}/api/fmc_config/v1/domain/{domain_uuid}/object/urlgroups?bulk=true"
        # verify=False: the server's TLS certificate is not validated.
        resp = requests.post(endpoint, headers=headers_json, data=json.dumps(domain_data), verify=False)
        logging.info(f'{_now()} POST urlgroup ADD status Code: {resp.status_code}\n')
        if not str(resp.status_code).startswith('2'):
            with open('outputs/errors.txt', "a") as errors_file:
                errors_file.write(f'{_now()} Status code: {resp.status_code}\n Request.body: {resp.request.body}\n Request text: {resp.text}\n')
    except requests.exceptions.RequestException as err:
        # HTTPError is a subclass of RequestException, so one handler
        # covers both; any transport failure aborts the whole run.
        logging.info(f'{err}')
        raise SystemExit(err)
    logging.info(f'\n{_now()} Finished adding POST urlgroup in domain {get_domain_name_by_uuid(domain_uuid)}\n')
184

185

186
def del_groups(group_data, domain_name):
    """Delete one network group on FMC and drop it from the cached data.

    Issues a DELETE against ``object/networkgroups/<id>`` in the domain named
    ``domain_name``. On a status starting with ``2`` the group is removed from
    ``cfg.all_obj_domain`` and ``cfg.all_detailed_networkgroups``; on any other
    status the request and response details are appended to
    ``outputs/errors.txt``. Nothing is returned.

    Args:
        group_data: Mapping describing the group; the keys ``'type'``,
            ``'id'`` and ``'name'`` are read.
        domain_name: Name of the domain; resolved to a UUID through
            ``get_domain_uuid``.

    Raises:
        SystemExit: If a ``requests.exceptions.RequestException`` is raised
            while the request section runs.
    """
    def _now():
        # Timestamp prefix shared by every log/error line of this function.
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    fmc_ip = cfg.fmc_ip
    headers_json = cfg.headers_json

    logging.info(f'\n {_now()} Starting to DEL  groups in domain {domain_name}\n')
    object_type = group_data['type']
    group_id = group_data['id']
    domain_uuid = get_domain_uuid(domain_name)['uuid']
    try:
        check_token_status()
        api_call_counter()
        endpoint = f"https://{fmc_ip}/api/fmc_config/v1/domain/{domain_uuid}/object/networkgroups/{group_id}"
        # verify=False: the server's TLS certificate is not validated.
        resp = requests.delete(endpoint, headers=headers_json, verify=False)
        logging.info(f'{_now()} DEL group {group_data["name"]} status Code: {resp.status_code}\n')
        if str(resp.status_code).startswith('2'):
            # Keep the in-memory caches in step with what FMC now holds.
            # NOTE(review): the cache key here is the raw 'type' value, while
            # del_urlgroups and del_objects key by lower-cased type + 's' —
            # confirm against how cfg.all_obj_domain is populated.
            del cfg.all_obj_domain[domain_name][object_type][group_data['name']]
            del cfg.all_detailed_networkgroups[domain_name][group_data['name']]
        else:
            with open('outputs/errors.txt', "a") as errors_file:
                errors_file.write(f'{_now()} Status code: {resp.status_code}\n Request.body: {resp.request.body}\n Request text: {resp.text}\n')
    except requests.exceptions.RequestException as err:
        # HTTPError is a subclass of RequestException, so one handler
        # covers both; any transport failure aborts the whole run.
        logging.info(f'{err}')
        raise SystemExit(err)
    logging.info(f'\n {_now()} Finished to DEL groups in domain {domain_name}\n')
219

220

221
def del_urlgroups(group_data, domain_name):
    """Delete one URL group on FMC and drop it from ``cfg.all_obj_domain``.

    The endpoint segment is the lower-cased ``group_data['type']`` with an
    ``s`` appended; the same string is the cache key used on success. On a
    status that does not start with ``2`` the request and response details
    are appended to ``outputs/errors.txt``. Nothing is returned.

    Args:
        group_data: Mapping describing the group; the keys ``'type'``,
            ``'id'`` and ``'name'`` are read.
        domain_name: Name of the domain; resolved to a UUID through
            ``get_domain_uuid``.

    Raises:
        SystemExit: If a ``requests.exceptions.RequestException`` is raised
            while the request section runs.
    """
    def _now():
        # Timestamp prefix shared by every log/error line of this function.
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    fmc_ip = cfg.fmc_ip
    headers_json = cfg.headers_json

    logging.info(f'\n {_now()} Starting to DEL urlgroups in domain {domain_name}\n')
    object_type = group_data['type']
    group_id = group_data['id']
    domain_uuid = get_domain_uuid(domain_name)['uuid']
    try:
        check_token_status()
        api_call_counter()
        endpoint = f"https://{fmc_ip}/api/fmc_config/v1/domain/{domain_uuid}/object/{object_type.lower()}s/{group_id}"
        # verify=False: the server's TLS certificate is not validated.
        resp = requests.delete(endpoint, headers=headers_json, verify=False)
        logging.info(f'{_now()} DEL urlgroup {group_data["name"]} status Code: {resp.status_code}\n')
        if str(resp.status_code).startswith('2'):
            # Keep the in-memory cache in step with what FMC now holds.
            cache_key = f'{object_type.lower()}s'
            del cfg.all_obj_domain[domain_name][cache_key][group_data['name']]
        else:
            with open('outputs/errors.txt', "a") as errors_file:
                errors_file.write(f'{_now()} Status code: {resp.status_code}\n Request.body: {resp.request.body}\n Request text: {resp.text}\n')
    except requests.exceptions.RequestException as err:
        # HTTPError is a subclass of RequestException, so one handler
        # covers both; any transport failure aborts the whole run.
        logging.info(f'{err}')
        raise SystemExit(err)
    logging.info(f'\n {_now()} Finished to DEL urlgroups in domain {domain_name}\n')
255

256

257
def put_networkgroups(group_data, domain_name):
    """Overwrite one existing network group on FMC with ``group_data``.

    ``group_data`` is serialised with ``json.dumps`` and sent via PUT to
    ``object/networkgroups/<id>`` in the domain named ``domain_name``. When
    the response status does not start with ``2``, the request and response
    details are appended to ``outputs/errors.txt``. Nothing is returned.

    Args:
        group_data: Mapping sent as the request body; the keys ``'id'`` and
            ``'name'`` are read.
        domain_name: Name of the domain; resolved to a UUID through
            ``get_domain_uuid``.

    Raises:
        SystemExit: If a ``requests.exceptions.RequestException`` is raised
            while the request section runs.
    """
    def _now():
        # Timestamp prefix shared by every log/error line of this function.
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    fmc_ip = cfg.fmc_ip
    headers_json = cfg.headers_json

    logging.info(f'\n {_now()} Starting to modify PUT group in domain {domain_name}\n')
    group_id = group_data['id']
    domain_uuid = get_domain_uuid(domain_name)['uuid']
    try:
        check_token_status()
        api_call_counter()
        endpoint = f"https://{fmc_ip}/api/fmc_config/v1/domain/{domain_uuid}/object/networkgroups/{group_id}"
        # verify=False: the server's TLS certificate is not validated.
        resp = requests.put(endpoint, headers=headers_json, data=json.dumps(group_data), verify=False)
        logging.info(f' {_now()} PUT group {group_data["name"]} modify operation status Code: {resp.status_code}\n')
        request_succeeded = str(resp.status_code).startswith('2')
        if not request_succeeded:
            with open('outputs/errors.txt', "a") as errors_file:
                errors_file.write(f'{_now()} Status code: {resp.status_code}\n Request.body: {resp.request.body}\n Request text: {resp.text}\n')
    except requests.exceptions.RequestException as err:
        # HTTPError is a subclass of RequestException, so one handler
        # covers both; any transport failure aborts the whole run.
        logging.info(f'{err}')
        raise SystemExit(err)
    logging.info(f'\n {_now()} Finished to modify PUT group in domain {domain_name}\n')
285

286

287
def put_urlgroups(group_data, domain_name):
    """Overwrite one existing URL group on FMC with ``group_data``.

    ``group_data`` is serialised with ``json.dumps`` and sent via PUT to
    ``object/urlgroups/<id>`` in the domain named ``domain_name``. When the
    response status does not start with ``2``, the request and response
    details are appended to ``outputs/errors.txt``. Nothing is returned.

    Args:
        group_data: Mapping sent as the request body; the keys ``'id'`` and
            ``'name'`` are read.
        domain_name: Name of the domain; resolved to a UUID through
            ``get_domain_uuid``.

    Raises:
        SystemExit: If a ``requests.exceptions.RequestException`` is raised
            while the request section runs.
    """
    def _now():
        # Timestamp prefix shared by every log/error line of this function.
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    fmc_ip = cfg.fmc_ip
    headers_json = cfg.headers_json

    logging.info(f'\n {_now()} Starting to modify PUT urlgroup in domain {domain_name}\n')
    group_id = group_data['id']
    domain_uuid = get_domain_uuid(domain_name)['uuid']
    try:
        check_token_status()
        api_call_counter()
        endpoint = f"https://{fmc_ip}/api/fmc_config/v1/domain/{domain_uuid}/object/urlgroups/{group_id}"
        # verify=False: the server's TLS certificate is not validated.
        resp = requests.put(endpoint, headers=headers_json, data=json.dumps(group_data), verify=False)
        logging.info(f' {_now()} PUT urlgroup {group_data["name"]} modify operation status Code: {resp.status_code}\n')
        request_succeeded = str(resp.status_code).startswith('2')
        if not request_succeeded:
            with open('outputs/errors.txt', "a") as errors_file:
                errors_file.write(f'{_now()} Status code: {resp.status_code}\n Request.body: {resp.request.body}\n Request text: {resp.text}\n')
    except requests.exceptions.RequestException as err:
        # HTTPError is a subclass of RequestException, so one handler
        # covers both; any transport failure aborts the whole run.
        logging.info(f'{err}')
        raise SystemExit(err)
    logging.info(f'\n {_now()} Finished to modify PUT urlgroup in domain {domain_name}\n')
318

319

320
def put_object(object_data, domain_uuid):
    """Overwrite one existing object on FMC and refresh its cached value.

    ``object_data`` is serialised with ``json.dumps`` and sent via PUT to
    ``object/<type>s/<id>`` in the given domain. On a status starting with
    ``2`` the matching entry in ``cfg.all_obj_domain`` gets its ``'value'``
    (or, failing that, ``'url'``) replaced by the one just sent. On any other
    status the request and response details are appended to
    ``outputs/errors.txt``. Nothing is returned.

    The cached entry is updated in place only. It is deliberately not
    re-inserted into itself under its own name, because that would make the
    dict contain a reference to itself and break later JSON serialisation.

    Args:
        object_data: Mapping sent as the request body; the keys ``'id'``,
            ``'type'``, ``'name'`` and ``'value'``/``'url'`` are read.
        domain_uuid: UUID of the target domain. It is interpolated into the
            URL and used to look up the domain name.

    Raises:
        SystemExit: If a ``requests.exceptions.RequestException`` is raised
            while the request section runs.
    """
    def _now():
        # Timestamp prefix shared by every log/error line of this function.
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    fmc_ip = cfg.fmc_ip
    headers_json = cfg.headers_json

    logging.info(f'\n {_now()} Starting to modify PUT object in domain {get_domain_name_by_uuid(domain_uuid)}\n')
    object_id = object_data['id']
    # NOTE(review): the type is used as-is here, whereas del_objects
    # lower-cases it before building the URL and cache key — confirm which
    # casing callers pass in.
    object_type = object_data.get('type')
    try:
        check_token_status()
        api_call_counter()
        endpoint = f"https://{fmc_ip}/api/fmc_config/v1/domain/{domain_uuid}/object/{object_type}s/{object_id}"
        # verify=False: the server's TLS certificate is not validated.
        resp = requests.put(endpoint, headers=headers_json, data=json.dumps(object_data), verify=False)
        logging.info(f' {_now()} PUT object {object_data["name"]} modify operation status Code: {resp.status_code}\n')
        if not str(resp.status_code).startswith('2'):
            with open('outputs/errors.txt', "a") as errors_file:
                errors_file.write(f'{_now()} Status code: {resp.status_code}\n Request.body: {resp.request.body}\n Request text: {resp.text}\n')
        else:
            # Mirror the accepted change in the in-memory cache entry.
            domain_name = get_domain_name_by_uuid(domain_uuid)
            cached_object = cfg.all_obj_domain[domain_name][f'{object_type}s'][object_data.get('name')]
            if object_data.get('value'):
                cached_object['value'] = object_data.get('value')
            elif object_data.get('url'):
                cached_object['url'] = object_data.get('url')
    except requests.exceptions.RequestException as err:
        # HTTPError is a subclass of RequestException, so one handler
        # covers both; any transport failure aborts the whole run.
        logging.info(f'{err}')
        raise SystemExit(err)
    logging.info(f'\n {_now()} Finished to modify PUT object in domain {get_domain_name_by_uuid(domain_uuid)}\n')
360

361

362
def del_objects(object_data, domain_name):
    """Delete one object on FMC and drop it from ``cfg.all_obj_domain``.

    The endpoint segment is the lower-cased ``object_data['type']`` with an
    ``s`` appended; the same string is the cache key used on success. On a
    status that does not start with ``2`` the request and response details
    are appended to ``outputs/errors.txt``.

    If resolving the domain UUID raises ``TypeError``, the problem is logged
    and written to ``outputs/errors.txt`` and the function returns without
    contacting FMC, since no request URL can be built without the UUID.

    Args:
        object_data: Mapping describing the object; the keys ``'id'``,
            ``'type'`` and ``'name'`` are read.
        domain_name: Name of the domain; resolved to a UUID through
            ``get_domain_uuid``.

    Returns:
        None.

    Raises:
        SystemExit: If a ``requests.exceptions.RequestException`` is raised
            while the request section runs.
    """
    def _now():
        # Timestamp prefix shared by the log lines of this function.
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    errors_filename = 'outputs/errors.txt'
    fmc_ip = cfg.fmc_ip
    headers_json = cfg.headers_json

    logging.info(f'\n {_now()} Starting to DEL objects in domain {domain_name}\n')
    obj_id = object_data['id']
    api_obj_type = f"{object_data['type'].lower()}s"
    try:
        # presumably get_domain_uuid returns a non-subscriptable value (e.g.
        # None) for an unknown domain, hence the TypeError — verify.
        domain_uuid = get_domain_uuid(domain_name)['uuid']
    except TypeError as error:
        logging.info(
            f'Domain {domain_name} do not exist either on FMC, either in Excel sheet')
        with open(errors_filename, "a") as errors_file:
            errors_file.write(
                f'Domain {domain_name} do not exist either on FMC, either in Excel sheet\n Error: {error}\n')
        # Stop here: continuing would reference a UUID that was never bound.
        return

    try:
        check_token_status()
        api_call_counter()
        endpoint = f"https://{fmc_ip}/api/fmc_config/v1/domain/{domain_uuid}/object/{api_obj_type}/{obj_id}"
        # verify=False: the server's TLS certificate is not validated.
        resp = requests.delete(endpoint, headers=headers_json, verify=False)
        logging.info(
            f' {_now()} DEL object {object_data["name"]} in domain {domain_name} status code: {resp.status_code}')
        if not str(resp.status_code).startswith('2'):
            with open(errors_filename, "a") as errors_file:
                errors_file.write(
                    f'Status code: {resp.status_code}\n Request.body: {resp.request.body}\n Request text: {resp.text}\n')
        else:
            # Keep the in-memory cache in step with what FMC now holds.
            del cfg.all_obj_domain[domain_name][api_obj_type][object_data['name']]
    except requests.exceptions.RequestException as err:
        # HTTPError is a subclass of RequestException, so one handler
        # covers both; any transport failure aborts the whole run.
        logging.info(f'{err}')
        raise SystemExit(err)
    logging.info(f'\n {_now()} Finished to DEL objects in domain {domain_name}\n')
410

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.