This website uses Cookies. Click Accept to agree to our website's cookie use as described in our Privacy Policy. Click Preferences to customize your cookie settings.
Sign in with your Check Point UserCenter/PartnerMap account to access more great content and get a chance to win some Apple AirPods! If you don't have an account, create one now for free!
Упрощаем работу с Check Point API с помощью Python SDK
Вся мощь взаимодействия с API раскрывается при совместном использовании с програмным кодом, когда появляются возможности динамически формировать API запросы и инструменты для анализа API ответов . Однако, малозаметным до сих пор остаётся Python Software Development Kit (далее - Python SDK) для Check Point Management API, а зря. Он ощутимо упрощает жизнь разработчикам и любителям автоматизации. Python приобрёл огромную популярность в последнее время и я решил устранить пробел и сделать обзор основных возмоностей Check Point API Python Development Kit. Данная статья служит отличным дополнением другой статьи на Хабре Check Point R80.10 API. Управление через CLI, скрипты и не только. Мы же рассмотрим как написать скрипты с использованием Python SDK и остановимся подробнее на новом функционале Management API в версии 1.6(поддерживается начиная с R80.40). Для понимания статьи понадобятся базовые знания по работе с API и Python. Check Point активно развивает API и на данный момент на свет появились:
Python SDK на данный момент поддерживает взаимодействие только с Management API и Gaia API. Мы рассмотрим самые важные классы, методы и переменные в данном модуле.
Установка модуля
Модуль cpapi устанавливается быстро и просто из официального репозитария Check Point на github с помощью pip. Подбробная инструкция по установке есть в README.md. Данный модуль адаптирован для работы с версиями Python 2.7 и 3.7. В данной статье примеры будут приведены с использованием Python 3.7. Однако, Python SDK можно запускать прямо с сервера управления Check Point (Smart Management), но на них поддерживается только версия Python 2.7, поэтому в последнем разделе будет приведён код для версии 2.7. Сразу после установки модуля рекомендую посмотреть на примеры в директориях examples_python2 и examples_python3.
Начало работы
Для того, чтобы у нас появилась возможность работать с компонентами модуля cpapi необходимо импортировать из модуля cpapi как минимум два необходимых класса: APIClient и APIClientArgs
from cpapi import APIClient, APIClientArgs
Класс APIClientArgs отвечает за параметры подключения к API серверу, а класс APIClient отвечает за взаимодействие с API.
Определяем параметры подключения
Чтобы определить различные параметры подключения к API, нужно создать экземпляр класса APIClientArgs. В принципе его параметры предопределены и при запуске скрипта на сервере управления их можно не указывать.
client_args = APIClientArgs()
Но при запуске на стороннем хосте нужно указать как минимум IP адрес или имя хоста API сервера(он же сервер управления). В примере ниже мы определяем параметр подключения server и присваиваем ему в виде строки IP адрес сервера управления.
class APIClientArgs:
"""
This class provides arguments for APIClient configuration.
All the arguments are configured with their default values.
"""
# port is set to None by default, but it gets replaced with 443 if not specified
# context possible values - web_api (default) or gaia_api
def __init__(self, port=None, fingerprint=None, sid=None, server="127.0.0.1", http_debug_level=0,
api_calls=None, debug_file="", proxy_host=None, proxy_port=8080,
api_version=None, unsafe=False, unsafe_auto_accept=False, context="web_api"):
self.port = port
# management server fingerprint
self.fingerprint = fingerprint
# session-id.
self.sid = sid
# management server name or IP-address
self.server = server
# debug level
self.http_debug_level = http_debug_level
# an array with all the api calls (for debug purposes)
self.api_calls = api_calls if api_calls else []
# name of debug file. If left empty, debug data will not be saved to disk.
self.debug_file = debug_file
# HTTP proxy server address (without "http://")
self.proxy_host = proxy_host
# HTTP proxy port
self.proxy_port = proxy_port
# Management server's API version
self.api_version = api_version
# Indicates that the client should not check the server's certificate
self.unsafe = unsafe
# Indicates that the client should automatically accept and save the server's certificate
self.unsafe_auto_accept = unsafe_auto_accept
# The context of using the client - defaults to web_api
self.context = context
Полагаю, что аргументы, которые можно использовать в экземплярах класса APIClientArgs, интуитивно понятны администраторам Check Point и в дополнительных комментариях не нуждаются.
Подключаемся через APIClient и менеджер контекста
Класс APIClient удобнее всего использовать через менеджер контекста. Всё, что нужно передать экземпляру класса APIClient это параметры подключения, которые были определены в прошлом шаге.
with APIClient(client_args) as client:
Менеджер контекста не станет выполнять автоматически login вызов на API сервер, однако он выполнит вызов logout при выходе из него. Если по каким-то причинам logout по окончанию работы с API вызовами не требуется, нужно начать работу без использования менеджера контекста:
client = APIClient(clieng_args)
Проверка соединения
Проверить проходит ли соединение по заданным параметрам проще всего с помощью метода check_fingerprint. Если проверка хеш-суммы sha1 для fingerprint сертификата API сервера не прошла(метод вернул False), то обычно это вызвано проблемами с соединением и мы можем остановить выполнение программы(или дать пользователю возможность исправить данные для подключения):
if client.check_fingerprint() is False:
print("Could not get the server's fingerprint - Check connectivity with the server.")
exit(1)
Учтите, что в дальнейшем класс APIClient будет проверять при каждом API вызове (методы api_call и api_query, о них речь пойдёт чуть дальше) sha1 fingerprint сертификата на API сервере. А вот если при проверке sha1 fingerprint сертификата API сервера будет выявлена ошибка (сертификат неизвестен или был изменён), метод check_fingerprint предоставит возможность добавить/изменить информацию о нём на локальной машине в автоматическом режиме. Данную проверку можно отключить вовсе (но такое можно рекомендовать только в случае запуска скриптов на самом API сервере, при подключении к 127.0.0.1), используя аргумент APIClientArgs - unsafe_auto_accept (см. подробнее про APIClientArgs ранее в "Определяем параметры подключения").
У APIClient есть целых 3 метода логина на API сервер, и каждый из них запонимает значение sid(session-id), которое использует автоматически в каждом последующем API вызове в заголовке(имя в заголовке у данного параметра - X-chkp-sid), так что не нужно дополнительно обрабатывать данный параметр.
Метод login
Вариант с использованием логина и пароля(в примере имя пользователя admin и пароль 1q2w3e переданы как позиционные аргументы):
login = client.login('admin', '1q2w3e')
В методе login доступны также дополнительные опциональные параметры, привожу их имена и значения по умолчанию:
Вариант с использованием api ключа(поддерживается начиная с версии менеджмента R80.40/Management API v1.6, "3TsbPJ8ZKjaJGvFyoFqHFA==" это значение ключа API для одного из пользователей на сервере управления с методом авторизации API key):
В методе login_with_api_key доступны такие же опциональные параметры как и в методе login.
Метод login_as_root
Вариант login'а на локальную машину с API сервером:
login = client.login_as_root()
Для данного метода доступны всего два опциональных параметра:
domain=None, payload=None
И наконец сами API вызовы
У нас есть два варианта делать API вызовы через методы api_call и api_query. Давайте разберёмся в чём между ними разница.
api_call
Данный метод применим для любых вызовов. Нам нужно передать последнюю часть для api вызова и payload в теле запроса при необходимости. Если payload пустой, то его можно не передавать вовсе:
Сразу оговорюсь, что данный метод применим только для вызовов, вывод которых предполагает offset(сдвиг). Такой вывод происходит в том случае, когда в нём содержится или может содержатся большое количество информации. Например, это может быть запрос списка всех созданных объектов типа хост на сервере управления. Для таких запросов API возвращает список из 50 объектов по умолчанию(можно увеличить лимит до 500 объектов в ответе). И для того, чтобы не дёргать информацию по несколько раз, меняя параметр offset в API запросе, есть метод api_query, который данную работу проделывает автоматически. Примеры вызовов, где нужен данный метод: show-sessions, show-hosts, show-networks, show-wildcards, show-groups, show-address-ranges, show-simple-gateways, show-simple-clusters, show-access-roles, show-trusted-clients, show-packages. По факту, в имени этих API вызовов мы видим слова во множественном числе, так что эти вызовы будет проще обрабатывать через api_query
После этого можно использовать переменные и методы класса APIResponse(как внутри менеджера контекста, так и снаружи). У класса APIResponse предопределено 4 метода и 5 переменных, на самых важных мы остановимся подробнее.
success
Для начала было бы неплохо убедиться в том, что API вызов прошёл успешно и вернул результат. Для этого есть метод success:
In [49]: api_versions.success
Out[49]: True
Возвращает True если API вызов прошёл успешно(Код ответа - 200) и False если не успешно(любой другой код ответа). Удобно использовать сразу после API вызова, чтобы в зависимости от кода ответа вывести разную информацию.
if api_ver.success:
print(api_versions.data)
else:
print(api_versions.err_message)
statuscode
Возвращает код ответа после выполнения API вызова.
In [62]: api_versions.status_code
Out[62]: 400
Возможные коды ответов: 200,400,401,403,404,409,500,501.
set_success_status
При этом может быть необходимость изменить значение статуса success. Технически, туда можно поместить что угодно, даже обычную строку. Но реальным примером может быть сброс данного параметра в False при определенных сопутствующих условиях. Ниже обратите внимание на пример, когда есть задачи, выполняемые на сервере управления, но мы будем считать этот запрос неудачным(выставим переменную success в False, несмотря на то, что API вызов был успешный и вернул код 200).
for task in task_result.data["tasks"]:
if task["status"] == "failed" or task["status"] == "partially succeeded":
task_result.set_success_status(False)
break
response()
Метод response позволяет посмотреть словарь с кодом ответа(status_code) и с телом ответа(body).
Данная информация доступна, только когда при обработке API запроса возникла ошибка(код ответа не 200). Пример вывода
In [107]: api_versions.error_message
Out[107]: 'code: generic_err_invalid_parameter_name\nmessage: Unrecognized parameter [1]\n'
Полезные примеры
Ниже перечислены примеры, в которых используются API вызовы, которые были добавлены в версию Management API 1.6.
Для начала рассмотрим работу вызовов add-host и add-address-range. Допустим, нам нужно создать в качестве объектов типа хост все ip адреса подсети 192.168.0.0/24, последний октет которых равен 5, а все остальные ip адреса записать в качестве объектов типа диапазон адресов. При этом, адрес подсети и широковещательный адрес исключить.
Итак, ниже представлен скрипт в котором решается данная задача и создаются 50 объектов типа хост и 51 объект типа диапазон адресов. На решение задачи требуется 101 API вызов(не считая финального вызова publish). Также с помощью модуля timeit мы подсчитываем время на выполнение скрипта до момента публикации изменений.
import timeit
from cpapi import APIClient, APIClientArgs
start = timeit.default_timer()
first_ip = 1
last_ip = 4
client_args = APIClientArgs(server="192.168.47.240")
with APIClient(client_args) as client:
login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
for ip in range(5,255,5):
add_host = client.api_call("add-host", {"name" : f"h_192.168.0.{ip}", "ip-address": f'192.168.0.{ip}'})
while last_ip < 255:
add_range = client.api_call("add-address-range", {"name": f"r_192.168.0.{first_ip}-{last_ip}", "ip-address-first": f"192.168.0.{first_ip}", "ip-address-last": f"192.168.0.{last_ip}"})
first_ip+=5
last_ip+=5
stop = timeit.default_timer()
publish = client.api_call("publish")
print(f'Time to execute batch request: {stop - start} seconds')
В моей лабораторной среде на выполнение данного скрипта уходит от 30 до 50 секунд в зависимости от нагрузки на сервер управления.
А теперь посмотрим как решить эту же задачу с помощью API вызова add-objects-batch, поддержка которого добавлена в версию API 1.6. Данный вызов позволяет за один API запрос создать сразу множество объектов. Причём это могут быть объекты разных типов(например хосты, подсети и диапазоны адресов). Таким образом наша задача может быть решена в рамказ одного API вызова.
import timeit
from cpapi import APIClient, APIClientArgs
start = timeit.default_timer()
client_args = APIClientArgs(server="192.168.47.240")
objects_list_ip = []
objects_list_range = []
for ip in range(5,255,5):
data = {"name": f'h_192.168.0.{ip}', "ip-address": f'192.168.0.{ip}'}
objects_list_ip.append(data)
first_ip = 1
last_ip = 4
while last_ip < 255:
data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "ip-address-first": f"192.168.0.{first_ip}", "ip-address-last": f"192.168.0.{last_ip}"}
objects_list_range.append(data)
first_ip+=5
last_ip+=5
data_for_batch = {
"objects" : [ {
"type" : "host",
"list" : objects_list_ip
}, {
"type" : "address-range",
"list" : objects_list_range
}]
}
with APIClient(client_args) as client:
login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
add_objects_batch = client.api_call("add-objects-batch", data_for_batch)
stop = timeit.default_timer()
publish = client.api_call("publish")
print(f'Time to execute batch request: {stop - start} seconds')
А на выполнение данного скрипта в моей лабораторной среде уходит от 3 до 7 секунд в зависимости от нагрузки на сервер управления. То есть, в среднем, на 101 объекте API вызов типа batch, отрабатывает в 10 раз быстрее. На большем количестве объектов разница будет ещё более впечатляющей.
Теперь давайте посмотрим как работать с set-objects-batch. С помощью данного API вызова мы можем массово изменить любой параметр. Давайте настроим первой половине адресов из прошлого примера (до .124 хоста, причём и диапазонам тоже) цвет sienna, а второй половине адресов назначим цвет khaki.
from cpapi import APIClient, APIClientArgs
client_args = APIClientArgs(server="192.168.47.240")
objects_list_ip_first = []
objects_list_range_first = []
objects_list_ip_second = []
objects_list_range_second = []
for ip in range(5,125,5):
data = {"name": f'h_192.168.0.{ip}', "color": "sienna"}
objects_list_ip_first.append(data)
for ip in range(125,255,5):
data = {"name": f'h_192.168.0.{ip}', "color": "khaki"}
objects_list_ip_second.append(data)
first_ip = 1
last_ip = 4
while last_ip < 125:
data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "color": "sienna"}
objects_list_range_first.append(data)
first_ip+=5
last_ip+=5
while last_ip < 255:
data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "color": "khaki"}
objects_list_range_second.append(data)
first_ip+=5
last_ip+=5
data_for_batch_first = {
"objects" : [ {
"type" : "host",
"list" : objects_list_ip_first
}, {
"type" : "address-range",
"list" : objects_list_range_first
}]
}
data_for_batch_second = {
"objects" : [ {
"type" : "host",
"list" : objects_list_ip_second
}, {
"type" : "address-range",
"list" : objects_list_range_second
}]
}
with APIClient(client_args) as client:
login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
set_objects_batch_first = client.api_call("set-objects-batch", data_for_batch_first)
set_objects_batch_second = client.api_call("set-objects-batch", data_for_batch_second)
publish = client.api_call("publish")
Удалить множество объектов в одном API вызове можно с помощью delete-objects-batch. А теперь посмотрим на пример кода, который удаляет все хосты, созданные ранее через add-objects-batch.
from cpapi import APIClient, APIClientArgs
client_args = APIClientArgs(server="192.168.47.240")
objects_list_ip = []
objects_list_range = []
for ip in range(5,255,5):
data = {"name": f'h_192.168.0.{ip}'}
objects_list_ip.append(data)
first_ip = 1
last_ip = 4
while last_ip < 255:
data = {"name": f"r_192.168.0.{first_ip}-{last_ip}"}
objects_list_range.append(data)
first_ip+=5
last_ip+=5
data_for_batch = {
"objects" : [ {
"type" : "host",
"list" : objects_list_ip
}, {
"type" : "address-range",
"list" : objects_list_range
}]
}
with APIClient(client_args) as client:
login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
delete_objects_batch = client.api_call("delete-objects-batch", data_for_batch)
publish = client.api_call("publish")
print(delete_objects_batch.data)
Все функции, которые появляются в новых релизах ПО Check Point, сразу же обретают и API вызовы. Так, в R80.40 появились такие "фичи" как Revert to revision и Smart Task, и для них сразу же подготовили соответствующие API вызовы. Более того, весь функционал при переходе из Legacy консолей в режим Unified Policy также обрастает поддержкой API. Например, долгожданным обновлением в версии ПО R80.40 стал переезд политики HTTPS Inspection из Legacy режима в режим Unified Policy, и данный функционал сразу же получил API вызовы. Вот пример кода, который добавляет на верхнюю позицию HTTPS Inspection policy правило, которое исключает из инспекции 3 категории(Здоровье, Финансы, Государственные услуги), которые запрещено инспектировать в соответствии с законадательством в ряде стран.
Запуск Python скриптов на сервере управления Check Point
Всё в том же README.md содержится информация как запускать скрипты на Python прямо с сервера управления. Это может быть удобно, когда у вас нет возможности подключиться к API серверу с другой машины. Я записал шестиминутное видео в котором рассматриваю установку модуля cpapi и особенности запуска Python скриптов на сервере управления. В качестве примера запускается скрипт, который автоматизирует конфигурацию нового шлюза для такой задачи, как аудит сети Security CheckUp. Из особенностей, с которыми пришлось столкнуться: в версии Python 2.7 ещё не появилась функция input, поэтому для обработки информации, которую вводит пользователь, используется функция raw_input. В остальном, код такой же как для запуска с других машин, только удобнее использовать функцию login_as_root, дабы не указывать свои же собственные username, пароль и IP адрес сервера управления ещё раз.
from __future__ import print_function
import getpass
import sys, os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from cpapi import APIClient, APIClientArgs
def main():
with APIClient() as client:
# if client.check_fingerprint() is False:
# print("Could not get the server's fingerprint - Check connectivity with the server.")
# exit(1)
login_res = client.login_as_root()
if login_res.success is False:
print("Login failed:\n{}".format(login_res.error_message))
exit(1)
gw_name = raw_input("Enter the gateway name:")
gw_ip = raw_input("Enter the gateway IP address:")
if sys.stdin.isatty():
sic = getpass.getpass("Enter one-time password for the gateway(SIC): ")
else:
print("Attention! Your password will be shown on the screen!")
sic = raw_input("Enter one-time password for the gateway(SIC): ")
version = raw_input("Enter the gateway version(like RXX.YY):")
add_gw = client.api_call("add-simple-gateway", {'name' : gw_name, 'ipv4-address' : gw_ip, 'one-time-password' : sic, 'version': version.capitalize(), 'application-control' : 'true', 'url-filtering' : 'true', 'ips' : 'true', 'anti-bot' : 'true', 'anti-virus' : 'true', 'threat-emulation' : 'true'})
if add_gw.success and add_gw.data['sic-state'] != "communicating":
print("Secure connection with the gateway hasn't established!")
exit(1)
elif add_gw.success:
print("The gateway was added successfully.")
gw_uid = add_gw.data['uid']
gw_name = add_gw.data['name']
else:
print("Failed to add the gateway - {}".format(add_gw.error_message))
exit(1)
change_policy = client.api_call("set-access-layer", {"name" : "Network", "applications-and-url-filtering": "true", "content-awareness": "true"})
if change_policy.success:
print("The policy has been changed successfully")
else:
print("Failed to change the policy- {}".format(change_policy.error_message))
change_rule = client.api_call("set-access-rule", {"name" : "Cleanup rule", "layer" : "Network", "action": "Accept", "track": {"type": "Detailed Log", "accounting": "true"}})
if change_rule.success:
print("The cleanup rule has been changed successfully")
else:
print("Failed to change the cleanup rule- {}".format(change_rule.error_message))
# publish the result
publish_res = client.api_call("publish", {})
if publish_res.success:
print("The changes were published successfully.")
else:
print("Failed to publish the changes - {}".format(install_tp_policy.error_message))
install_access_policy = client.api_call("install-policy", {"policy-package" : "Standard", "access" : 'true', "threat-prevention" : 'false', "targets" : gw_uid})
if install_access_policy.success:
print("The access policy has been installed")
else:
print("Failed to install access policy - {}".format(install_tp_policy.error_message))
install_tp_policy = client.api_call("install-policy", {"policy-package" : "Standard", "access" : 'false', "threat-prevention" : 'true', "targets" : gw_uid})
if install_tp_policy.success:
print("The threat prevention policy has been installed")
else:
print("Failed to install threat prevention policy - {}".format(install_tp_policy.error_message))
# add passwords and passphrases to dictionary
with open('additional_pass.conf') as f:
line_num = 0
for line in f:
line_num += 1
add_password_dictionary = client.api_call("run-script", {"script-name" : "Add passwords and passphrases", "script" : "printf \"{}\" >> $FWDIR/conf/additional_pass.conf".format(line), "targets" : gw_name})
if add_password_dictionary.success:
print("The password dictionary line {} was added successfully".format(line_num))
else:
print("Failed to add the dictionary - {}".format(add_password_dictionary.error_message))
main()
Пример файла со словарём паролей additional_pass.conf
Данная статья рассматривает лишь основные возможности работы Python SDK и модуля cpapi(как вы могли догадаться, это фактически синонимы), и изучив код в данном модуле вы откроете для себя ещё больше возможностей в работе с ним.
Приятного кодинга и спасибо, что дочитали до конца!