#!/usr/bin/python # Copyright: ... # ... DOCUMENTATION = r''' --- module: decort_account version_added: "2.16" description: See L(Module Documentation,https://repository.basistech.ru/BASIS/decort-ansible/wiki/Home). ''' # EXAMPLES = r''' # ''' # RETURN = r''' # ''' from typing import Iterable from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.decort_utils import DecortController class DecortAccount(DecortController): OBJ = 'account' def __init__(self): super().__init__(AnsibleModule(**self.amodule_init_args)) self.check_amodule_args() @property def amodule_init_args(self) -> dict: return self.pack_amodule_init_args( argument_spec=dict( access_emails=dict( type='bool', ), acl=dict( type='dict', options=dict( mode=dict( type='str', choices=[ 'match', 'revoke', 'update', ], default='update', ), users=dict( type='list', required=True, elements='dict', options=dict( rights=dict( type='str', choices=['R', 'RCX', 'ARCXDU'], default='R', ), id=dict( type='str', required=True, ), ), ), ), ), id=dict( type='int', ), name=dict( type='str', ), quotas=dict( type='dict', options=dict( cpu=dict( type='int', ), disks_size=dict( type='int', ), ext_traffic=dict( type='int', ), gpu=dict( type='int', ), public_ip=dict( type='int', ), ram=dict( type='int', ), ), ), state=dict( type='str', choices=[ 'absent', 'absent_permanently', 'confirmed', 'disabled', 'present', ], default='present', ), ), required_one_of=[ ('id', 'name') ], supports_check_mode=True, ) def check_amodule_args(self): """ Additional Ansible Module arguments validation that cannot be implemented using Ansible Argument spec. """ arg_state = self.aparams['state'] if 'absent' in arg_state: # Parameters or combinations of parameters that can # cause changing the object. changing_params = [ 'access_emails', 'acl', ['id', 'name'], 'quotas', ] check_error = False for elem in changing_params: if isinstance(elem, str): param = elem if self.aparams[elem] is not None: self.message( f'If the parameter "state" is set to' f' "{arg_state}", then using the parameter' f' "{param}" is not allowed.' ) check_error = True elif isinstance(elem, Iterable): params = elem params_using = map( lambda x: self.aparams[x] is not None, params ) if all(params_using): params_str = ', '.join(f'"{p}"' for p in params) self.message( f'If the parameter "state" is set to' f' "{arg_state}", then using the combination' f' of parameters {params_str} are not allowed.' ) check_error = True if check_error: self.exit(fail=True) def run(self): self.get_info() self.change() self.exit() def get_info(self): # If this is the first getting info if not getattr(self, 'id', None): self.id, self.facts = self.account_find( account_name=self.aparams['name'], account_id=self.aparams['id'], ) # If this is a repeated getting info else: # If check mode is enabled, there is no needed to # request info again if not self.amodule.check_mode: self.id, self.facts = self.account_find(account_id=self.id) def change(self): self.change_state() self.change_acl() if self.account_update_args: self.account_update(account_id=self.id, **self.account_update_args) self.get_info() def change_state(self): match self.facts: case None: self.message(self.MESSAGES.obj_not_found(obj=self.OBJ)) match self.aparams: case {'state': 'absent' | 'absent_permanently'}: pass case {'state': 'confirmed' | 'disabled' | 'present'}: self.exit(fail=True) case {'status': 'DESTROYED'}: match self.aparams: case {'state': 'absent' | 'absent_permanently'}: self.message( self.MESSAGES.obj_deleted( obj=self.OBJ, id=self.id, permanently=True, already=True, ) ) case {'state': 'confirmed' | 'disabled' | 'present'}: self.message( self.MESSAGES.obj_not_restored(obj=self.OBJ, id=self.id) ) self.exit(fail=True) case {'status': 'DELETED'}: match self.aparams: case {'state': 'absent'}: self.message( self.MESSAGES.obj_deleted( obj=self.OBJ, id=self.id, permanently=False, already=True, ) ) case {'state': 'absent_permanently'}: self.delete(permanently=True) case {'state': 'confirmed' | 'present'}: self.restore() case {'state': 'disabled'}: self.restore() self.disable() case {'status': 'CONFIRMED'}: match self.aparams: case {'state': 'absent'}: self.delete() case {'state': 'absent_permanently'}: self.delete(permanently=True) case {'state': 'confirmed' | 'present'}: pass case {'state': 'disabled'}: self.disable() case {'status': 'DISABLED'}: match self.aparams: case {'state': 'absent'}: self.delete() case {'state': 'absent_permanently'}: self.delete(permanently=True) case {'state': 'confirmed'}: self.enable() case {'state': 'present' | 'disabled'}: pass def delete(self, permanently=False): self.account_delete(account_id=self.id, permanently=permanently) self.get_info() def disable(self): self.account_disable(account_id=self.id) self.get_info() def enable(self): self.account_enable(account_id=self.id) self.get_info() def restore(self): self.account_restore(account_id=self.id) self.get_info() def change_acl(self): if not self.aparams['acl']: return actual_users = {u['userGroupId']: u['right'] for u in self.facts['acl']} actual_users_ids = set(actual_users.keys()) aparams_acl = self.aparams['acl'] aparams_users = {u['id']: u['rights'] for u in aparams_acl['users']} aparams_users_ids = set(aparams_users.keys()) del_users_ids = None upd_users = None new_users = None match aparams_acl: case {'mode': 'revoke'}: del_users_ids = aparams_users_ids.intersection(actual_users_ids) case {'mode': 'update' | 'match' as mode}: new_users_ids = aparams_users_ids.difference(actual_users_ids) new_users = dict( u for u in aparams_users.items() if u[0] in new_users_ids ) upd_users_ids =\ aparams_users_ids.intersection(actual_users_ids) upd_users = dict() for id in upd_users_ids: if actual_users[id] == 'CXDRAU': actual_user_rights = 'ARCXDU' else: actual_user_rights = actual_users[id] if actual_user_rights != aparams_users[id]: upd_users[id] = aparams_users[id] if mode == 'match': del_users_ids =\ actual_users_ids.difference(aparams_users_ids) if del_users_ids or new_users or upd_users: self.account_change_acl(account_id=self.id, del_users=del_users_ids, add_users=new_users, upd_users=upd_users) self.get_info() @property def account_update_args(self) -> dict: result_args = dict() aparam_access_emails = self.aparams['access_emails'] if (aparam_access_emails is not None and self.facts['sendAccessEmails'] != aparam_access_emails): result_args['access_emails'] = aparam_access_emails aparam_name = self.aparams['name'] if (self.aparams['id'] and aparam_name and self.facts['name'] != aparam_name): result_args['name'] = aparam_name aparam_quotas = self.aparams['quotas'] if aparam_quotas: quotas_naming = [ ['cpu', 'CU_C', 'cpu_quota'], ['disks_size', 'CU_DM', 'disks_size_quota'], ['ext_traffic', 'CU_NP', 'ext_traffic_quota'], ['gpu', 'gpu_units', 'gpu_quota'], ['public_ip', 'CU_I', 'public_ip_quota'], ['ram', 'CU_M', 'ram_quota'], ] for aparam, info_key, result_arg in quotas_naming: current_value = int(self.facts['resourceLimits'][info_key]) if (aparam_quotas[aparam] is not None and current_value != aparam_quotas[aparam]): result_args[result_arg] = aparam_quotas[aparam] return result_args def main(): DecortAccount().run() if __name__ == '__main__': main()