Neil_Deane
Participant

SmartMove usage assistance

Hi,
I'm looking for some assistance with running the SmartMove process.

So far I've completed the following:
- Migrated a policy with SmartMove, which created a smartconnect_policy.tar.gz file
- Copied the archive to the management server's temp directory and extracted it to get these files: cp_objects.json, cp_objects_opt.json and smartconnector.py
- The SmartMove documentation indicated adding the SDK library from github.com/CheckPointSW/cp_mgmt_api_python_sdk
- Added the library and set the Python environment variable according to the GitHub notes (a quick import check is sketched below)
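
For what it's worth, a minimal import check (my own sketch, assuming the SDK folder sits alongside smartconnector.py, which is the layout the script itself expects) looks like this:

# check_sdk.py - hypothetical helper, run from the same directory as smartconnector.py
import os
import sys

# mirror the path handling smartconnector.py does before importing the SDK
sys.path.append(os.path.abspath(os.path.dirname(__file__)))
from cpapi import APIClient, APIClientArgs  # raises ImportError here if the SDK is not on the path

print("cpapi SDK import OK")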

Now when I run the smartconnector script I get the following:

[Expert@hostname:0]# ./smartconnector.py
: No such file or directory


I'm stuck here... any ideas, please?

Thanks,
Neil


Accepted Solutions
Ofir_Shikolski
Employee

You can contact me offline 🙂 ofirs@checkpoint.com 


12 Replies
masher
Employee

Does it run if you try it with #python3 ./smartconnecter.py ?

Neil_Deane
Participant

Thanks, Masher.

Same/similar error, unfortunately...


[Expert@hostname:0]# python3 ./smartconnecter.py
python3: can't open file './smartconnecter.py': [Errno 2] No such file or directory

 

Your suggestion did make me think to check the file permissions for the script, the JSON files and the SDK assets, though. They all look OK to me; I added execute bits to the SDK scripts just in case, but it makes no difference.
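
Something along these lines (a rough sketch, with the path assumed) is what I mean by checking that the file is actually visible and readable from the directory I'm running in:

# hypothetical quick check, run from the directory containing smartconnector.py
import os

path = "./smartconnector.py"
print("exists:", os.path.exists(path))
print("readable:", os.access(path, os.R_OK))
print("executable:", os.access(path, os.X_OK))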

masher
Employee

Which version of SmartMove did you use? Did you build it yourself or use the revision from the download center?

Neil_Deane
Participant

I took the latest Windows SmartMove build from the download centre a few days ago.

v9.40.8333.17361

the_rock
Legend

Just wondering, from what vendor are you migrating? As an idea, can you run the command below from expert mode and see what it gives you?

find / -name smartconnector.py

Neil_Deane
Participant

Thanks Rock.

It's a FortiGate migration.

 

[Expert@hostname:0]# find / -name smartconnector.py
/home/admin/mig/smartconnector.py
find: /proc/6852/task/16074: No such file or directory
find: /proc/6852/task/1396: No such file or directory
/var/tmp/migrate/smartconnector.py
[Expert@hostname:0]#

 

I copied the SmartMove output to /home first, then copied it to /var/tmp. I wanted to keep the SDK in that path as well.

I'm running the smartconnector script from within the local directory.
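
Looking inside smartconnector.py, the argparse section near the end suggests the script is driven with flags such as -r (login as root), -u / -p (user and password), -m (management server) and -f (the cp_objects.json file), so a local root-login run would presumably look something like:

python3 ./smartconnector.py -r -f cp_objects.json

That's only a sketch based on the script's own argument definitions; I haven't got that far yet since it won't even launch.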

 

the_rock
Legend

Hm, that's odd. From my experience, SmartMove works best with Check Point if you are converting from Cisco. In all honesty, I find the same for any converter tool: if you are moving from Cisco, they rarely have issues, but sadly other vendors are a different story. Anyway, that's a totally different topic :-)

Now, on to your actual issue. Putting the technical aspect aside and thinking about this purely logically, the find command clearly shows the file is there, so if you are in the right directory when you run the script (and I'll take your word that you are), it makes no sense that it would say "no such file or directory". Are you able to attach that file here? I would like to take a look at it.

Neil_Deane
Participant

I think it's maybe a bug in the Python script. It's a canned file with nothing customer-related in it, so I'll post it. It also looks like a fairly new version of the script (25th Oct).

I'm looking to install the strace package on my Gaia box to try to debug it. Good luck if you spot something.

 

Here's the raw smartconnector.py script; it wouldn't attach, so I've just pasted it in.

 

#!/usr/bin/env python

import sys
import argparse
import json
import os
import re
import operator
import uuid

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '.')))
from cpapi import APIClient, APIClientArgs


# printing messages to console and log file
# res_action - response from server, used if response is not OK
# message - message to inform user
# error - message with mark to inform user about issue
# ---
# returns: nothing
def printStatus(res_action, message, error=None):
line = ""
if res_action is not None and res_action.success is False:
if 'errors' in res_action.data:
for msg_err in res_action.data['errors']:
line += "WARN:" + "\t" + msg_err['message'] + "\n"
if 'warnings' in res_action.data:
for msg_wrn in res_action.data['warnings']:
line += "WARN:" + "\t" + msg_wrn['message'] + "\n"
if line == "":
if "message" in res_action.data:
line = "WARN:" + "\t" + res_action.data['message'] + "\n"
else:
line = "WARN:" + "\t" + "Err of getting message from the mgmt server" + "\n"
elif message is not None:
line += "\t" + message + "\n"
elif error is not None:
line += "WARN:" + "\t" + error + "\n"
if line != "":
print(line.rstrip())
file_log.write(line)
file_log.flush()


# printing info message "process..." with delimeters
# objectsType - string of objects type
# ---
# returns: nothing
def printMessageProcessObjects(objectsType):
printStatus(None, "==========")
printStatus(None, "process " + objectsType + " ...")
printStatus(None, "")


# publishing to database new updates by condition; increasing counter by 1
# counter - is number of new updates. if it equals threshold then updates will be published
# isForced - publishing to database anyway
# ---
# returns: updated counter
def publishUpdate(counter, isForced):
if counter < 0:
counter = 0
counter += 1
if isForced or counter >= args.threshold:
if not isForced:
printStatus(None, "")
printStatus(None, "----------")
printStatus(None, "publishing to database...")
res_publish = client.api_call("publish", {})
if res_publish.success:
counter = 0
printStatus(res_publish, "publish is completed")
printStatus(None, "----------")
if isForced:
printStatus(None, "")
return counter


# check if response contains message that name of "new" object exists in database
# res_add_obj - response from server
# ---
# returns: True - if the name is duplicated, False - otherwise
def isNameDuplicated(res_add_obj):
isNameDuplicated = False
if 'errors' in res_add_obj.data:
for msg in res_add_obj.data['errors']:
if msg['message'].startswith("More than one object named") and msg['message'].endswith("exists."):
isNameDuplicated = True
return isNameDuplicated


# check if response contains message that IP of "new" object exists in database
# res_add_obj - response from server
# ---
# returns: True - if the IP is duplicated, False - otherwise
def isIpDuplicated(res_add_obj):
isIpDuplicated = False
if 'warnings' in res_add_obj.data:
messagePrefixes = ("Multiple objects have the same IP address",)
messagePrefixes += ("More than one network have the same IP",)
messagePrefixes += ("More than one network has the same IP",)
messagePrefixes += ("More than one object have the same IPv6",)
messagePrefixes += ("More than one object has the same IPv6",)
for msg in res_add_obj.data['warnings']:
if msg['message'].startswith(messagePrefixes):
isIpDuplicated = True
return isIpDuplicated


# check if object from server comes from "global" domain
# serverObject - JSON presentation of object
# ---
# returns: True - if object comes from "global" domain, False - otherwise
def isServerObjectGlobal(serverObject):
return serverObject['domain']['domain-type'] == "global domain"


# check if object from server comes from "local" domain
# serverObject - JSON presentation of object
# ---
# returns: True - if object comes from "local" domain, False - otherwise
def isServerObjectLocal(serverObject):
return serverObject['domain']['domain-type'] == "domain"


# adding "new" object to server
# adjusting the name if object with the name exists at server: <initial_object_name>_<postfix>
# client - client object
# apiCommand - short string which indicates what should be done
# payload - JSON representation of "new" object
# userObjectNamePostfix - postfix as number
# changeName=True - True: to try to add object and adjust the name; False: to try to add object and NOT adjust the name
# ---
# returns: added object from server in JSON format, None - otherwise
def addUserObjectToServer(client, apiCommand, payload, userObjectNamePostfix=1, changeName=True):
isObjectAdded = False
userObjectNameInitial = ""
if changeName:
userObjectNameInitial = payload['name']
addedObject = None
while not isObjectAdded:
res_add_obj = client.api_call(apiCommand, payload)
printStatus(res_add_obj, None)
if res_add_obj.success is False:
if not changeName:
addedObject = None
break
if isNameDuplicated(res_add_obj):
if (apiCommand == 'add-time' or apiCommand == 'add-time-group') and (len(str(userObjectNamePostfix)) + len(userObjectNameInitial) + 1) > 11:
#if we have time object need to fill name with condition 11 symbols as max length
payload['name'] = userObjectNameInitial[:-(len(str(userObjectNamePostfix))+1)] + '_' + str(userObjectNamePostfix)
userObjectNamePostfix += 1
elif args.reuse_group_name.lower() == "true" and \
(apiCommand == 'add-group'
or apiCommand == 'add-service-group'
or apiCommand == 'add-time-group'
or apiCommand == 'add-group-with-exclusion'
or apiCommand == 'add-application-site-group'):
# In the case of duplicate names and the user uses the 'reuse-group-name' flag,
# the smartconnector will not create a new name, it will add the data to the existing name
addedObject = res_add_obj.data
isObjectAdded = True
addedObject["name"] = payload['name']
else:
payload['name'] = userObjectNameInitial + '_' + str(userObjectNamePostfix)
userObjectNamePostfix += 1
else:
break
else:
addedObject = res_add_obj.data
isObjectAdded = True

return addedObject


# adding to server the object which contains fields with IP: hosts, networks
# adjusting the name if object with the name exists at server: <initial_object_name>_<postfix>
# using the object from server side if object exits with the same IP at server
# client - client object
# payload - JSON representation of "new" object
# userObjectType - the type of object: host or network
# userObjectIp - IP which will be used as filter in request to server
# mergedObjectsNamesMap - the map which contains name of user's object (key) and name of resulting object (value)
# ---
# returns: updated mergedObjectsNamesMap
def addCpObjectWithIpToServer(client, payload, userObjectType, userObjectIp, mergedObjectsNamesMap):
printStatus(None, "processing " + userObjectType + ": " + payload['name'])
userObjectNameInitial = payload['name']
userObjectNamePostfix = 1
isFinished = False
isIgnoreWarnings = False
while not isFinished:
payload["ignore-warnings"] = isIgnoreWarnings
# payload["--user-agent"] = "mgmt_cli_smartmove";
res_add_obj_with_ip = client.api_call("add-" + userObjectType, payload)
print("add-" + userObjectType, payload)
printStatus(res_add_obj_with_ip, "REPORT: " + userObjectNameInitial + " is added as " + payload['name'])
if res_add_obj_with_ip.success is False:
if isIpDuplicated(res_add_obj_with_ip) and not isIgnoreWarnings:
res_get_obj_with_ip = client.api_query("show-objects", payload={"filter": userObjectIp, "ip-only": False,
"type": userObjectType})
printStatus(res_get_obj_with_ip, None)
if res_get_obj_with_ip.success is True:
if len(res_get_obj_with_ip.data) > 0:
if userObjectType == "network" and next((x for x in res_get_obj_with_ip.data if x['subnet4' if is_valid_ipv4(payload['subnet']) else 'subnet6'] == payload['subnet'] and (x['subnet-mask'] == payload['subnet-mask']) if is_valid_ipv4(payload['subnet'])), None) is None:
isIgnoreWarnings = True
else:
if userObjectType == "host":
mergedObjectsNamesMap[userObjectNameInitial] = res_get_obj_with_ip.data[0]['name']
printStatus(None, "REPORT: " + "CP object " + mergedObjectsNamesMap[
userObjectNameInitial] + " is used instead of " + userObjectNameInitial)
isFinished = True
break
for serverObject in res_get_obj_with_ip.data:
# if more then one network in res_get_obj_with_ip, map to the one that matches subnet
mergedObjectsNamesMap[userObjectNameInitial] = next(x['name'] for x in res_get_obj_with_ip.data if x['subnet4' if is_valid_ipv4(payload['subnet']) else 'subnet6'] == payload['subnet'] and (x['subnet-mask'] == payload['subnet-mask']) if is_valid_ipv4(payload['subnet']))
if (isServerObjectLocal(serverObject) and not isReplaceFromGlobalFirst) or (
isServerObjectGlobal(serverObject) and isReplaceFromGlobalFirst):
break
printStatus(None, "REPORT: " + "CP object " + mergedObjectsNamesMap[
userObjectNameInitial] + " is used instead of " + userObjectNameInitial)
isFinished = True
else:
isIgnoreWarnings = True
else:
isFinished = True
elif isNameDuplicated(res_add_obj_with_ip):
payload['name'] = userObjectNameInitial + '_' + str(userObjectNamePostfix)
userObjectNamePostfix += 1
else:
isFinished = True
else:
mergedObjectsNamesMap[userObjectNameInitial] = payload['name']
isFinished = True
return mergedObjectsNamesMap


# processing and adding to server the groups which contains list of members
# adjusting the name if group with the name exists at server: <initial_object_name>_<postfix>
# client - client object
# apiCommand - short string which indicates what should be done
# userGroup - group which will be processed and added to server
# mergedObjectsMap - map of objects which will be used for replacing
# mergedGroupsNamesMap - the map which contains name of user's object (key) and name of resulting object (value)
# ---
# returns: updated mergedGroupsNamesMap
def processGroupWithMembers(client, apiCommand, userGroup, mergedObjectsMap, mergedGroupsNamesMap, isNeedSplitted):
apiSetCommand = "set-group"
addedGroup = None
if "time" in apiCommand:
apiSetCommand = "set-time-group"
elif "service" in apiCommand:
apiSetCommand = "set-service-group"

if isNeedSplitted:
if ("Members" in userGroup): #group with list of members
for i, userGroupMember in enumerate(userGroup['Members']):
print(userGroupMember)
if userGroupMember in mergedObjectsMap:
userGroupMember = mergedObjectsMap[userGroupMember]
res_add_obj = client.api_call(
apiSetCommand,
{
"name": userGroup['Name'],
"members": {"add": userGroupMember}
})
printStatus(None, "REPORT: " + userGroup['Name'] + " is set with new member " + str(userGroupMember))
else: #group with exclusion with include/exclude fields with groups name
printStatus(None, "WARN: " + userGroup['Name'] + " hasn't any member by the type GroupWithExlusions")
else:
addedGroup = addUserObjectToServer(
client,
apiCommand,
{
"name": userGroup['Name'],
"comments": userGroup['Comments'],
"tags": userGroup['Tags']
}
)
return addedGroup


# processing and adding to server the CheckPoint Domains
# adjusting the name if domain with the name exists at server: <initial_object_name>_<postfix>
# client - client object
# userDomains - the list of domains which will be processed and added to server
# ---
# returns: mergedDomainsNamesMap dictionary
# the map contains name of user's object (key) and name of resulting object (value)
def processDomains(client, userDomains):
printMessageProcessObjects("domains...")
publishCounter = 0
mergedDomainsNamesMap = {}
if len(userDomains) == 0:
return mergedDomainsNamesMap
for userDomain in userDomains:
userDomainNameInitial = userDomain['Name']
printStatus(None, "processing domain: " + userDomain['Name'])
addedDomain = addUserObjectToServer(
client,
"add-dns-domain",
{
"name": userDomain['Name'],
"is-sub-domain": userDomain['IsSubDomain'],
"comments": userDomain['Comments'],
"tags": userDomain['Tags']
},
changeName = False
)
if addedDomain is not None:
mergedDomainsNamesMap[userDomainNameInitial] = addedDomain['name']
printStatus(None, "REPORT: " + userDomainNameInitial + " is added as " + addedDomain['name'])
publishCounter = publishUpdate(publishCounter, False)
else:
printStatus(None, "REPORT: " + userDomainNameInitial + ' is not added.')
printStatus(None, "")
publishUpdate(publishCounter, True)
return mergedDomainsNamesMap


# processing and adding to server the CheckPoint Hosts
# adjusting the name if host with the name exists at server: <initial_object_name>_<postfix>
# if host contains existing IP address then Host object from server will be used instead
# client - client object
# userHosts - the list of hosts which will be processed and added to server
# ---
# returns: mergedHostsNamesMap dictionary
# the map contains name of user's object (key) and name of resulting object (value)
def processHosts(client, userHosts):
printMessageProcessObjects("hosts")
publishCounter = 0
mergedHostsNamesMap = {}
if len(userHosts) == 0:
return mergedHostsNamesMap
for userHost in userHosts:
payload = {
"name": userHost['Name'],
"ip-address": userHost['IpAddress'],
"comments": userHost['Comments'],
"tags": userHost['Tags']
}
initialMapLength = len(mergedHostsNamesMap)
mergedHostsNamesMap = addCpObjectWithIpToServer(client, payload, "host", userHost['IpAddress'],
mergedHostsNamesMap)
if initialMapLength == len(mergedHostsNamesMap):
printStatus(None, "REPORT: " + userHost['Name'] + ' is not added.')
else:
publishCounter = publishUpdate(publishCounter, False)
printStatus(None, "")
publishUpdate(publishCounter, True)
return mergedHostsNamesMap


def is_valid_ipv4(ip):
pattern = re.compile(r"""
^
(?:
# Dotted variants:
(?:
# Decimal 1-255 (no leading 0's)
[3-9]\d?|2(?:5[0-5]|[0-4]?\d)?|1\d{0,2}
|
0x0*[0-9a-f]{1,2} # Hexadecimal 0x0 - 0xFF (possible leading 0's)
|
0+[1-3]?[0-7]{0,2} # Octal 0 - 0377 (possible leading 0's)
)
(?: # Repeat 0-3 times, separated by a dot
\.
(?:
[3-9]\d?|2(?:5[0-5]|[0-4]?\d)?|1\d{0,2}
|
0x0*[0-9a-f]{1,2}
|
0+[1-3]?[0-7]{0,2}
)
){0,3}
|
0x0*[0-9a-f]{1,8} # Hexadecimal notation, 0x0 - 0xffffffff
|
0+[0-3]?[0-7]{0,10} # Octal notation, 0 - 037777777777
|
# Decimal notation, 1-4294967295:
429496729[0-5]|42949672[0-8]\d|4294967[01]\d\d|429496[0-6]\d{3}|
42949[0-5]\d{4}|4294[0-8]\d{5}|429[0-3]\d{6}|42[0-8]\d{7}|
4[01]\d{8}|[1-3]\d{0,9}|[4-9]\d{0,8}
)
$
""", re.VERBOSE | re.IGNORECASE)
return pattern.match(ip) is not None


def is_valid_ipv6(ip):
pattern = re.compile(r"""
^
\s* # Leading whitespace
(?!.*::.*::) # Only a single whildcard allowed
(?:(?!:)|:(?=:)) # Colon iff it would be part of a wildcard
(?: # Repeat 6 times:
[0-9a-f]{0,4} # A group of at most four hexadecimal digits
(?:(?<=::)|(?<!::):) # Colon unless preceeded by wildcard
){6} #
(?: # Either
[0-9a-f]{0,4} # Another group
(?:(?<=::)|(?<!::):) # Colon unless preceeded by wildcard
[0-9a-f]{0,4} # Last group
(?: (?<=::) # Colon iff preceeded by exacly one colon
| (?<!:) #
| (?<=:) (?<!::) : #
) # OR
| # A v4 address with NO leading zeros
(?:25[0-4]|2[0-4]\d|1\d\d|[1-9]?\d)
(?: \.
(?:25[0-4]|2[0-4]\d|1\d\d|[1-9]?\d)
){3}
)
\s* # Trailing whitespace
$
""", re.VERBOSE | re.IGNORECASE | re.DOTALL)
return pattern.match(ip) is not None


# processing and adding to server the CheckPoint Networks
# adjusting the name if network with the name exists at server: <initial_object_name>_<postfix>
# if network contains existing IP subnet then Network object from server will be used instead
# client - client object
# userNetworks - the list of networks which will be processed and added to server
# ---
# returns: mergedNetworksNamesMap dictionary
# the map contains name of user's object (key) and name of resulting object (value)
def processNetworks(client, userNetworks):
printMessageProcessObjects("networks")
publishCounter = 0
mergedNetworksNamesMap = {}
userNetworks = sorted(userNetworks, key=lambda K: ('' if K['Netmask'] is None else K['Netmask'], '' if K['MaskLength'] is None else K['MaskLength']), reverse=True)
if len(userNetworks) == 0:
return mergedNetworksNamesMap
for userNetwork in userNetworks:
payload = {
"name": userNetwork['Name'],
"comments": userNetwork['Comments'],
"tags": userNetwork['Tags'],
"subnet": userNetwork['Subnet']
}
if is_valid_ipv4(userNetwork['Subnet']):
payload["subnet-mask"] = userNetwork['Netmask']
else:
payload["mask-length6"] = userNetwork['MaskLength']
initialMapLength = len(mergedNetworksNamesMap)
mergedNetworksNamesMap = addCpObjectWithIpToServer(client, payload, "network", userNetwork['Subnet'],
mergedNetworksNamesMap)
if initialMapLength == len(mergedNetworksNamesMap):
printStatus(None, "REPORT: " + userNetwork['Name'] + ' is not added.')
else:
publishCounter = publishUpdate(publishCounter, False)
printStatus(None, "")
publishUpdate(publishCounter, True)
return mergedNetworksNamesMap


# processing and adding to server the CheckPoint Ranges
# adjusting the name if range with the name exists at server: <initial_object_name>_<postfix>
# if range contains existing IP start and end then Range object from server will be used instead
# client - client object
# userRanges - the list of ranges which will be processed and added to server
# ---
# returns: mergedRangesNamesMap dictionary
# the map contains name of user's object (key) and name of resulting object (value)
def processRanges(client, userRanges):
printMessageProcessObjects("ranges")
publishCounter = 0
mergedRangesNamesMap = {}
if len(userRanges) == 0:
return mergedRangesNamesMap
serverRangesMap = {}
serverRangesMapGlobal = {}
serverRangesMapLocal = {}
printStatus(None, "reading address ranges from server")
res_get_ranges = client.api_query("show-address-ranges")
printStatus(res_get_ranges, None)
for serverRange in res_get_ranges.data:
key = ''
if 'ipv4-address-first' in serverRange:
key = serverRange['ipv4-address-first'] + '_' + serverRange['ipv4-address-last']
else:
key = serverRange['ipv6-address-first'] + '_' + serverRange['ipv6-address-last']

if isServerObjectGlobal(serverRange) and key not in serverRangesMapGlobal:
serverRangesMapGlobal[key] = serverRange['name']
elif isServerObjectLocal(serverRange) and key not in serverRangesMapLocal:
serverRangesMapLocal[key] = serverRange['name']
elif key not in serverRangesMapGlobal and key not in serverRangesMapLocal and key not in serverRangesMap:
serverRangesMap[key] = serverRange['name']

printStatus(None, "")
if sys.version_info >= (3, 0):
serverRangesMap = serverRangesMap.copy()
if isReplaceFromGlobalFirst:
serverRangesMap.update(serverRangesMapLocal)
serverRangesMap.update(serverRangesMapGlobal)
else:
serverRangesMap.update(serverRangesMapGlobal)
serverRangesMap.update(serverRangesMapLocal)
else:
if isReplaceFromGlobalFirst:
serverRangesMap = dict(
serverRangesMap.items() + serverRangesMapLocal.items() + serverRangesMapGlobal.items())
else:
serverRangesMap = dict(
serverRangesMap.items() + serverRangesMapGlobal.items() + serverRangesMapLocal.items())
for userRange in userRanges:
printStatus(None, "processing range: " + userRange['Name'])
userRangeNameInitial = userRange['Name']
rngFrom = '' if userRange['RangeFrom'] is None else userRange['RangeFrom']
rngTo = '' if userRange['RangeTo'] is None else userRange['RangeTo']
key = rngFrom + '_' + rngTo
if key in serverRangesMap:
printStatus(None, None,
"More than one range has the same ip: '" + userRange['RangeFrom'] + "' and '" + userRange[
'RangeTo'] + "'")
mergedRangesNamesMap[userRangeNameInitial] = serverRangesMap[key]
printStatus(None, "REPORT: " + "CP object " + mergedRangesNamesMap[
userRangeNameInitial] + " is used instead of " + userRangeNameInitial)
else:
userRangeNamePostfix = 1
if userRange['Name'] in serverRangesMap.values():
printStatus(None, None, "More than one object named '" + userRange['Name'] + "' exists.")
while userRange['Name'] in serverRangesMap.values():
userRange['Name'] = userRangeNameInitial + '_' + str(userRangeNamePostfix)
userRangeNamePostfix += 1
payload = {
"name": userRange['Name'],
"ip-address-first": userRange['RangeFrom'],
"ip-address-last": userRange['RangeTo'],
"comments": userRange['Comments'],
"tags": userRange['Tags'],
"ignore-warnings": True
}
addedRange = addUserObjectToServer(client, "add-address-range", payload, userRangeNamePostfix)
if addedRange is not None:
mergedRangesNamesMap[userRangeNameInitial] = addedRange['name']
key = ''
if 'ipv4-address-first' in addedRange:
key = addedRange['ipv4-address-first'] + '_' + addedRange['ipv4-address-last']
else:
key = addedRange['ipv6-address-first'] + '_' + addedRange['ipv6-address-last']
serverRangesMap[key] = addedRange['name']
printStatus(None, "REPORT: " + userRangeNameInitial + " is added as " + addedRange['name'])
publishCounter = publishUpdate(publishCounter, False)
else:
printStatus(None, "REPORT: " + userRangeNameInitial + ' is not added.')
printStatus(None, "")
publishUpdate(publishCounter, True)
return mergedRangesNamesMap


# processing and adding to server the CheckPoint Network Groups
# adjusting the name if network group with the name exists at server: <initial_object_name>_<postfix>
# client - client object
# userNetworkGroups - the list of network groups which will be processed and added to server
# mergedNetworkObjectsMap - map of network objects which will be used for replacing
# ---
# returns: mergedGroupsNamesDict dictionary
# the map contains name of user's object (key) and name of resulting object (value)
def processNetGroups(client, userNetworkGroups, mergedNetworkObjectsMap):
printMessageProcessObjects("network groups")
publishCounter = 0
mergedGroupsNamesDict = {}
if len(userNetworkGroups) == 0:
return mergedGroupsNamesDict
for userNetworkGroup in userNetworkGroups:
userNetworkGroupNameInitial = userNetworkGroup['Name']
addedNetworkGroup = None
if userNetworkGroup['TypeName'] == 'CheckPoint_GroupWithExclusion':
printStatus(None, "processing network group with exclusion: " + userNetworkGroup['Name'])
if userNetworkGroup['Include'] in mergedGroupsNamesDict:
userNetworkGroup['Include'] = mergedGroupsNamesDict[userNetworkGroup['Include']]
if userNetworkGroup['Except'] in mergedGroupsNamesDict:
userNetworkGroup['Except'] = mergedGroupsNamesDict[userNetworkGroup['Except']]
addedNetworkGroup = addUserObjectToServer(
client,
"add-group-with-exclusion",
{
"name": userNetworkGroup['Name'],
"include": userNetworkGroup['Include'],
"except": userNetworkGroup['Except'],
"comments": userNetworkGroup['Comments'],
"tags": userNetworkGroup['Tags']
}
)
else:
printStatus(None, "processing network group: " + userNetworkGroup['Name'])
addedNetworkGroup = processGroupWithMembers(client, "add-group", userNetworkGroup, mergedNetworkObjectsMap,
mergedGroupsNamesDict, False)
if addedNetworkGroup is not None:
mergedGroupsNamesDict[userNetworkGroupNameInitial] = addedNetworkGroup['name']
if 'errors' in addedNetworkGroup:
if 'More than one object' in addedNetworkGroup['errors'][0]['message']:
printStatus(None, "REPORT: Using the existing object '{}'".format(addedNetworkGroup['name']))
else:
printStatus(None, "REPORT: " + userNetworkGroupNameInitial + " is added as " + addedNetworkGroup['name'])
publishCounter = publishUpdate(publishCounter, True)
userNetworkGroup["Name"] = addedNetworkGroup['name']
if userNetworkGroup['TypeName'] != 'CheckPoint_GroupWithExclusion':
processGroupWithMembers(client, "add-group", userNetworkGroup, mergedNetworkObjectsMap,
mergedGroupsNamesDict, True)
else:
printStatus(None, "REPORT: " + userNetworkGroupNameInitial + " is not added.")
printStatus(None, "")
publishUpdate(publishCounter, True)
return mergedGroupsNamesDict


# processing and adding to server the CheckPoint Simple Gateways
# adjusting the name if simple gateway with the name exists at server: <initial_object_name>_<postfix>
# client - client object
# userSimpleGateways - the list of simple gateways which will be processed and added to server
# ---
# returns: mergedSimpleGatewaysNamesMap dictionary
# the map contains name of user's object (key) and name of resulting object (value)
def processSimpleGateways(client, userSimpleGateways):
printMessageProcessObjects("simple gateways")
publishCounter = 0
mergedSimpleGatewaysNamesMap = {}
if len(userSimpleGateways) == 0:
return mergedSimpleGatewaysNamesMap
for userSimpleGateway in userSimpleGateways:
printStatus(None, "processing simple gateway: " + userSimpleGateway['Name'])
userSimpleGatewayNameInitial = userSimpleGateway['Name']
addedSimpleGateway = addUserObjectToServer(
client,
"add-simple-gateway",
{
"name": userSimpleGateway['Name'],
"ip-address": userSimpleGateway['IpAddress'],
"comments": userSimpleGateway['Comments'],
"tags": userSimpleGateway['Tags']
}
)
if addedSimpleGateway is not None:
mergedSimpleGatewaysNamesMap[userSimpleGatewayNameInitial] = addedSimpleGateway['name']
printStatus(None, "REPORT: " + userSimpleGatewayNameInitial + " is added as " + addedSimpleGateway['name'])
publishCounter = publishUpdate(publishCounter, False)
else:
printStatus(None, "REPORT: " + userSimpleGatewayNameInitial + ' is not added.')
printStatus(None, "")
publishUpdate(publishCounter, True)
return mergedSimpleGatewaysNamesMap


# processing and adding to server the CheckPoint Zones
# adjusting the name if zone with the name exists at server: <initial_object_name>_<postfix>
# client - client object
# userZones - the list of zones which will be processed and added to server
# ---
# returns: mergedZonesNamesMap dictionary
# the map contains name of user's object (key) and name of resulting object (value)
def processZones(client, userZones):
printMessageProcessObjects("zones")
publishCounter = 0
mergedZonesNamesMap = {}
if len(userZones) == 0:
return mergedZonesNamesMap
for userZone in userZones:
printStatus(None, "processing zone: " + userZone['Name'])
userZoneNameInitial = userZone['Name']
addedZone = addUserObjectToServer(
client,
"add-security-zone",
{
"name": userZone['Name'],
"comments": userZone['Comments'],
"tags": userZone['Tags']
}
)
if addedZone is not None:
mergedZonesNamesMap[userZoneNameInitial] = addedZone['name']
printStatus(None, "REPORT: " + userZoneNameInitial + " is added as " + addedZone['name'])
publishCounter = publishUpdate(publishCounter, False)
else:
printStatus(None, "REPORT: " + userZoneNameInitial + ' is not added.')
printStatus(None, "")
publishUpdate(publishCounter, True)
return mergedZonesNamesMap


# generate and provide key for Services dictionary
# serverService - service in JSON format
# ---
# returns: string as key
def provideServerServiceKey(serverService):
key = ""
if 'port' in serverService: # key for TCP or UDP or SCTP
key = serverService['port']
elif 'icmp-type' in serverService: # key for ICMP
key = str(serverService['icmp-type'])
if 'icmp-code' in serverService and serverService['icmp-code'] != 'null':
key += "_" + str(serverService['icmp-code'])
elif 'ip-protocol' in serverService: # key for Other
key = serverService['ip-protocol']
return key


# processing and adding to server the CheckPoint Services (TCP, UDP, SCTP, ICMP or Other)
# adjusting the name if service with the name exists at server: <initial_object_name>_<postfix>
# if service contains existing port then Service object from server will be used instead
# client - client object
# userServices - the list of services which will be processed and added to server
# userServiceType - the type of service which should be processed
# ---
# returns: mergedServicesMap dictionary
# the map contains name of user's object (key) and name of resulting object (value)
def processServices(client, userServices, userServiceType):
printMessageProcessObjects(userServiceType + " services")
publishCounter = 0
mergedServicesMap = {}
serverServicesMap = {}
serverServicesMapGlobal = {}
serverServicesMapLocal = {}
printStatus(None, "reading " + userServiceType + " services from server")
res_get_services = client.api_query("show-services-" + userServiceType)
printStatus(res_get_services, None)
for serverService in res_get_services.data:
mergedServicesMap[serverService['name']] = serverService['uid']
key = provideServerServiceKey(serverService)
isServiceReplacing = False
if 'port' in serverService and ('protocol' not in serverService or serverService['protocol'] == 'null'):
isServiceReplacing = True
if isServerObjectGlobal(serverService) and (key not in serverServicesMapGlobal or isServiceReplacing):
serverServicesMapGlobal[key] = (serverService['name'], serverService['uid'])
elif isServerObjectLocal(serverService) and (key not in serverServicesMapLocal or isServiceReplacing):
serverServicesMapLocal[key] = (serverService['name'], serverService['uid'])
elif not isServerObjectGlobal(serverService) and not isServerObjectLocal(serverService) and (
key not in serverServicesMap or isServiceReplacing):
serverServicesMap[key] = (serverService['name'], serverService['uid'])
printStatus(None, "")
if sys.version_info >= (3, 0):
serverServicesMap = serverServicesMap.copy()
if isReplaceFromGlobalFirst:
serverServicesMap.update(serverServicesMapLocal)
serverServicesMap.update(serverServicesMapGlobal)
else:
serverServicesMap.update(serverServicesMapGlobal)
serverServicesMap.update(serverServicesMapLocal)
else:
if isReplaceFromGlobalFirst:
serverServicesMap = dict(
serverServicesMap.items() + serverServicesMapLocal.items() + serverServicesMapGlobal.items())
else:
serverServicesMap = dict(
serverServicesMap.items() + serverServicesMapGlobal.items() + serverServicesMapLocal.items())
if len(userServices) == 0:
return mergedServicesMap
for userService in userServices:
printStatus(None, "processing " + userServiceType + " service: " + userService['Name'])
userServiceNameInitial = userService['Name']
key = ""
duplicationValueMessagePostfix = ""
if 'Port' in userService:
key = userService['Port']
duplicationValueMessagePostfix = "port: " + userService['Port']
elif 'Type' in userService:
key = userService['Type']
duplicationValueMessagePostfix = "type: " + userService['Type']
if 'Code' in userService and userService['Code'] != 'null':
key += "_" + userService['Code']
duplicationValueMessagePostfix = "type / code: " + userService['Type'] + " / " + userService['Code']
elif 'IpProtocol' in userService:
key = userService['IpProtocol']
duplicationValueMessagePostfix = "ip-protocol: " + userService['IpProtocol']
if key in serverServicesMap:
printStatus(None, None,
"More than one " + userServiceType + " service has the same " + duplicationValueMessagePostfix)
mergedServicesMap[userServiceNameInitial] = serverServicesMap[key][1]
printStatus(None, "REPORT: " + "CP object " + serverServicesMap[key][
0] + " is used instead of " + userServiceNameInitial)
else:
userServiceNamePostfix = 1
serverServicesNames = [serverServiceNameUid[0] for serverServiceNameUid in serverServicesMap.values()]
if userService['Name'] in serverServicesNames:
printStatus(None, None, "More than one object named '" + userService['Name'] + "' exists.")
while userService['Name'] in serverServicesNames:
userService['Name'] = userServiceNameInitial + '_' + str(userServiceNamePostfix)
userServiceNamePostfix += 1
payload = {}
payload["name"] = userService['Name']
payload["comments"] = userService['Comments']
payload["tags"] = userService['Tags']
payload["ignore-warnings"] = True
if 'Port' in userService:
payload["port"] = userService['Port']
payload["source-port"] = userService['SourcePort']
payload["session-timeout"] = userService['SessionTimeout']
elif 'Type' in userService:
payload["icmp-type"] = userService['Type']
if 'Code' in userService and userService['Code'] != 'null':
payload["icmp-code"] = userService['Code']
elif 'IpProtocol' in userService:
payload["ip-protocol"] = userService['IpProtocol']
payload["match-for-any"] = True
addedService = addUserObjectToServer(client, "add-service-" + userServiceType, payload,
userServiceNamePostfix)
if addedService is not None:
mergedServicesMap[userServiceNameInitial] = addedService['uid']
key = provideServerServiceKey(addedService)
serverServicesMap[key] = (addedService['name'], addedService['uid'])
printStatus(None, "REPORT: " + userServiceNameInitial + " is added as " + addedService['name'])
publishCounter = publishUpdate(publishCounter, False)
else:
printStatus(None, "REPORT: " + userServiceNameInitial + ' is not added.')
printStatus(None, "")
publishUpdate(publishCounter, True)
return mergedServicesMap


# processing and adding to server the CheckPoint Service Groups
# adjusting the name if service group with the name exists at server: <initial_object_name>_<postfix>
# client - client object
# userServicesGroups - the list of service groups which will be processed and added to server
# mergedServicesMap - map of service objects which will be used for replacing
# ---
# returns: mergedServicesGroupsNamesMap dictionary
# the map contains name of user's object (key) and name of resulting object (value)
def processServicesGroups(client, userServicesGroups, mergedServicesMap):
printMessageProcessObjects("services groups")
publishCounter = 0
mergedServicesGroupsNamesMap = {}
if len(userServicesGroups) == 0:
return mergedServicesGroupsNamesMap
for userServicesGroup in userServicesGroups:
printStatus(None, "processing services group: " + userServicesGroup['Name'])
userServicesGroupNameInitial = userServicesGroup['Name']
addedServicesGroup = processGroupWithMembers(client, "add-service-group", userServicesGroup, mergedServicesMap,
mergedServicesGroupsNamesMap, False)
if addedServicesGroup is not None:
mergedServicesGroupsNamesMap[userServicesGroupNameInitial] = addedServicesGroup['name']
if 'errors' in addedServicesGroup:
if 'More than one object' in addedServicesGroup['errors'][0]['message']:
printStatus(None, "REPORT: Using the existing object '{}'".format(addedServicesGroup['name']))
else:
printStatus(None, "REPORT: " + userServicesGroupNameInitial + " is added as " + addedServicesGroup['name'])
publishCounter = publishUpdate(publishCounter, True)
userServicesGroup["Name"] = addedServicesGroup['name']
processGroupWithMembers(client, "add-service-group", userServicesGroup, mergedServicesMap,
mergedServicesGroupsNamesMap, True)
else:
printStatus(None, "REPORT: " + userServicesGroupNameInitial + " is not added.")
printStatus(None, "")
publishUpdate(publishCounter, True)
return mergedServicesGroupsNamesMap


# processing and adding to server the CheckPoint Time Groups
# adjusting the name if time group with the name exists at server: <initial_object_name>_<postfix>
# client - client object
# userTimesGroups - the list of time groups which will be processed and added to server
# mergedTimesNamesMap - the list of the time names
# ---
# returns: mergedTimesGroupsNamesMap dictionary
# the map contains name of user's object (key) and name of resulting object (value)
def processTimesGroups(client, userTimesGroups, mergedTimesNamesMap):
printMessageProcessObjects("times groups")
publishCounter = 0
mergedTimesGroupsNamesMap = {}
if len(userTimesGroups) == 0:
return mergedTimesGroupsNamesMap
for userTimesGroup in userTimesGroups:
printStatus(None, "processing times group: " + userTimesGroup['Name'])
userTimesGroupNameInitial = userTimesGroup['Name']
addedTimesGroup = processGroupWithMembers(client, "add-time-group", userTimesGroup, mergedTimesNamesMap,
mergedTimesGroupsNamesMap, False)
if addedTimesGroup is not None:
mergedTimesGroupsNamesMap[userTimesGroupNameInitial] = addedTimesGroup['name']
if 'errors' in addedTimesGroup:
if 'More than one object' in addedTimesGroup['errors'][0]['message']:
printStatus(None, "REPORT: Using the existing object '{}'".format(addedTimesGroup['name']))
else:
printStatus(None, "REPORT: " + userTimesGroupNameInitial + " is added as " + addedTimesGroup['name'])
publishCounter = publishUpdate(publishCounter, True)
userTimesGroup["Name"] = addedTimesGroup['name']
processGroupWithMembers(client, "add-time-group", userTimesGroup, mergedTimesNamesMap,
mergedTimesGroupsNamesMap, True)
else:
printStatus(None, "REPORT: " + userTimesGroupNameInitial + ' is not added.')
printStatus(None, "")
publishUpdate(publishCounter, True)
return mergedTimesGroupsNamesMap


# processing and adding to server the CheckPoint Time objects
# adjusting the name if time object with the name exists at server: <initial_object_name>_<postfix>
# client - client object
# userTimes - the list of time objects which will be processed and added to server
# ---
# returns: mergedTimesNamesMap dictionary
# the map contains name of user's object (key) and name of resulting object (value)
def processTimes(client, userTimes):
printMessageProcessObjects("times")
publishCounter = 0
mergedTimesNamesMap = {}
payload = {}
if len(userTimes) == 0:
return mergedTimesNamesMap
weekdays = {0: "Sun", 1: "Mon", 2: "Tue", 3: "Wed", 4: "Thu", 5: "Fri", 6: "Sat"}
for userTime in userTimes:
printStatus(None, "processing time: " + userTime['Name'])
userTimeNameInitial = userTime['Name']

payload["name"] = userTime['Name']
payload["comments"] = userTime['Comments']

payload["start-now"] = userTime['StartNow']
payload["start"] = {
"date": userTime['StartDate'],
"time": userTime['StartTime']
}

payload["end-never"] = userTime['EndNever']
payload["end"] = {
"date": userTime['EndDate'],
"time": userTime['EndTime']
}

payload["hours-ranges"] = [
{
"enabled": userTime['HoursRangesEnabled_1'],
"from": userTime['HoursRangesFrom_1'] if userTime['HoursRangesFrom_1'] is not None else "00:00",
"to": userTime['HoursRangesTo_1'] if userTime['HoursRangesTo_1'] is not None else "00:00",
"index": 1
},
{
"enabled": userTime['HoursRangesEnabled_2'],
"from": userTime['HoursRangesFrom_2'] if userTime['HoursRangesFrom_2'] is not None else "00:00",
"to": userTime['HoursRangesTo_2'] if userTime['HoursRangesTo_2'] is not None else "00:00",
"index": 2
},
{
"enabled": userTime['HoursRangesEnabled_3'],
"from": userTime['HoursRangesFrom_3'] if userTime['HoursRangesFrom_3'] is not None else "00:00",
"to": userTime['HoursRangesTo_3'] if userTime['HoursRangesTo_3'] is not None else "00:00",
"index": 3
}
]

daysNames = [] # list of weekdays names e.g. "Sun", "Mon"...
# weekdays are presented as [1,2,3.. ] in userTime['RecurrenceWeekdays']
for day in userTime['RecurrenceWeekdays']:
daysNames.append(weekdays[day])

payload["recurrence"] = {
"pattern": "Daily" if userTime['RecurrencePattern'] == 1 else (
"Weekly" if userTime['RecurrencePattern'] == 2 else (
"Monthly" if userTime['RecurrencePattern'] == 3 else None)),
"weekdays": daysNames
}

payload["tags"] = userTime['Tags']

addedTime = addUserObjectToServer(
client,
"add-time",
payload
)

if addedTime is not None:
mergedTimesNamesMap[userTimeNameInitial] = addedTime['name']
printStatus(None, "REPORT: " + userTimeNameInitial + " is added as " + addedTime['name'])
publishCounter = publishUpdate(publishCounter, False)
else:
printStatus(None, "REPORT: " + userTimeNameInitial + ' is not added.')
printStatus(None, "")
publishUpdate(publishCounter, True)
return mergedTimesNamesMap


# processing and adding to server the CheckPoint Access Rules
# the rules is added in back order: the last item of list goes first and the first item goes last
# client - client object
# userRules - the list of access rules which will be processed and added to server
# userLayerName - the name of layer where access rules will be added
# skipCleanUpRule - the flag which indicates to exclude "Clean up" rule from layer or not; "Clean up" rule is the last rule in the layer always
# mergedNetworkObjectsMap - map of all network objects (groups is included) which will be used for replacing
# mergedServiceObjectsMap - map of all services objects (groups is included) which will be used for replacing
# mergedTimesGroupsNamesMap - map of time groups objects which will be used for replacing
# ---
# returns: nothing
def addAccessRules(client, userRules, userLayerName, skipCleanUpRule, mergedNetworkObjectsMap, mergedServiceObjectsMap,
mergedTimesGroupsNamesMap, mergedTimesNamesMap):
if userRules is not None:
publishCounter = 0
printStatus(None, "processing access rules to " + userLayerName + " layer")
printStatus(None, "")
userRulesStartPosition = -2 if skipCleanUpRule else -1
# userRules[userRulesStartPosition::-1]:
# -1 - minus means to iterate backwards, 1 means step
# userRulesStartPosition - start point, length of list "- userRulesStartPosition" because reverse mode is specified
# end point is not specified - all elements
for i, userRule in enumerate(userRules[userRulesStartPosition::-1]):
printStatus(None, "processing access rule: #" + str(len(userRules) - i) + ", " + (
userRule['Name'] if userRule['Name'] is not None else ""))
# JSON access rules contain "action" as number
# "action" number points to the next list of values from SmartMove:
# 0 = Accept
# 1 = Drop
# 2 = Reject
# 3 = SubPolicy
actions = {0: "accept", 1: "drop", 2: "reject", 3: "apply layer"}
sources = []
for source in userRule['Source']:
sourceName = source['Name']
sourceName = mergedNetworkObjectsMap[
sourceName] if sourceName in mergedNetworkObjectsMap else sourceName
sources.append(sourceName)
destinations = []
for destination in userRule['Destination']:
destinationName = destination['Name']
destinationName = mergedNetworkObjectsMap[
destinationName] if destinationName in mergedNetworkObjectsMap else destinationName
destinations.append(destinationName)
services = []
for service in userRule['Service']:
serviceName = service['Name']
serviceName = mergedServiceObjectsMap[
serviceName] if serviceName in mergedServiceObjectsMap else serviceName
services.append(serviceName)
times = []
for time in userRule['Time']:
timeName = time['Name']

# support of time-ranges along with time-groups is added
if timeName in mergedTimesGroupsNamesMap:
timeName = mergedTimesGroupsNamesMap[timeName]
elif timeName in mergedTimesNamesMap:
timeName = mergedTimesNamesMap[timeName]
else:
timeName = timeName
# timeName = mergedTimesGroupsNamesMap[timeName] if timeName in mergedTimesGroupsNamesMap else timeName

times.append(timeName)
payload = {
"layer": userRule['Layer'],
"position": "top",
"name": userRule['Name'],
"action": actions[userRule['Action']],
"destination": destinations,
"destination-negate": userRule['DestinationNegated'],
"enabled": userRule['Enabled'],
"service": services,
"source": sources,
"source-negate": userRule['SourceNegated'],
"time": times,
"track": {"type": "None" if userRule['Track'] == 0 else "Log"},
"comments": userRule['Comments']
}
if userRule['Action'] == 3:
payload["inline-layer"] = userRule['SubPolicyName']
if userRule['ConversionComments'].strip() != "":
payload["custom-fields"] = {"field-1": userRule['ConversionComments']}
addedRule = addUserObjectToServer(client, "add-access-rule", payload, changeName=False)
if addedRule is not None:
printStatus(None, "REPORT: access rule is added")
publishCounter = publishUpdate(publishCounter, False)
else:
printStatus(None, "REPORT: access rule is not added")
printStatus(None, "")
publishUpdate(publishCounter, True)


# processing and adding to server the CheckPoint Package with Layers and Access Rules
# client - client object
# userPackage - the package which contains layers and access rules
# mergedNetworkObjectsMap - map of all network objects (groups is included) which will be used for replacing
# mergedServiceObjectsMap - map of all services objects (groups is included) which will be used for replacing
# mergedTimesGroupsNamesMap - map of time groups objects which will be used for replacing
# ---
# returns: added package in JSON format
def processPackage(client, userPackage, mergedNetworkObjectsMap, mergedServiceObjectsMap, mergedTimesGroupsNamesMap,
mergedTimesNamesMap):
printMessageProcessObjects("package")
allExistingLayers = {}
addedPackage = None
if userPackage is not None:
original_package_name = userPackage['Name']
userPackage['Name'] = userPackage['Name'] + "_" + str(uuid.uuid4().hex[:3].upper())
publishCounter = 0
printStatus(None, "processing package: " + userPackage['Name'])
addedPackage = addUserObjectToServer(
client,
"add-package",
{
"name": userPackage['Name'],
"threat-prevention": False,
"tags": userPackage['Tags']
},
changeName=False
)
if addedPackage is None:
printStatus(None, "REPORT: " + userPackage['Name'] + " package is not added")
return addedPackage
printStatus(None, "REPORT: " + userPackage['Name'] + " package is added")
printStatus(None, "")
publishCounter = publishUpdate(publishCounter, True)
if userPackage['SubPolicies'] is not None:
for userSubLayer in userPackage['SubPolicies']:
originalName = userSubLayer['Name']
userSubLayer['Name'] = userSubLayer['Name'] + "_" + str(uuid.uuid4().hex[:3].upper())
allExistingLayers[originalName] = userSubLayer['Name']
for rule in userSubLayer['Rules']:
rule['Layer'] = userSubLayer['Name']
if rule['SubPolicyName'] != "":
rule['SubPolicyName'] = allExistingLayers[rule['SubPolicyName']]
printStatus(None, "processing access layer: " + userSubLayer['Name'])

addedSubLayer = addUserObjectToServer(
client,
"add-access-layer",
{
"name": userSubLayer['Name'],
"add-default-rule": False,
"applications-and-url-filtering": userSubLayer['ApplicationsAndUrlFiltering'],
"comments": userSubLayer['Comments'],
"tags": userSubLayer['Tags']
},
changeName=False
)
if addedSubLayer is None:
printStatus(None, "REPORT: " + userSubLayer['Name'] + " layer is not added")
continue
printStatus(None, "REPORT: " + userSubLayer['Name'] + " layer is added")
printStatus(None, "")
publishCounter = publishUpdate(publishCounter, True)
addAccessRules(client, userSubLayer['Rules'], userSubLayer['Name'], False, mergedNetworkObjectsMap,
mergedServiceObjectsMap, mergedTimesGroupsNamesMap, mergedTimesNamesMap)
if userPackage['ParentLayer'] is not None:
userPackage['ParentLayer']['Name'] = userPackage['ParentLayer']['Name'].replace(original_package_name,
userPackage['Name'])
for parentRule in userPackage['ParentLayer']['Rules']:
parentRule['Layer'] = userPackage['ParentLayer']['Name']
if parentRule['SubPolicyName'] != "":
parentRule['SubPolicyName'] = allExistingLayers[parentRule['SubPolicyName']]
addAccessRules(client, userPackage['ParentLayer']['Rules'], "parent", True, mergedNetworkObjectsMap,
mergedServiceObjectsMap, mergedTimesGroupsNamesMap, mergedTimesNamesMap)
return addedPackage


# resolver for nat method type
# typeValue - number of type [ static, hide, nat64, nat46 ]
def getMethodType(typeValue):
_type = "static"
if typeValue == 1:
_type = "hide"
elif typeValue == 2:
_type = "nat64"
elif typeValue == 3:
_type = "nat46"
return _type


# processing and adding to server the CheckPoint NAT rules
# NAT rules are added if package has been added
# client - client object
# addedPackage - added package in JSON format
# userNatRules - the list of NAT rules which will be processed and added to server
# mergedNetworkObjectsMap - map of all network objects (groups is included) which will be used for replacing
# mergedServiceObjectsMap - map of all services objects (groups is included) which will be used for replacing
# ---
# returns: nothing
def processNatRules(client, addedPackage, userNatRules, mergedNetworkObjectsMap, mergedServiceObjectsMap):
printMessageProcessObjects("nat rules")
if addedPackage is None:
printStatus(None, "REPORT: nat rules can not been added because package was not added")
return
publishCounter = 0
for i, userNatRule in enumerate(userNatRules):
userNatRule['Package'] = addedPackage['name']
printStatus(None, "processing nat rule: #" + str(i))
sourceOrig = ""
if userNatRule['Source'] is not None:
sourceOrig = userNatRule['Source']['Name']
sourceOrig = mergedNetworkObjectsMap[sourceOrig] if sourceOrig in mergedNetworkObjectsMap else sourceOrig
destinationOrig = ""
if userNatRule['Destination'] is not None:
destinationOrig = userNatRule['Destination']['Name']
destinationOrig = mergedNetworkObjectsMap[
destinationOrig] if destinationOrig in mergedNetworkObjectsMap else destinationOrig
serviceOrig = ""
if userNatRule['Service'] is not None:
serviceOrig = userNatRule['Service']['Name']
serviceOrig = mergedServiceObjectsMap[
serviceOrig] if serviceOrig in mergedServiceObjectsMap else serviceOrig
sourceTrans = ""
if userNatRule['TranslatedSource'] is not None:
sourceTrans = userNatRule['TranslatedSource']['Name']
sourceTrans = mergedNetworkObjectsMap[
sourceTrans] if sourceTrans in mergedNetworkObjectsMap else sourceTrans
destinationTrans = ""
if userNatRule['TranslatedDestination'] is not None:
destinationTrans = userNatRule['TranslatedDestination']['Name']
destinationTrans = mergedNetworkObjectsMap[
destinationTrans] if destinationTrans in mergedNetworkObjectsMap else destinationTrans
serviceTrans = ""
if userNatRule['TranslatedService'] is not None:
serviceTrans = userNatRule['TranslatedService']['Name']
serviceTrans = mergedServiceObjectsMap[
serviceTrans] if serviceTrans in mergedServiceObjectsMap else serviceTrans
payload = {
"package": userNatRule['Package'],
"position": "bottom",
"comments": userNatRule['Comments'],
"enabled": userNatRule['Enabled'],
"method": getMethodType(userNatRule['Method']),
"original-source": sourceOrig,
"original-destination": destinationOrig,
"original-service": serviceOrig,
"translated-source": sourceTrans,
"translated-destination": destinationTrans,
"translated-service": serviceTrans
}
addedNatRule = addUserObjectToServer(client, "add-nat-rule", payload, changeName=False)
if addedNatRule is not None:
printStatus(None, "REPORT: nat rule is added")
publishCounter = publishUpdate(publishCounter, False)
else:
printStatus(None, "REPORT: nat rule is not added")
printStatus(None, "")
publishCounter = publishUpdate(publishCounter, True)


# START

args_parser = argparse.ArgumentParser()

args_parser._optionals.title = "arguments"

args_parser.add_argument('-m', '--management', default='127.0.0.1',
help="Management server IP address or name. Default: 127.0.0.1")
args_parser.add_argument('--port', type=int,
help="Server port. Default: 443")
mxg = args_parser.add_mutually_exclusive_group(required=True)
mxg.add_argument('-r', '--root', action="store_true",
help="For a logged in administrator that wants to receive SuperUser permissions. Additional login credentials are not required.")
mxg.add_argument('-k', '--key',
help="api_key")
mxg.add_argument('-u', '--user',
help="User name")
args_parser.add_argument('-p', '--password',
help="User password")
args_parser.add_argument('-f', '--file', default='cp_objects.json',
help="JSON file with CheckPoint Objects. Default: cp_objects.json")
args_parser.add_argument('-c', '--context', default='web_api',
help="Context of WebAPI.")
args_parser.add_argument('-t', '--threshold', type=int, default=100,
help="Parameter specifies maximum number of Check Point objects/rules to add before starting publish operation. Default: 100")
args_parser.add_argument('-d', '--domain', default=None,
help="The name/uid of the domain you want to log into in an MDS environment.")
args_parser.add_argument('--replace-from-global-first', default="false",
help="The argument indicates that SmartConnector should use 'Global' objects at first, by default it uses 'Local' objects. [true, false]")
args_parser.add_argument('--reuse-group-name', default="false",
help="The argument indicates that SmartConnector should use reuse the group by name instead "
"of creating a new group, take cautions. [true, false]")

args = args_parser.parse_args()

file_name_log = "smartconnector"
if args.file != "cp_objects.json":
file_name_log += "_" + os.path.splitext(args.file)[0]
file_name_log += ".log"
if os.path.exists(file_name_log):
os.remove(file_name_log)
file_log = open(file_name_log, "w+")

if args.root and args.user and args.key is not None:
print("")
printStatus(None, None, "Command contains ambiguous parameters. User is unexpected when logging in as root.")
print("")
args_parser.print_help()
elif args.root and args.management != '127.0.0.1':
print("")
printStatus(None, None, "Command contains ambiguous parameters. Management is unexpected when logging in as root.")
print("")
args_parser.print_help()
elif not args.root and args.password is None and args.user is not None:
print("")
printStatus(None, None, "No password option is specified.")
print("")
args_parser.print_help()
elif not os.path.isfile(args.file):
print("")
printStatus(None, None, "The file does not exists")
print("")
args_parser.print_help()
elif args.replace_from_global_first.lower() != "true" and args.replace_from_global_first.lower() != "false":
print("")
printStatus(None, None,
"smartconnector.py: error: argument --replace-from-global-first: invalid boolean value: '" + args.replace_from_global_first + "'")
print("")
args_parser.print_help()
elif args.reuse_group_name.lower() != "true" and args.reuse_group_name.lower() != "false":
print("")
printStatus(None, None, "smartconnector.py: error: argument --reuse-group-name: invalid boolean value: '" + args.reuse_group_name + "'")
print("")
else:
if args.replace_from_global_first.lower() == "true":
isReplaceFromGlobalFirst = True
elif args.replace_from_global_first.lower() == "false":
isReplaceFromGlobalFirst = False
printStatus(None, "Input arguments:")
printStatus(None, "root flag is set" if args.root else "root flag is not set")
printStatus(None, "management: " + args.management)
printStatus(None,
"port: " + str(args.port) if args.port is not None else "port: is not set, default value will be used")
printStatus(None, "domain: " + args.domain if args.domain is not None else "domain: is not set")
printStatus(None, "user: " + args.user if args.user is not None else "user: is not set")
printStatus(None, "password: ***" if args.password is not None else "password: is not set")
printStatus(None, "API_KEY: " + args.key if args.key is not None else "API_KEY: is not set")
printStatus(None, "file: " + args.file)
printStatus(None, "threshold: " + str(args.threshold))
printStatus(None, "replace-from-global-first: " + str(isReplaceFromGlobalFirst))
printStatus(None, "reuse-group-name: " + str(args.reuse_group_name).lower())
printStatus(None, "===========================================")
printStatus(None, "reading and parsing processes are started for JSON file: " + args.file)
with open(args.file) as json_file:
json_data = json.load(json_file)
# define lists of CheckPoint Objects
userDomains = []
userHosts = []
userNetworks = []
userRanges = []
userNetGroups = []
userSimpleGateways = []
userZones = []
userServicesTcp = []
userServicesUdp = []
userServicesSctp = [] # is not used in Cisco
userServicesIcmp = [] # is not used in Cisco
userServicesOther = []
userServicesGroups = []
userTimesGroups = []
userTimes = []
userPackage = None
userNatRules = []
for jsonObject in json_data:
if jsonObject is None or 'TypeName' not in jsonObject:
continue
if jsonObject['TypeName'] == 'CheckPoint_Domain':
userDomains.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_Host':
userHosts.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_Network':
userNetworks.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_Range':
userRanges.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_NetworkGroup' or jsonObject['TypeName'] == 'CheckPoint_GroupWithExclusion':
userNetGroups.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_SimpleGateway':
userSimpleGateways.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_Zone':
userZones.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_TcpService':
userServicesTcp.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_UdpService':
userServicesUdp.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_SctpService':
userServicesSctp.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_IcmpService':
userServicesIcmp.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_OtherService':
userServicesOther.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_ServiceGroup':
userServicesGroups.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_TimeGroup':
userTimesGroups.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_Time':
userTimes.append(jsonObject)
if jsonObject['TypeName'] == 'CheckPoint_Package':
userPackage = jsonObject
if jsonObject['TypeName'] == 'CheckPoint_NAT_Rule':
userNatRules.append(jsonObject)

printStatus(None, "reading and parsing processes are completed for JSON file: " + args.file)
client_args = None
if args.port is not None:
client_args = APIClientArgs(server=args.management, port=args.port, context=args.context, user_agent="mgmt_cli_smartmove")
else:
client_args = APIClientArgs(server=args.management, context=args.context, user_agent="mgmt_cli_smartmove")
with APIClient(client_args) as client:
client.debug_file = "api_calls.json"
printStatus(None, "checking fingerprint")
if client.check_fingerprint() is False:
printStatus(None, "Could not get the server's fingerprint - Check connectivity with the server.")
else:
if args.root:
msg = "login as root to "
if args.domain is not None:
msg += args.domain + " domain of local server"
else:
msg += "local server"
printStatus(None, msg)
login_res = client.login_as_root(domain=args.domain)
elif args.user:
msg = "login as " + args.user + " to "
if args.domain is not None:
msg += args.domain + " domain of " + args.management + " server"
else:
msg += args.management + " server"
printStatus(None, msg)
login_res = client.login(args.user, args.password, domain=args.domain)
else:
msg = "login by api key to "
if args.domain is not None:
msg += args.domain + " domain of " + args.management + " server"
else:
msg += args.management + " server"
printStatus(None, msg)

login_res = client.login_with_api_key(args.key, domain=args.domain)

if login_res.success is False:
printStatus(None, f"Login failed: {login_res.error_message}")
else:
printStatus(None, "")
mergedNetworkObjectsMap = {}
mergedNetworkObjectsMap.update(processDomains(client, userDomains))
mergedNetworkObjectsMap.update(processHosts(client, userHosts))
mergedNetworkObjectsMap.update(processNetworks(client, userNetworks))
mergedNetworkObjectsMap.update(processRanges(client, userRanges))
mergedNetworkObjectsMap.update(processNetGroups(client, userNetGroups, mergedNetworkObjectsMap))
mergedNetworkObjectsMap.update(processSimpleGateways(client, userSimpleGateways))
mergedNetworkObjectsMap.update(processZones(client, userZones))
mergedServicesObjectsMap = {}
mergedServicesObjectsMap.update(processServices(client, userServicesTcp, "tcp"))
mergedServicesObjectsMap.update(processServices(client, userServicesUdp, "udp"))
mergedServicesObjectsMap.update(processServices(client, userServicesSctp, "sctp"))
mergedServicesObjectsMap.update(processServices(client, userServicesIcmp, "icmp"))
mergedServicesObjectsMap.update(processServices(client, userServicesOther, "other"))
mergedServicesObjectsMap.update(
processServicesGroups(client, userServicesGroups, mergedServicesObjectsMap))
mergedTimesMap = processTimes(client, userTimes)
mergedTimesGroupsMap = processTimesGroups(client, userTimesGroups, mergedTimesMap)
addedPackage = processPackage(client, userPackage, mergedNetworkObjectsMap, mergedServicesObjectsMap,
mergedTimesGroupsMap, mergedTimesMap)
processNatRules(client, addedPackage, userNatRules, mergedNetworkObjectsMap, mergedServicesObjectsMap)
printStatus(None, "==========")
file_log.close()
# END

 

Ofir_Shikolski
Employee

You can contact me offline 🙂 ofirs@checkpoint.com 

Neil_Deane
Participant

Thanks Ofir, your suggestion via mail worked for me.

In case anyone else hits this problem, here is what worked for me when running it on a Management server.

 

Based on the documentation, I first ran this and got an error:

[Expert@hostname:0]# ./smartconnector.py -r
: No such file or directory
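My guess, not verified, is that the bare ": No such file or directory" comes from the shebang line: the tar.gz is produced on a Windows machine by SmartMove, so if the first line of the script ends in CRLF, or points at an interpreter path that does not exist on Gaia, running it directly fails exactly like this. If you want to run it without the python3 prefix, a quick check/fix would be along these lines (dos2unix may or may not be present on your box):

[Expert@hostname:0]# head -1 smartconnector.py | cat -A     # a trailing ^M$ means Windows line endings in the shebang
[Expert@hostname:0]# dos2unix smartconnector.py             # convert CRLF to LF if the tool is available

I did not chase this further since the python3 prefix approach works anyway.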

@masher your suggestion of the python3 prefix should have worked (likely a user problem here ...), but it gave a different error:

[Expert@hostname:0]# python3 ./smartconnector.py -r
Traceback (most recent call last):
  File "./smartconnector.py", line 12, in <module>
    from cpapi import APIClient, APIClientArgs
ModuleNotFoundError: No module named 'cpapi'

The above error appears because the PYTHONPATH environment variable is not set. You must download the SDK from https://github.com/CheckPointSW/cp_mgmt_api_python_sdk#sdk-usage-from-a-management-machine, extract it to a directory on the management server, and then point PYTHONPATH at it:

[Expert@hostname:0]# export PYTHONPATH=$PYTHONPATH:/var/tmp/migrate/sdk/cp_mgmt_api_python_sdk-master
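To sanity-check the path before running the script (the SDK directory above is just where I unpacked it - adjust to your own location), this one-liner should succeed once PYTHONPATH is exported in the current shell:

[Expert@hostname:0]# python3 -c "from cpapi import APIClient; print('cpapi import OK')"

If it prints the message instead of a ModuleNotFoundError, the SDK is visible to Python.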

Now the CLI works:

[Expert@hostname:0]# python3 ./smartconnector.py -r
Input arguments:
root flag is set

<working>

Ofir suggests: "cp_objects_opt.json is the recommended policy – usually more optimized".

So the ideal command is:


python3 ./smartconnector.py -r -f cp_objects_opt.json
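Putting it all together, the complete sequence that worked for me on the management server (paths are from my setup - adjust the migrate/SDK directories to wherever you copied the files):

[Expert@hostname:0]# cd /var/tmp/migrate
[Expert@hostname:0]# export PYTHONPATH=$PYTHONPATH:/var/tmp/migrate/sdk/cp_mgmt_api_python_sdk-master
[Expert@hostname:0]# python3 ./smartconnector.py -r -f cp_objects_opt.json

Keep in mind the export only lasts for the current shell, so repeat it (or add it to your profile) if you come back in a new session. Judging by the argparse section of the script, -u/-p (or -k with an API key) plus -d for an MDS domain could be used instead of -r when not logging in as root, but I only tested the root flow.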

CSSBE_Avenger
Explorer

Hi, we had the same issue, can you share the solution?

 

the_rock
Legend

Hey @CSSBE_Avenger ... if you look at the last reply from @Neil_Deane, that's what worked for him.
