#!/usr/bin/python DOCUMENTATION = r''' --- module: decort_security_group description: See L(Module Documentation,https://repository.basistech.ru/BASIS/decort-ansible/wiki/Home). # noqa: E501 ''' from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.decort_utils import DecortController class DecortSecurityGroup(DecortController): id: int = 0 facts: dict = dict() def __init__(self): super().__init__(AnsibleModule(**self.amodule_init_args)) @property def amodule_init_args(self) -> dict: return self.pack_amodule_init_args( argument_spec=dict( account_id=dict( type='int', ), description=dict( type='str', ), id=dict( type='int', ), name=dict( type='str', ), rules=dict( type='dict', options=dict( mode=dict( type='str', choices=[ e.value for e in self.SecurityGroupRuleMode ], default=self.SecurityGroupRuleMode.update.value, ), objects=dict( type='list', elements='dict', required=True, options=dict( direction=dict( type='str', choices=[ e.name for e in self.SecurityGroupRuleDirection ], required=True, ), ethertype=dict( type='str', choices=[ e.name for e in self.SecurityGroupRuleEtherType ], ), id=dict( type='int', ), port_range=dict( type='dict', options=dict( min=dict( type='int', ), max=dict( type='int', ), ), ), protocol=dict( type='str', choices=[ e.name for e in self.SecurityGroupRuleProtocol ], ), remote_ip_prefix=dict( type='str', ), ), ), ), ), state=dict( type='str', choices=[e.value for e in self.SecurityGroupState], ), ), supports_check_mode=True, ) def run(self): if self.aparams['id'] is not None: self.id = self.aparams['id'] elif self.aparams['name'] is not None: security_group = self.security_group_find( account_id=self.aparams['account_id'], name=self.aparams['name'], ) if security_group: self.id = security_group['id'] if self.id: self.get_info() self.check_amodule_args_for_change() self.change() else: self.check_amodule_args_for_create() self.create() if not self.amodule.check_mode: self.change_rules() if self.result['changed']: self.get_info() self.exit() def get_info(self): self.facts: dict = self.security_group_get(id=self.id) self.facts['created_timestamp'] = self.facts.pop('created_at') self.facts['updated_timestamp'] = self.facts.pop('updated_at') for rule in self.facts['rules']: rule['port_range'] = { 'min': rule.pop('port_range_min'), 'max': rule.pop('port_range_max'), } def check_amodule_args_for_create(self): check_errors = False aparam_account_id = self.aparams['account_id'] if aparam_account_id is None: check_errors = True self.message( msg='Check for parameter "account_id" failed: ' 'account_id must be specified when creating ' 'a new security group' ) aparam_name = self.aparams['name'] if aparam_name is None: check_errors = True self.message( msg='Check for parameter "name" failed: ' 'name must be specified when creating ' 'a new security group' ) aparam_state = self.aparams['state'] if ( aparam_state is not None and aparam_state == self.SecurityGroupState.absent.value ): check_errors = True self.message( msg='Check for parameter "state" failed: ' 'state can not be "absent" when creating ' 'a new security group' ) if self.check_aparam_rules(for_create=True) is False: check_errors = True if check_errors: self.exit(fail=True) def check_amodule_args_for_change(self): check_errors = False if self.check_aparam_rules() is False: check_errors = True if check_errors: self.exit(fail=True) def check_aparam_rules(self, for_create: bool = False): check_errors = False aparam_rules = self.aparams['rules'] if aparam_rules is None: return True mode = aparam_rules['mode'] rules = aparam_rules['objects'] if for_create and mode == self.SecurityGroupRuleMode.delete.value: check_errors = True self.message( msg='Check for parameter "rules.mode" failed: ' 'mode can not be "delete" when creating ' 'new security group' ) sg_rules_ids = [] if self.facts.get('rules'): sg_rules_ids = [rule['id'] for rule in self.facts['rules']] for rule in rules: rule_id = rule.get('id') if rule_id is not None: if for_create: check_errors = True self.message( msg='Check for parameter "rules.objects.id" failed: ' 'can not set rule id when creating new ' 'security group' ) elif rule_id not in sg_rules_ids: check_errors = True self.message( msg='Check for parameter "rules.objects.id" failed: ' f'rule ID {rule_id} not found for ' f'security group ID {self.id}' ) if mode == self.SecurityGroupRuleMode.delete.value: for param, value in rule.items(): if param != 'id' and value is not None: check_errors = True self.message( msg='Check for parameter "rules.objects" ' 'failed: only rule id can be specified if' 'rules.mode is "delete"' ) break elif mode == self.SecurityGroupRuleMode.delete.value: check_errors = True self.message( msg='Check for parameter "rules.objects" ' 'failed: rule id must be specified if mode is delete' ) return not check_errors def create(self): security_groups_by_account_id = self.user_security_groups( account_id=self.aparams['account_id'] ) sg_names = [sg['name'] for sg in security_groups_by_account_id] if self.aparams['name'] not in sg_names: self.id = self.security_group_create( account_id=self.aparams['account_id'], name=self.aparams['name'], description=self.aparams['description'], ) def change(self): self.change_state() self.change_params() self.change_rules() def change_state(self): if self.aparams['state'] == self.SecurityGroupState.absent.value: self.delete() def change_params(self): aparam_name = self.aparams['name'] aparam_description = self.aparams['description'] new_name, new_description = None, None if ( aparam_name is not None and aparam_name != self.facts['name'] ): new_name = aparam_name if ( aparam_description is not None and aparam_description != self.facts['description'] ): new_description = aparam_description if new_name or new_description: self.security_group_update( security_group_id=self.id, name=new_name, description=new_description, ) def change_rules(self): aparam_rules = self.aparams['rules'] if aparam_rules is not None: rules = aparam_rules['objects'] match aparam_rules['mode']: case self.SecurityGroupRuleMode.delete.value: for rule in rules: self.security_group_detele_rule( security_group_id=self.id, rule_id=rule['id'], ) case self.SecurityGroupRuleMode.match.value: for rule in rules: if rule.get('id') is None: self.create_rule(rule=rule) sg_rules_ids = set( [rule['id'] for rule in self.facts['rules']] ) aparam_rules_ids = set( [rule['id'] for rule in rules if rule.get('id')] ) rules_ids_to_delete = sg_rules_ids - aparam_rules_ids for rule_id in rules_ids_to_delete: self.security_group_detele_rule( security_group_id=self.id, rule_id=rule_id, ) case self.SecurityGroupRuleMode.update.value: for rule in rules: if rule.get('id') is None: self.create_rule(rule=rule) def delete(self): self.security_group_detele(security_group_id=self.id) self.facts = {} self.exit() def create_rule(self, rule: dict): port_range_min, port_range_max = None, None if rule.get('port_range'): port_range_min = rule['port_range'].get('min') port_range_max = rule['port_range'].get('max') self.security_group_create_rule( security_group_id=self.id, direction=self.SecurityGroupRuleDirection[rule['direction']], ethertype=( self.SecurityGroupRuleEtherType[rule['ethertype']] if rule.get('ethertype') else None ), protocol=( self.SecurityGroupRuleProtocol[rule['protocol']] if rule.get('protocol') else None ), port_range_min=port_range_min, port_range_max=port_range_max, remote_ip_prefix=rule.get('remote_ip_prefix'), ) def main(): DecortSecurityGroup().run() if __name__ == '__main__': main()