#!/usr/bin/python

DOCUMENTATION = r'''
---
module: decort_security_group

description: See L(Module Documentation,https://repository.basistech.ru/BASIS/decort-ansible/wiki/Home).   # noqa: E501
'''

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.decort_utils import DecortController

from dynamix_sdk import exceptions as sdk_exceptions
import dynamix_sdk.types as sdk_types


class DecortSecurityGroup(DecortController):
    """Ansible module logic for creating, changing and deleting a
    security group and its rules.

    The helpers used here but not defined in this file (``aparams``,
    ``api``, ``message``, ``exit``, ``sdk_checkmode``, ``result``,
    ``MESSAGES``, ``SecurityGroupRuleMode``, ``SecurityGroupState``,
    ``pack_amodule_init_args``, ``security_group_detele_rule``) are
    presumably provided by ``DecortController`` -- verify there.
    """

    # ID of the managed security group; a falsy value is treated by
    # run() as "the group does not exist yet".
    id: int = 0
    # Last fetched description of the group (see get_info()). It is only
    # ever rebound, never mutated in place, so the shared class-level
    # default is not modified through instances.
    facts: dict = dict()

    def __init__(self):
        super().__init__(AnsibleModule(**self.amodule_init_args))

    @property
    def amodule_init_args(self) -> dict:
        """Build the keyword arguments used to construct AnsibleModule."""
        port_range_options = dict(
            min=dict(
                type='int',
            ),
            max=dict(
                type='int',
            ),
        )

        rule_object_options = dict(
            # Not marked as required in the spec: in "delete" mode only
            # the rule id may be given (see check_aparam_rules()), and a
            # spec-level requirement would make that mode impossible to
            # satisfy. check_aparam_rules() enforces "direction" for
            # every rule that is going to be created.
            direction=dict(
                type='str',
                choices=[e.name for e in sdk_types.TrafficDirection],
            ),
            ethertype=dict(
                type='str',
                choices=[e.name for e in sdk_types.SGRuleEthertype],
            ),
            id=dict(
                type='int',
            ),
            port_range=dict(
                type='dict',
                options=port_range_options,
            ),
            protocol=dict(
                type='str',
                choices=[e.name for e in sdk_types.SGRuleProtocol],
            ),
            remote_net_cidr=dict(
                type='str',
            ),
        )

        rules_options = dict(
            mode=dict(
                type='str',
                choices=[e.value for e in self.SecurityGroupRuleMode],
                default=self.SecurityGroupRuleMode.update.value,
            ),
            objects=dict(
                type='list',
                elements='dict',
                required=True,
                options=rule_object_options,
            ),
        )

        return self.pack_amodule_init_args(
            argument_spec=dict(
                account_id=dict(
                    type='int',
                ),
                description=dict(
                    type='str',
                ),
                id=dict(
                    type='int',
                ),
                name=dict(
                    type='str',
                ),
                rules=dict(
                    type='dict',
                    options=rules_options,
                ),
                state=dict(
                    type='str',
                    choices=[e.value for e in self.SecurityGroupState],
                ),
            ),
            supports_check_mode=True,
        )

    @DecortController.handle_sdk_exceptions
    def run(self):
        """Module entry point: locate the group, then change or create it.

        The group is looked up by ``id`` when given, otherwise by
        ``name`` (the first match returned by the API is used).
        """
        aparam_id = self.aparams['id']
        aparam_name = self.aparams['name']

        if aparam_id is not None:
            self.id = aparam_id
        elif aparam_name is not None:
            security_groups = self.api.cloudapi.security_group.list(
                account_id=self.aparams['account_id'],
                name=aparam_name,
            )
            if security_groups.data:
                self.id = security_groups.data[0].id

        if self.id:
            self.get_info()
            self.check_amodule_args_for_change()
            self.change()
        else:
            self.check_amodule_args_for_create()
            self.create()
            # Without a real creation there is no group to add rules to.
            if not self.amodule.check_mode:
                self.change_rules()

        # Refresh facts only when there is a group to query: self.id can
        # still be unset here (create() keeps it unchanged when the
        # create call returns nothing).
        if self.result['changed'] and self.id:
            self.get_info()

        self.exit()

    def get_info(self):
        """Fetch the security group by ``self.id`` and store it in
        ``self.facts``.

        A 404 answer is reported as "object not found" and the module
        exits with failure; any other request error is re-raised.
        """
        try:
            security_group_model = self.api.cloudapi.security_group.get(
                security_group_id=self.id
            )
        except sdk_exceptions.RequestException as e:
            response = e.orig_exception.response
            # Compare with None explicitly: if this is a
            # ``requests.Response`` its truth value mirrors ``.ok`` and
            # is False for a 404, which would skip this branch.
            if response is not None and response.status_code == 404:
                self.message(
                    self.MESSAGES.obj_not_found(
                        obj='security_group',
                        id=self.id,
                    )
                )
                self.exit(fail=True)
            raise

        self.facts = security_group_model.model_dump()

    def check_amodule_args_for_create(self):
        """Validate module arguments for creating a new security group.

        Every failed check adds a message; if any check failed the
        module exits with failure.
        """
        check_errors = False

        if self.aparams['account_id'] is None:
            check_errors = True
            self.message(
                msg='Check for parameter "account_id" failed: '
                    'account_id must be specified when creating '
                    'a new security group'
            )

        if self.aparams['name'] is None:
            check_errors = True
            self.message(
                msg='Check for parameter "name" failed: '
                    'name must be specified when creating '
                    'a new security group'
            )

        if self.aparams['state'] == self.SecurityGroupState.absent.value:
            check_errors = True
            self.message(
                msg='Check for parameter "state" failed: '
                    'state can not be "absent" when creating '
                    'a new security group'
            )

        if self.check_aparam_rules(for_create=True) is False:
            check_errors = True

        if check_errors:
            self.exit(fail=True)

    def check_amodule_args_for_change(self):
        """Validate module arguments for changing an existing group."""
        if self.check_aparam_rules() is False:
            self.exit(fail=True)

    def check_aparam_rules(self, for_create: bool = False):
        """Validate the ``rules`` module parameter.

        Args:
            for_create: True when the security group does not exist yet.

        Returns:
            True when all checks passed (or ``rules`` is not set),
            False otherwise. A message is added for each failed check.
        """
        aparam_rules = self.aparams['rules']
        if aparam_rules is None:
            return True

        check_errors = False
        rules = aparam_rules['objects']
        delete_mode = (
            aparam_rules['mode'] == self.SecurityGroupRuleMode.delete.value
        )

        if for_create and delete_mode:
            check_errors = True
            self.message(
                msg='Check for parameter "rules.mode" failed: '
                    'mode can not be "delete" when creating '
                    'new security group'
            )

        # IDs of the rules the group currently has (empty when the
        # group has not been fetched, e.g. on creation).
        sg_rules_ids = {
            rule['id'] for rule in self.facts.get('rules') or []
        }

        for rule in rules:
            rule_id = rule.get('id')

            if rule_id is None:
                if delete_mode:
                    check_errors = True
                    self.message(
                        msg='Check for parameter "rules.objects" '
                            'failed: rule id must be specified if mode '
                            'is delete'
                    )
                elif rule.get('direction') is None:
                    # A rule without id is created by change_rules(),
                    # and create_rule() needs the direction.
                    check_errors = True
                    self.message(
                        msg='Check for parameter '
                            '"rules.objects.direction" failed: '
                            'direction must be specified for a rule '
                            'without id'
                    )
                continue

            if for_create:
                check_errors = True
                self.message(
                    msg='Check for parameter "rules.objects.id" failed: '
                        'can not set rule id when creating new '
                        'security group'
                )
            elif rule_id not in sg_rules_ids:
                check_errors = True
                self.message(
                    msg='Check for parameter "rules.objects.id" failed: '
                        f'rule ID {rule_id} not found for '
                        f'security group ID {self.id}'
                )

            has_extra_params = any(
                param != 'id' and value is not None
                for param, value in rule.items()
            )
            if delete_mode and has_extra_params:
                check_errors = True
                self.message(
                    msg='Check for parameter "rules.objects" '
                        'failed: only rule id can be specified if '
                        'rules.mode is "delete"'
                )

        return not check_errors

    def create(self):
        """Create the security group and remember its ID if one is
        returned."""
        new_id = self.sdk_checkmode(self.api.cloudapi.security_group.create)(
            account_id=self.aparams['account_id'],
            name=self.aparams['name'],
            description=self.aparams['description'],
        )
        if new_id:
            self.id = new_id

    def change(self):
        """Apply state, parameter and rule changes to an existing group."""
        self.change_state()
        self.change_params()
        self.change_rules()

    def change_state(self):
        """Delete the group when ``state`` is "absent"."""
        if self.aparams['state'] == self.SecurityGroupState.absent.value:
            self.delete()

    def change_params(self):
        """Update name and/or description when they differ from facts."""
        aparam_name = self.aparams['name']
        aparam_description = self.aparams['description']

        new_name, new_description = None, None
        if aparam_name is not None and aparam_name != self.facts['name']:
            new_name = aparam_name
        if (
            aparam_description is not None
            and aparam_description != self.facts['description']
        ):
            new_description = aparam_description

        # Compare with None rather than by truthiness so that an
        # explicitly requested empty string that differs from the
        # current value is not silently ignored.
        if new_name is not None or new_description is not None:
            self.sdk_checkmode(self.api.cloudapi.security_group.update)(
                security_group_id=self.id,
                name=new_name,
                description=new_description,
            )

    def change_rules(self):
        """Create and/or delete rules according to ``rules.mode``.

        * delete: delete every listed rule by its id.
        * update: create every listed rule that has no id.
        * match: as update, then delete every rule known from facts
          whose id is not listed.
        """
        aparam_rules = self.aparams['rules']
        if aparam_rules is None:
            return

        rules = aparam_rules['objects']
        mode = aparam_rules['mode']
        rule_modes = self.SecurityGroupRuleMode

        # NOTE(review): security_group_detele_rule (sic) is not defined
        # in this file; presumably it is a DecortController method.
        if mode == rule_modes.delete.value:
            for rule in rules:
                self.security_group_detele_rule(
                    security_group_id=self.id,
                    rule_id=rule['id'],
                )
            return

        if mode not in (rule_modes.match.value, rule_modes.update.value):
            return

        for rule in rules:
            if rule.get('id') is None:
                self.create_rule(rule=rule)

        if mode == rule_modes.match.value:
            # facts is empty when the group has just been created, so
            # do not index it directly.
            sg_rules_ids = {
                rule['id'] for rule in self.facts.get('rules') or []
            }
            aparam_rules_ids = {
                rule['id'] for rule in rules if rule.get('id') is not None
            }
            for rule_id in sg_rules_ids - aparam_rules_ids:
                self.security_group_detele_rule(
                    security_group_id=self.id,
                    rule_id=rule_id,
                )

    def delete(self):
        """Delete the security group, clear facts and exit the module."""
        self.sdk_checkmode(self.api.cloudapi.security_group.delete)(
            security_group_id=self.id,
        )
        self.facts = {}
        self.exit()

    def create_rule(self, rule: dict):
        """Create one rule in the security group.

        Args:
            rule: one item of the ``rules.objects`` module parameter.
                ``direction`` must be set; ``ethertype`` defaults to
                IPV4 when not given.
        """
        port_range = rule.get('port_range') or {}

        ethertype = sdk_types.SGRuleEthertype.IPV4
        if rule.get('ethertype'):
            ethertype = sdk_types.SGRuleEthertype[rule['ethertype']]

        protocol = None
        if rule.get('protocol'):
            protocol = sdk_types.SGRuleProtocol[rule['protocol']]

        self.sdk_checkmode(self.api.cloudapi.security_group.create_rule)(
            security_group_id=self.id,
            traffic_direction=sdk_types.TrafficDirection[rule['direction']],
            ethertype=ethertype,
            protocol=protocol,
            port_range_min=port_range.get('min'),
            port_range_max=port_range.get('max'),
            remote_net_cidr=rule.get('remote_net_cidr'),
        )


def main():
    DecortSecurityGroup().run()


if __name__ == '__main__':
    main()