You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
decort-ansible/library/decort_account.py

370 lines
13 KiB

#!/usr/bin/python
# Module documentation stub (a YAML document held in a raw string). It only
# names the module and points to the external wiki page linked in the
# description, where the full documentation is kept.
DOCUMENTATION = r'''
---
module: decort_account
description: See L(Module Documentation,https://repository.basistech.ru/BASIS/decort-ansible/wiki/Home).
'''
from typing import Iterable
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.decort_utils import DecortController
class DecortAccount(DecortController):
OBJ = 'account'
def __init__(self):
super().__init__(AnsibleModule(**self.amodule_init_args))
self.check_amodule_args()
@property
def amodule_init_args(self) -> dict:
return self.pack_amodule_init_args(
argument_spec=dict(
access_emails=dict(
type='bool',
),
acl=dict(
type='dict',
options=dict(
mode=dict(
type='str',
choices=[
'match',
'revoke',
'update',
],
default='update',
),
users=dict(
type='list',
required=True,
elements='dict',
options=dict(
rights=dict(
type='str',
choices=['R', 'RCX', 'ARCXDU'],
default='R',
),
id=dict(
type='str',
required=True,
),
),
),
),
),
id=dict(
type='int',
),
name=dict(
type='str',
),
quotas=dict(
type='dict',
options=dict(
cpu=dict(
type='int',
),
disks_size=dict(
type='int',
),
ext_traffic=dict(
type='int',
),
gpu=dict(
type='int',
),
public_ip=dict(
type='int',
),
ram=dict(
type='int',
),
),
),
state=dict(
type='str',
choices=[
'absent',
'absent_permanently',
'confirmed',
'disabled',
'present',
],
default='present',
),
sep_pools=dict(
type='list',
elements='dict',
options=dict(
sep_id=dict(
type='int',
required=True,
),
pool_names=dict(
type='list',
required=True,
elements='str',
),
),
),
),
required_one_of=[
('id', 'name')
],
supports_check_mode=True,
)
def check_amodule_args(self):
"""
Additional Ansible Module arguments validation that
cannot be implemented using Ansible Argument spec.
"""
arg_state = self.aparams['state']
if 'absent' in arg_state:
# Parameters or combinations of parameters that can
# cause changing the object.
changing_params = [
'access_emails',
'acl',
['id', 'name'],
'quotas',
]
check_error = False
for elem in changing_params:
if isinstance(elem, str):
param = elem
if self.aparams[elem] is not None:
self.message(
f'If the parameter "state" is set to'
f' "{arg_state}", then using the parameter'
f' "{param}" is not allowed.'
)
check_error = True
elif isinstance(elem, Iterable):
params = elem
params_using = map(
lambda x: self.aparams[x] is not None, params
)
if all(params_using):
params_str = ', '.join(f'"{p}"' for p in params)
self.message(
f'If the parameter "state" is set to'
f' "{arg_state}", then using the combination'
f' of parameters {params_str} are not allowed.'
)
check_error = True
if check_error:
self.exit(fail=True)
def run(self):
self.get_info()
self.change()
self.exit()
def get_info(self):
# If this is the first getting info
if not getattr(self, 'id', None):
self.id, self.facts = self.account_find(
account_name=self.aparams['name'],
account_id=self.aparams['id'],
)
# If this is a repeated getting info
else:
# If check mode is enabled, there is no needed to
# request info again
if not self.amodule.check_mode:
self.id, self.facts = self.account_find(account_id=self.id)
def change(self):
self.change_state()
self.change_acl()
if self.account_update_args:
self.account_update(account_id=self.id,
**self.account_update_args)
self.get_info()
def change_state(self):
match self.facts:
case None:
self.message(self.MESSAGES.obj_not_found(obj=self.OBJ))
match self.aparams:
case {'state': 'absent' | 'absent_permanently'}:
pass
case {'state': 'confirmed' | 'disabled' | 'present'}:
self.exit(fail=True)
case {'status': 'DESTROYED'}:
match self.aparams:
case {'state': 'absent' | 'absent_permanently'}:
self.message(
self.MESSAGES.obj_deleted(
obj=self.OBJ,
id=self.id,
permanently=True,
already=True,
)
)
case {'state': 'confirmed' | 'disabled' | 'present'}:
self.message(
self.MESSAGES.obj_not_restored(obj=self.OBJ,
id=self.id)
)
self.exit(fail=True)
case {'status': 'DELETED'}:
match self.aparams:
case {'state': 'absent'}:
self.message(
self.MESSAGES.obj_deleted(
obj=self.OBJ,
id=self.id,
permanently=False,
already=True,
)
)
case {'state': 'absent_permanently'}:
self.delete(permanently=True)
case {'state': 'confirmed' | 'present'}:
self.restore()
case {'state': 'disabled'}:
self.restore()
self.disable()
case {'status': 'CONFIRMED'}:
match self.aparams:
case {'state': 'absent'}:
self.delete()
case {'state': 'absent_permanently'}:
self.delete(permanently=True)
case {'state': 'confirmed' | 'present'}:
pass
case {'state': 'disabled'}:
self.disable()
case {'status': 'DISABLED'}:
match self.aparams:
case {'state': 'absent'}:
self.delete()
case {'state': 'absent_permanently'}:
self.delete(permanently=True)
case {'state': 'confirmed'}:
self.enable()
case {'state': 'present' | 'disabled'}:
pass
def delete(self, permanently=False):
self.account_delete(account_id=self.id, permanently=permanently)
self.get_info()
def disable(self):
self.account_disable(account_id=self.id)
self.get_info()
def enable(self):
self.account_enable(account_id=self.id)
self.get_info()
def restore(self):
self.account_restore(account_id=self.id)
self.get_info()
def change_acl(self):
if not self.aparams['acl']:
return
actual_users = {u['userGroupId']: u['right'] for u in self.facts['acl']}
actual_users_ids = set(actual_users.keys())
aparams_acl = self.aparams['acl']
aparams_users = {u['id']: u['rights'] for u in aparams_acl['users']}
aparams_users_ids = set(aparams_users.keys())
del_users_ids = None
upd_users = None
new_users = None
match aparams_acl:
case {'mode': 'revoke'}:
del_users_ids = aparams_users_ids.intersection(actual_users_ids)
case {'mode': 'update' | 'match' as mode}:
new_users_ids = aparams_users_ids.difference(actual_users_ids)
new_users = dict(
u for u in aparams_users.items() if u[0] in new_users_ids
)
upd_users_ids =\
aparams_users_ids.intersection(actual_users_ids)
upd_users = dict()
for id in upd_users_ids:
if actual_users[id] == 'CXDRAU':
actual_user_rights = 'ARCXDU'
else:
actual_user_rights = actual_users[id]
if actual_user_rights != aparams_users[id]:
upd_users[id] = aparams_users[id]
if mode == 'match':
del_users_ids =\
actual_users_ids.difference(aparams_users_ids)
if del_users_ids or new_users or upd_users:
self.account_change_acl(account_id=self.id,
del_users=del_users_ids,
add_users=new_users,
upd_users=upd_users)
self.get_info()
@property
def account_update_args(self) -> dict:
result_args = dict()
aparam_access_emails = self.aparams['access_emails']
if (aparam_access_emails is not None
and self.facts['sendAccessEmails'] != aparam_access_emails):
result_args['access_emails'] = aparam_access_emails
aparam_name = self.aparams['name']
if (self.aparams['id'] and aparam_name
and self.facts['name'] != aparam_name):
result_args['name'] = aparam_name
aparam_quotas = self.aparams['quotas']
if aparam_quotas:
quotas_naming = [
['cpu', 'CU_C', 'cpu_quota'],
['disks_size', 'CU_DM', 'disks_size_quota'],
['ext_traffic', 'CU_NP', 'ext_traffic_quota'],
['gpu', 'gpu_units', 'gpu_quota'],
['public_ip', 'CU_I', 'public_ip_quota'],
['ram', 'CU_M', 'ram_quota'],
]
for aparam, info_key, result_arg in quotas_naming:
current_value = int(self.facts['resourceLimits'][info_key])
if (aparam_quotas[aparam] is not None
and current_value != aparam_quotas[aparam]):
result_args[result_arg] = aparam_quotas[aparam]
aparam_sep_pools = self.aparams['sep_pools']
if aparam_sep_pools is not None:
sep_pools = set()
for sep in aparam_sep_pools:
for pool_name in sep['pool_names']:
sep_pools.add(
f'{sep["sep_id"]}_{pool_name}'
)
if set(self.facts['uniqPools']) != sep_pools:
result_args['sep_pools'] = sep_pools
return result_args
def main():
    """Module entry point: create the account module object and run it."""
    account_module = DecortAccount()
    account_module.run()
# Run the module only when this file is executed as a script.
if __name__ == "__main__":
    main()